| // The implementation of the binary repository interface stores |
| // objects identified by veyron name suffixes using the local file |
| // system. Given a veyron name suffix, the implementation computes an |
| // MD5 hash of the suffix and generates the following path in the |
| // local filesystem: /<root>/<dir_1>/.../<dir_n>/<hash>. The root and |
| // the directory depth are parameters of the implementation. The |
| // contents of the directory include the checksum and data for the |
| // object and each of its individual parts: |
| // |
| // checksum |
| // data |
| // <part_1>/checksum |
| // <part_1>/data |
| // ... |
| // <part_n>/checksum |
| // <part_n>/data |
| // |
| // TODO(jsimsa): Add an "fsck" method that cleans up existing on-disk |
| // repository and provide a command-line flag that identifies whether |
| // fsck should run when new repository server process starts up. |
| package impl |
| |
| import ( |
| "crypto/md5" |
| "encoding/hex" |
| "errors" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| "strconv" |
| "syscall" |
| |
| "veyron2/ipc" |
| "veyron2/services/mgmt/binary" |
| "veyron2/services/mgmt/repository" |
| "veyron2/vlog" |
| ) |
| |
// state is the configuration shared by all binary repository
// invocations.
type state struct {
	// depth is the number of directory levels inserted between root
	// and the leaf directory of an object. Smaller values give faster
	// access; larger values let the repository scale to larger
	// collections of binaries. The value should be between 0 and
	// (md5.Size - 1).
	//
	// Every non-leaf level has at most 256 entries. To store X binaries
	// while keeping each directory at or below Y entries (because of
	// filesystem limitations), depth should be at least log_256(X/Y).
	// For example, depth = 3 on a filesystem that copes with up to
	// 1,000 entries per directory allows roughly 16 billion objects
	// (256^3 * 1,000).
	depth int
	// root is the local filesystem directory under which the binary
	// repository stores its objects.
	root string
}
| |
| // invoker holds the state of a binary repository invocation. |
| type invoker struct { |
| // path is the local filesystem path to the object identified by the |
| // veyron name suffix. |
| path string |
| // state holds the state shared across different binary repository |
| // invocations. |
| state *state |
| // suffix is the suffix of the current invocation that is assumed to |
| // be used as a relative veyron name to identify a binary. |
| suffix string |
| } |
| |
// Names of the files kept inside the directory of a binary and inside
// the directory of each of its parts.
const (
	// checksum names the file holding the hex-encoded MD5 checksum. Its
	// presence is what marks a binary (part) as existing.
	checksum = "checksum"
	// data names the file holding the content.
	data = "data"
	// lock names the file that is created exclusively for the duration
	// of an upload of a part.
	lock = "lock"
)
| |
// Errors returned by the binary repository methods.
var (
	// errExist reports that the binary (part) is already present.
	errExist = errors.New("binary already exists")
	// errNotExist reports that the binary (part) is not present.
	errNotExist = errors.New("binary does not exist")
	// errInProgress reports that another upload of the same part holds
	// the lock.
	errInProgress = errors.New("identical upload already in progress")
	// errInvalidParts reports a part count smaller than one.
	errInvalidParts = errors.New("invalid number of parts")
	// errOperationFailed is the generic failure returned after the
	// underlying error has been logged.
	errOperationFailed = errors.New("operation failed")
)
| |
| // TODO(jsimsa): When VDL supports composite literal constants, remove |
| // this definition. |
| var MissingPart = binary.PartInfo{ |
| Checksum: binary.MissingChecksum, |
| Size: binary.MissingSize, |
| } |
| |
| // newInvoker is the invoker factory. |
| func newInvoker(state *state, suffix string) *invoker { |
| // Generate the local filesystem path for the object identified by |
| // the veyron name suffix. |
| h := md5.New() |
| h.Write([]byte(suffix)) |
| hash := hex.EncodeToString(h.Sum(nil)) |
| dir := "" |
| for j := 0; j < state.depth; j++ { |
| dir = filepath.Join(dir, hash[j*2:(j+1)*2]) |
| } |
| path := filepath.Join(state.root, dir, hash) |
| return &invoker{ |
| path: path, |
| state: state, |
| suffix: suffix, |
| } |
| } |
| |
| // BINARY INTERFACE IMPLEMENTATION |
| |
// bufferLength is the size, in bytes, of the buffer used when copying
// binary data between files and streams.
const bufferLength = 1 << 12
| |
| // consolidate checks if all parts of a binary have been uploaded and |
| // if so, creates the aggregate binary and checksum. |
| func (i *invoker) consolidate() error { |
| // Coordinate concurrent uploaders to make sure the aggregate binary |
| // and checksum is created only once. |
| err := i.checksumExists(i.path) |
| switch err { |
| case nil: |
| return nil |
| case errNotExist: |
| default: |
| return err |
| } |
| // Check if all parts have been uploaded. |
| parts, err := i.getParts() |
| if err != nil { |
| return err |
| } |
| for _, part := range parts { |
| err := i.checksumExists(part) |
| switch err { |
| case errNotExist: |
| return nil |
| case nil: |
| default: |
| return err |
| } |
| } |
| // Create the aggregate binary and its checksum. |
| suffix := "" |
| output, err := ioutil.TempFile(i.path, suffix) |
| if err != nil { |
| vlog.Errorf("TempFile(%v, %v) failed: %v", i.path, suffix, err) |
| return errOperationFailed |
| } |
| defer output.Close() |
| buffer, h := make([]byte, bufferLength), md5.New() |
| for _, part := range parts { |
| input, err := os.Open(filepath.Join(part, data)) |
| if err != nil { |
| vlog.Errorf("Open(%v) failed: %v", filepath.Join(part, data), err) |
| if err := os.Remove(output.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", output.Name(), err) |
| } |
| return errOperationFailed |
| } |
| defer input.Close() |
| for { |
| n, err := input.Read(buffer) |
| if err != nil && err != io.EOF { |
| vlog.Errorf("Read() failed: %v", err) |
| if err := os.Remove(output.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", output.Name(), err) |
| } |
| return errOperationFailed |
| } |
| if n == 0 { |
| break |
| } |
| if _, err := output.Write(buffer[:n]); err != nil { |
| vlog.Errorf("Write() failed: %v", err) |
| if err := os.Remove(output.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", output.Name(), err) |
| } |
| return errOperationFailed |
| } |
| h.Write(buffer[:n]) |
| } |
| } |
| hash := hex.EncodeToString(h.Sum(nil)) |
| checksumFile, perm := filepath.Join(i.path, checksum), os.FileMode(0600) |
| if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil { |
| vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err) |
| if err := os.Remove(output.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", output.Name(), err) |
| } |
| return errOperationFailed |
| } |
| dataFile := filepath.Join(i.path, data) |
| if err := os.Rename(output.Name(), dataFile); err != nil { |
| vlog.Errorf("Rename(%v, %v) failed: %v", output.Name(), dataFile, err) |
| if err := os.Remove(output.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", output.Name(), err) |
| } |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| // checksumExists checks whether the given path contains a |
| // checksum. The implementation uses the existence of checksum to |
| // determine whether the binary (part) identified by the given path |
| // exists. |
| func (i *invoker) checksumExists(path string) error { |
| checksumFile := filepath.Join(path, checksum) |
| _, err := os.Stat(checksumFile) |
| switch { |
| case os.IsNotExist(err): |
| return errNotExist |
| case err != nil: |
| vlog.Errorf("Stat(%v) failed: %v", path, err) |
| return errOperationFailed |
| default: |
| return nil |
| } |
| } |
| |
| // generatePartPath generates a path for the given binary part. |
| func (i *invoker) generatePartPath(part int) string { |
| return filepath.Join(i.path, fmt.Sprintf("%d", part)) |
| } |
| |
| // getParts returns a collection of paths to the parts of the binary. |
| func (i *invoker) getParts() ([]string, error) { |
| infos, err := ioutil.ReadDir(i.path) |
| if err != nil { |
| vlog.Errorf("ReadDir(%v) failed: %v", i.path, err) |
| return []string{}, errOperationFailed |
| } |
| n := 0 |
| result := make([]string, len(infos)) |
| for _, info := range infos { |
| if info.IsDir() { |
| idx, err := strconv.Atoi(info.Name()) |
| if err != nil { |
| vlog.Errorf("Atoi(%v) failed: %v", info.Name(), err) |
| return []string{}, errOperationFailed |
| } |
| result[idx] = filepath.Join(i.path, info.Name()) |
| n++ |
| } |
| } |
| result = result[:n] |
| return result, nil |
| } |
| |
| func (i *invoker) Create(_ ipc.ServerContext, nparts int32) error { |
| vlog.Infof("%v.Create(%v)", i.suffix, nparts) |
| if nparts < 1 { |
| return errInvalidParts |
| } |
| parent, perm := filepath.Dir(i.path), os.FileMode(0700) |
| if err := os.MkdirAll(parent, perm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", parent, perm, err) |
| return errOperationFailed |
| } |
| prefix := "creating-" |
| tmpDir, err := ioutil.TempDir(parent, prefix) |
| if err != nil { |
| vlog.Errorf("TempDir(%v, %v) failed: %v", parent, prefix, err) |
| return errOperationFailed |
| } |
| for j := 0; j < int(nparts); j++ { |
| partPath, partPerm := filepath.Join(tmpDir, fmt.Sprintf("%d", j)), os.FileMode(0700) |
| if err := os.MkdirAll(partPath, partPerm); err != nil { |
| vlog.Errorf("MkdirAll(%v, %v) failed: %v", partPath, partPerm, err) |
| if err := os.RemoveAll(tmpDir); err != nil { |
| vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err) |
| } |
| return errOperationFailed |
| } |
| } |
| // Use os.Rename() to atomically create the binary directory |
| // structure. |
| if err := os.Rename(tmpDir, i.path); err != nil { |
| defer func() { |
| if err := os.RemoveAll(tmpDir); err != nil { |
| vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err) |
| } |
| }() |
| if os.IsExist(err) { |
| return errExist |
| } |
| vlog.Errorf("Rename(%v, %v) failed: %v", tmpDir, i.path, err) |
| return errOperationFailed |
| } |
| return nil |
| } |
| |
| func (i *invoker) Delete(context ipc.ServerContext) error { |
| vlog.Infof("%v.Delete()", i.suffix) |
| if _, err := os.Stat(i.path); err != nil { |
| if os.IsNotExist(err) { |
| return errNotExist |
| } |
| vlog.Errorf("Stat(%v) failed: %v", i.path, err) |
| return errOperationFailed |
| } |
| // Use os.Rename() to atomically remove the binary directory |
| // structure. |
| path := filepath.Join(filepath.Dir(i.path), "removing-"+filepath.Base(i.path)) |
| if err := os.Rename(i.path, path); err != nil { |
| vlog.Errorf("Rename(%v, %v) failed: %v", i.path, path, err) |
| return errOperationFailed |
| } |
| if err := os.RemoveAll(path); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| for { |
| // Remove the binary and all directories on the path back to the |
| // root that are left empty after the binary is removed. |
| path = filepath.Dir(path) |
| if i.state.root == path { |
| break |
| } |
| if err := os.Remove(path); err != nil { |
| if err.(*os.PathError).Err.Error() == syscall.ENOTEMPTY.Error() { |
| break |
| } |
| vlog.Errorf("Remove(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| } |
| return nil |
| } |
| |
| func (i *invoker) Download(context ipc.ServerContext, part int32, stream repository.BinaryServiceDownloadStream) error { |
| vlog.Infof("%v.Download(%v)", i.suffix, part) |
| path := i.generatePartPath(int(part)) |
| if err := i.checksumExists(path); err != nil { |
| return err |
| } |
| dataPath := filepath.Join(path, data) |
| file, err := os.Open(dataPath) |
| if err != nil { |
| vlog.Errorf("Open(%v) failed: %v", path, err) |
| return errOperationFailed |
| } |
| defer file.Close() |
| buffer := make([]byte, bufferLength) |
| for { |
| n, err := file.Read(buffer) |
| if err != nil && err != io.EOF { |
| vlog.Errorf("Read() failed: %v", err) |
| return errOperationFailed |
| } |
| if n == 0 { |
| break |
| } |
| if err := stream.Send(buffer[:n]); err != nil { |
| vlog.Errorf("Send() failed: %v", err) |
| return errOperationFailed |
| } |
| } |
| return nil |
| } |
| |
| func (i *invoker) DownloadURL(ipc.ServerContext) (string, int64, error) { |
| vlog.Infof("%v.DownloadURL()", i.suffix) |
| // TODO(jsimsa): Implement. |
| return "", 0, nil |
| } |
| |
| func (i *invoker) Stat(ipc.ServerContext) ([]binary.PartInfo, error) { |
| vlog.Infof("%v.Stat()", i.suffix) |
| result := make([]binary.PartInfo, 0) |
| parts, err := i.getParts() |
| if err != nil { |
| return []binary.PartInfo{}, err |
| } |
| for _, part := range parts { |
| checksumFile := filepath.Join(part, checksum) |
| bytes, err := ioutil.ReadFile(checksumFile) |
| if err != nil { |
| if os.IsNotExist(err) { |
| result = append(result, MissingPart) |
| continue |
| } |
| vlog.Errorf("ReadFile(%v) failed: %v", checksumFile, err) |
| return []binary.PartInfo{}, errOperationFailed |
| } |
| dataFile := filepath.Join(part, data) |
| fi, err := os.Stat(dataFile) |
| if err != nil { |
| if os.IsNotExist(err) { |
| result = append(result, MissingPart) |
| continue |
| } |
| vlog.Errorf("Stat(%v) failed: %v", dataFile, err) |
| return []binary.PartInfo{}, errOperationFailed |
| } |
| result = append(result, binary.PartInfo{Checksum: string(bytes), Size: fi.Size()}) |
| } |
| return result, nil |
| } |
| |
| func (i *invoker) Upload(context ipc.ServerContext, part int32, stream repository.BinaryServiceUploadStream) error { |
| vlog.Infof("%v.Upload(%v)", i.suffix, part) |
| path, suffix := i.generatePartPath(int(part)), "" |
| err := i.checksumExists(path) |
| switch err { |
| case nil: |
| return errExist |
| case errNotExist: |
| default: |
| return err |
| } |
| // Use os.OpenFile() to resolve races. |
| lockPath, flags, perm := filepath.Join(path, lock), os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(0600) |
| lockFile, err := os.OpenFile(lockPath, flags, perm) |
| if err != nil { |
| if os.IsExist(err) { |
| return errInProgress |
| } |
| vlog.Errorf("OpenFile(%v, %v, %v) failed: %v", lockPath, flags, suffix, err) |
| return errOperationFailed |
| } |
| defer os.Remove(lockFile.Name()) |
| defer lockFile.Close() |
| file, err := ioutil.TempFile(path, suffix) |
| if err != nil { |
| vlog.Errorf("TempFile(%v, %v) failed: %v", path, suffix, err) |
| return errOperationFailed |
| } |
| defer file.Close() |
| h := md5.New() |
| for { |
| bytes, err := stream.Recv() |
| if err != nil && err != io.EOF { |
| vlog.Errorf("Recv() failed: %v", err) |
| if err := os.Remove(file.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", file.Name(), err) |
| } |
| return errOperationFailed |
| } |
| if err == io.EOF { |
| break |
| } |
| if _, err := file.Write(bytes); err != nil { |
| vlog.Errorf("Write() failed: %v", err) |
| if err := os.Remove(file.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", file.Name(), err) |
| } |
| return errOperationFailed |
| } |
| h.Write(bytes) |
| } |
| hash := hex.EncodeToString(h.Sum(nil)) |
| checksumFile, perm := filepath.Join(path, checksum), os.FileMode(0600) |
| if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil { |
| vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err) |
| if err := os.Remove(file.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", file.Name(), err) |
| } |
| return errOperationFailed |
| } |
| dataFile := filepath.Join(path, data) |
| if err := os.Rename(file.Name(), dataFile); err != nil { |
| vlog.Errorf("Rename(%v, %v) failed: %v", file.Name(), dataFile, err) |
| if err := os.Remove(file.Name()); err != nil { |
| vlog.Errorf("Remove(%v) failed: %v", file.Name(), err) |
| } |
| return errOperationFailed |
| } |
| return nil |
| } |