blob: 62498912a5c9bb77a939d99eb3aeb38eb3a3e8db [file] [log] [blame]
// The implementation of the binary repository interface stores
// objects identified by object name suffixes using the local file
// system. Given an object name suffix, the implementation computes an
// MD5 hash of the suffix and generates the following path in the
// local filesystem: /<root_dir>/<dir_1>/.../<dir_n>/<hash>. The root
// directory and the directory depth are parameters of the
// implementation. The contents of the directory include the name and
// the media info of the object, and the checksum and data for each of
// the individual parts of the binary:
//
// name
// mediainfo
// <part_1>/checksum
// <part_1>/data
// ...
// <part_n>/checksum
// <part_n>/data
//
// TODO(jsimsa): Add an "fsck" method that cleans up the existing
// on-disk repository and provide a command-line flag that identifies
// whether fsck should run when a new repository server process starts
// up.
21package impl
22
23import (
24 "crypto/md5"
25 "encoding/hex"
Robin Thellende2627892014-11-26 09:34:37 -080026 "encoding/json"
Jiri Simsab04fbb22014-06-27 17:38:04 -070027 "io"
28 "io/ioutil"
29 "os"
30 "path/filepath"
Robin Thellendc5856f62014-11-17 10:30:54 -080031 "strings"
Jiri Simsab04fbb22014-06-27 17:38:04 -070032 "syscall"
33
Jiri Simsa519c5072014-09-17 21:37:57 -070034 "veyron.io/veyron/veyron2/ipc"
35 "veyron.io/veyron/veyron2/services/mgmt/binary"
36 "veyron.io/veyron/veyron2/services/mgmt/repository"
37 "veyron.io/veyron/veyron2/verror"
38 "veyron.io/veyron/veyron2/vlog"
Jiri Simsab04fbb22014-06-27 17:38:04 -070039)
40
Robin Thellend9bc8fcb2014-11-17 10:23:04 -080041// binaryService implements the Binary server interface.
42type binaryService struct {
Jiri Simsab04fbb22014-06-27 17:38:04 -070043 // path is the local filesystem path to the object identified by the
Bogdan Capritad9281a32014-07-02 14:40:39 -070044 // object name suffix.
Jiri Simsab04fbb22014-06-27 17:38:04 -070045 path string
46 // state holds the state shared across different binary repository
47 // invocations.
48 state *state
Robin Thellend9bc8fcb2014-11-17 10:23:04 -080049 // suffix is the name of the binary object.
Jiri Simsab04fbb22014-06-27 17:38:04 -070050 suffix string
51}
52
Jiri Simsab04fbb22014-06-27 17:38:04 -070053var (
Jiri Simsa518ef152014-08-08 18:07:55 -070054 errExists = verror.Existsf("binary already exists")
Tilak Sharma492e8e92014-09-18 10:58:14 -070055 errNotFound = verror.NoExistf("binary not found")
Jiri Simsa518ef152014-08-08 18:07:55 -070056 errInProgress = verror.Internalf("identical upload already in progress")
57 errInvalidParts = verror.BadArgf("invalid number of binary parts")
Bogdan Capritae783dcc2014-11-04 14:16:55 -080058 errInvalidPart = verror.BadArgf("invalid binary part number")
Jiri Simsa518ef152014-08-08 18:07:55 -070059 errOperationFailed = verror.Internalf("operation failed")
Jiri Simsab04fbb22014-06-27 17:38:04 -070060)
61
62// TODO(jsimsa): When VDL supports composite literal constants, remove
63// this definition.
64var MissingPart = binary.PartInfo{
65 Checksum: binary.MissingChecksum,
66 Size: binary.MissingSize,
67}
68
Robin Thellend9bc8fcb2014-11-17 10:23:04 -080069// newBinaryService returns a new Binary service implementation.
70func newBinaryService(state *state, suffix string) *binaryService {
71 return &binaryService{
Bogdan Caprita1c626252014-11-05 14:29:14 -080072 path: state.dir(suffix),
Jiri Simsab04fbb22014-06-27 17:38:04 -070073 state: state,
74 suffix: suffix,
75 }
76}
77
// bufferLength is the size, in bytes, of the buffer that Download uses
// to read part data from disk before sending it to the client.
const bufferLength = 4096
79
Robin Thellende2627892014-11-26 09:34:37 -080080func (i *binaryService) Create(_ ipc.ServerContext, nparts int32, mediaInfo repository.MediaInfo) error {
81 vlog.Infof("%v.Create(%v, %v)", i.suffix, nparts, mediaInfo)
Jiri Simsab04fbb22014-06-27 17:38:04 -070082 if nparts < 1 {
83 return errInvalidParts
84 }
85 parent, perm := filepath.Dir(i.path), os.FileMode(0700)
86 if err := os.MkdirAll(parent, perm); err != nil {
87 vlog.Errorf("MkdirAll(%v, %v) failed: %v", parent, perm, err)
88 return errOperationFailed
89 }
90 prefix := "creating-"
91 tmpDir, err := ioutil.TempDir(parent, prefix)
92 if err != nil {
93 vlog.Errorf("TempDir(%v, %v) failed: %v", parent, prefix, err)
94 return errOperationFailed
95 }
Robin Thellendc5856f62014-11-17 10:30:54 -080096 nameFile := filepath.Join(tmpDir, "name")
97 if err := ioutil.WriteFile(nameFile, []byte(i.suffix), os.FileMode(0600)); err != nil {
98 vlog.Errorf("WriteFile(%q) failed: %v", nameFile)
99 return errOperationFailed
100 }
Robin Thellende2627892014-11-26 09:34:37 -0800101 infoFile := filepath.Join(tmpDir, "mediainfo")
102 jInfo, err := json.Marshal(mediaInfo)
103 if err != nil {
104 vlog.Errorf("json.Marshal(%v) failed: %v", mediaInfo, err)
105 return errOperationFailed
106 }
107 if err := ioutil.WriteFile(infoFile, jInfo, os.FileMode(0600)); err != nil {
108 vlog.Errorf("WriteFile(%q) failed: %v", infoFile, err)
109 return errOperationFailed
110 }
Jiri Simsab04fbb22014-06-27 17:38:04 -0700111 for j := 0; j < int(nparts); j++ {
Bogdan Capritae783dcc2014-11-04 14:16:55 -0800112 partPath, partPerm := generatePartPath(tmpDir, j), os.FileMode(0700)
Jiri Simsab04fbb22014-06-27 17:38:04 -0700113 if err := os.MkdirAll(partPath, partPerm); err != nil {
114 vlog.Errorf("MkdirAll(%v, %v) failed: %v", partPath, partPerm, err)
115 if err := os.RemoveAll(tmpDir); err != nil {
116 vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
117 }
118 return errOperationFailed
119 }
120 }
121 // Use os.Rename() to atomically create the binary directory
122 // structure.
123 if err := os.Rename(tmpDir, i.path); err != nil {
124 defer func() {
125 if err := os.RemoveAll(tmpDir); err != nil {
126 vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
127 }
128 }()
Jiri Simsa518ef152014-08-08 18:07:55 -0700129 if linkErr, ok := err.(*os.LinkError); ok && linkErr.Err == syscall.ENOTEMPTY {
130 return errExists
Jiri Simsab04fbb22014-06-27 17:38:04 -0700131 }
132 vlog.Errorf("Rename(%v, %v) failed: %v", tmpDir, i.path, err)
133 return errOperationFailed
134 }
135 return nil
136}
137
Robin Thellend9bc8fcb2014-11-17 10:23:04 -0800138func (i *binaryService) Delete(context ipc.ServerContext) error {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700139 vlog.Infof("%v.Delete()", i.suffix)
140 if _, err := os.Stat(i.path); err != nil {
141 if os.IsNotExist(err) {
Jiri Simsa518ef152014-08-08 18:07:55 -0700142 return errNotFound
Jiri Simsab04fbb22014-06-27 17:38:04 -0700143 }
144 vlog.Errorf("Stat(%v) failed: %v", i.path, err)
145 return errOperationFailed
146 }
147 // Use os.Rename() to atomically remove the binary directory
148 // structure.
149 path := filepath.Join(filepath.Dir(i.path), "removing-"+filepath.Base(i.path))
Jiri Simsab04fbb22014-06-27 17:38:04 -0700150 if err := os.Rename(i.path, path); err != nil {
151 vlog.Errorf("Rename(%v, %v) failed: %v", i.path, path, err)
152 return errOperationFailed
153 }
154 if err := os.RemoveAll(path); err != nil {
155 vlog.Errorf("Remove(%v) failed: %v", path, err)
156 return errOperationFailed
157 }
158 for {
159 // Remove the binary and all directories on the path back to the
Jiri Simsa432cc2e2014-12-08 15:53:38 -0800160 // root directory that are left empty after the binary is removed.
Jiri Simsab04fbb22014-06-27 17:38:04 -0700161 path = filepath.Dir(path)
Jiri Simsa432cc2e2014-12-08 15:53:38 -0800162 if i.state.rootDir == path {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700163 break
164 }
165 if err := os.Remove(path); err != nil {
166 if err.(*os.PathError).Err.Error() == syscall.ENOTEMPTY.Error() {
167 break
168 }
169 vlog.Errorf("Remove(%v) failed: %v", path, err)
170 return errOperationFailed
171 }
172 }
173 return nil
174}
175
Robin Thellend9bc8fcb2014-11-17 10:23:04 -0800176func (i *binaryService) Download(context repository.BinaryDownloadContext, part int32) error {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700177 vlog.Infof("%v.Download(%v)", i.suffix, part)
178 path := i.generatePartPath(int(part))
Bogdan Capritae783dcc2014-11-04 14:16:55 -0800179 if err := checksumExists(path); err != nil {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700180 return err
181 }
182 dataPath := filepath.Join(path, data)
183 file, err := os.Open(dataPath)
184 if err != nil {
Bogdan Capritae783dcc2014-11-04 14:16:55 -0800185 vlog.Errorf("Open(%v) failed: %v", dataPath, err)
Jiri Simsab04fbb22014-06-27 17:38:04 -0700186 return errOperationFailed
187 }
188 defer file.Close()
189 buffer := make([]byte, bufferLength)
Todd Wang702385a2014-11-07 01:54:08 -0800190 sender := context.SendStream()
Jiri Simsab04fbb22014-06-27 17:38:04 -0700191 for {
192 n, err := file.Read(buffer)
193 if err != nil && err != io.EOF {
194 vlog.Errorf("Read() failed: %v", err)
195 return errOperationFailed
196 }
197 if n == 0 {
198 break
199 }
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700200 if err := sender.Send(buffer[:n]); err != nil {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700201 vlog.Errorf("Send() failed: %v", err)
202 return errOperationFailed
203 }
204 }
205 return nil
206}
207
Jiri Simsa432cc2e2014-12-08 15:53:38 -0800208// TODO(jsimsa): Design and implement an access control mechanism for
209// the URL-based downloads.
Robin Thellend9bc8fcb2014-11-17 10:23:04 -0800210func (i *binaryService) DownloadURL(ipc.ServerContext) (string, int64, error) {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700211 vlog.Infof("%v.DownloadURL()", i.suffix)
Jiri Simsa432cc2e2014-12-08 15:53:38 -0800212 return i.state.rootURL + "/" + i.suffix, 0, nil
Jiri Simsab04fbb22014-06-27 17:38:04 -0700213}
214
Robin Thellende2627892014-11-26 09:34:37 -0800215func (i *binaryService) Stat(ipc.ServerContext) ([]binary.PartInfo, repository.MediaInfo, error) {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700216 vlog.Infof("%v.Stat()", i.suffix)
217 result := make([]binary.PartInfo, 0)
Bogdan Capritae783dcc2014-11-04 14:16:55 -0800218 parts, err := getParts(i.path)
Jiri Simsab04fbb22014-06-27 17:38:04 -0700219 if err != nil {
Robin Thellende2627892014-11-26 09:34:37 -0800220 return []binary.PartInfo{}, repository.MediaInfo{}, err
Jiri Simsab04fbb22014-06-27 17:38:04 -0700221 }
222 for _, part := range parts {
223 checksumFile := filepath.Join(part, checksum)
224 bytes, err := ioutil.ReadFile(checksumFile)
225 if err != nil {
226 if os.IsNotExist(err) {
227 result = append(result, MissingPart)
228 continue
229 }
230 vlog.Errorf("ReadFile(%v) failed: %v", checksumFile, err)
Robin Thellende2627892014-11-26 09:34:37 -0800231 return []binary.PartInfo{}, repository.MediaInfo{}, errOperationFailed
Jiri Simsab04fbb22014-06-27 17:38:04 -0700232 }
233 dataFile := filepath.Join(part, data)
234 fi, err := os.Stat(dataFile)
235 if err != nil {
236 if os.IsNotExist(err) {
237 result = append(result, MissingPart)
238 continue
239 }
240 vlog.Errorf("Stat(%v) failed: %v", dataFile, err)
Robin Thellende2627892014-11-26 09:34:37 -0800241 return []binary.PartInfo{}, repository.MediaInfo{}, errOperationFailed
Jiri Simsab04fbb22014-06-27 17:38:04 -0700242 }
243 result = append(result, binary.PartInfo{Checksum: string(bytes), Size: fi.Size()})
244 }
Robin Thellende2627892014-11-26 09:34:37 -0800245 infoFile := filepath.Join(i.path, "mediainfo")
246 jInfo, err := ioutil.ReadFile(infoFile)
247 if err != nil {
248 vlog.Errorf("ReadFile(%q) failed: %v", infoFile)
249 return []binary.PartInfo{}, repository.MediaInfo{}, errOperationFailed
250 }
251 var mediaInfo repository.MediaInfo
252 if err := json.Unmarshal(jInfo, &mediaInfo); err != nil {
253 vlog.Errorf("json.Unmarshal(%v) failed: %v", jInfo, err)
254 return []binary.PartInfo{}, repository.MediaInfo{}, errOperationFailed
255 }
256 return result, mediaInfo, nil
Jiri Simsab04fbb22014-06-27 17:38:04 -0700257}
258
Robin Thellend9bc8fcb2014-11-17 10:23:04 -0800259func (i *binaryService) Upload(context repository.BinaryUploadContext, part int32) error {
Jiri Simsab04fbb22014-06-27 17:38:04 -0700260 vlog.Infof("%v.Upload(%v)", i.suffix, part)
261 path, suffix := i.generatePartPath(int(part)), ""
Bogdan Capritae783dcc2014-11-04 14:16:55 -0800262 err := checksumExists(path)
Jiri Simsab04fbb22014-06-27 17:38:04 -0700263 switch err {
264 case nil:
Jiri Simsa518ef152014-08-08 18:07:55 -0700265 return errExists
266 case errNotFound:
Jiri Simsab04fbb22014-06-27 17:38:04 -0700267 default:
268 return err
269 }
270 // Use os.OpenFile() to resolve races.
271 lockPath, flags, perm := filepath.Join(path, lock), os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(0600)
272 lockFile, err := os.OpenFile(lockPath, flags, perm)
273 if err != nil {
274 if os.IsExist(err) {
275 return errInProgress
276 }
277 vlog.Errorf("OpenFile(%v, %v, %v) failed: %v", lockPath, flags, suffix, err)
278 return errOperationFailed
279 }
280 defer os.Remove(lockFile.Name())
281 defer lockFile.Close()
282 file, err := ioutil.TempFile(path, suffix)
283 if err != nil {
284 vlog.Errorf("TempFile(%v, %v) failed: %v", path, suffix, err)
285 return errOperationFailed
286 }
287 defer file.Close()
288 h := md5.New()
Todd Wang702385a2014-11-07 01:54:08 -0800289 rStream := context.RecvStream()
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700290 for rStream.Advance() {
291 bytes := rStream.Value()
Jiri Simsab04fbb22014-06-27 17:38:04 -0700292 if _, err := file.Write(bytes); err != nil {
293 vlog.Errorf("Write() failed: %v", err)
294 if err := os.Remove(file.Name()); err != nil {
295 vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
296 }
297 return errOperationFailed
298 }
299 h.Write(bytes)
300 }
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700301
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700302 if err := rStream.Err(); err != nil {
303 vlog.Errorf("Advance() failed: %v", err)
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700304 if err := os.Remove(file.Name()); err != nil {
305 vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
306 }
307 return errOperationFailed
308 }
309
Jiri Simsab04fbb22014-06-27 17:38:04 -0700310 hash := hex.EncodeToString(h.Sum(nil))
311 checksumFile, perm := filepath.Join(path, checksum), os.FileMode(0600)
312 if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
313 vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
314 if err := os.Remove(file.Name()); err != nil {
315 vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
316 }
317 return errOperationFailed
318 }
Jiri Simsab04fbb22014-06-27 17:38:04 -0700319 dataFile := filepath.Join(path, data)
320 if err := os.Rename(file.Name(), dataFile); err != nil {
321 vlog.Errorf("Rename(%v, %v) failed: %v", file.Name(), dataFile, err)
322 if err := os.Remove(file.Name()); err != nil {
323 vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
324 }
325 return errOperationFailed
326 }
Jiri Simsab04fbb22014-06-27 17:38:04 -0700327 return nil
328}
Robin Thellendc5856f62014-11-17 10:30:54 -0800329
Robin Thellend39ac3232014-12-02 09:50:41 -0800330func (i *binaryService) GlobChildren__(ipc.ServerContext) (<-chan string, error) {
Robin Thellendc5856f62014-11-17 10:30:54 -0800331 elems := strings.Split(i.suffix, "/")
332 if len(elems) == 1 && elems[0] == "" {
333 elems = nil
334 }
335 n := i.createObjectNameTree().find(elems, false)
336 if n == nil {
337 return nil, errOperationFailed
338 }
Robin Thellend8e9cc242014-11-26 09:43:10 -0800339 ch := make(chan string, 100)
340 go func() {
341 for k, _ := range n.children {
342 ch <- k
343 }
344 close(ch)
345 }()
346 return ch, nil
Robin Thellendc5856f62014-11-17 10:30:54 -0800347}