veyron2/services/mgmt/binary: binary API
Change-Id: I62b6d20686c7cab22a122eaa1ca96a1b614cf915
diff --git a/services/mgmt/binary/binaryd/main.go b/services/mgmt/binary/binaryd/main.go
new file mode 100644
index 0000000..49a6b05
--- /dev/null
+++ b/services/mgmt/binary/binaryd/main.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "flag"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "veyron/lib/signals"
+ vflag "veyron/security/flag"
+
+ "veyron/services/mgmt/binary/impl"
+
+ "veyron2/rt"
+ "veyron2/vlog"
+)
+
+const (
+ defaultDepth = 3
+ defaultRootPrefix = "veyron_binary_repository"
+)
+
+func main() {
+ var address, protocol, name, root string
+ // TODO(rthellend): Remove the address and protocol flags when the config manager is working.
+ flag.StringVar(&address, "address", "localhost:0", "network address to listen on")
+ flag.StringVar(&name, "name", "", "name to mount the binary repository as")
+ flag.StringVar(&protocol, "protocol", "tcp", "network type to listen on")
+ flag.StringVar(&root, "root", "", "root directory for the binary repository")
+ flag.Parse()
+ if root == "" {
+ var err error
+ if root, err = ioutil.TempDir("", defaultRootPrefix); err != nil {
+ vlog.Errorf("TempDir() failed: %v\n", err)
+ return
+ }
+ path, perm := filepath.Join(root, impl.VersionFile), os.FileMode(0600)
+ if err := ioutil.WriteFile(path, []byte(impl.Version), perm); err != nil {
+ vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", path, impl.Version, perm, err)
+ return
+ }
+ } else {
+ _, err := os.Stat(root)
+ switch {
+ case err == nil:
+ case os.IsNotExist(err):
+ perm := os.FileMode(0700)
+ if err := os.MkdirAll(root, perm); err != nil {
+ vlog.Errorf("MkdirAll(%v, %v) failed: %v", root, perm, err)
+ return
+ }
+ path, perm := filepath.Join(root, impl.VersionFile), os.FileMode(0600)
+ if err := ioutil.WriteFile(path, []byte(impl.Version), perm); err != nil {
+ vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", path, impl.Version, perm, err)
+ return
+ }
+ default:
+ vlog.Errorf("Stat(%v) failed: %v", root, err)
+ return
+ }
+ }
+ vlog.Infof("Binary repository rooted at %v", root)
+ runtime := rt.Init()
+ defer runtime.Shutdown()
+ server, err := runtime.NewServer()
+ if err != nil {
+ vlog.Errorf("NewServer() failed: %v", err)
+ return
+ }
+ defer server.Stop()
+ auth := vflag.NewAuthorizerOrDie()
+ dispatcher, err := impl.NewDispatcher(root, defaultDepth, auth)
+ if err != nil {
+ vlog.Errorf("NewDispatcher(%v, %v, %v) failed: %v", root, defaultDepth, auth, err)
+ return
+ }
+ suffix := ""
+ if err := server.Register(suffix, dispatcher); err != nil {
+ vlog.Errorf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
+ return
+ }
+ endpoint, err := server.Listen(protocol, address)
+ if err != nil {
+ vlog.Errorf("Listen(%v, %v) failed: %v", protocol, address, err)
+ return
+ }
+ if err := server.Publish(name); err != nil {
+ vlog.Errorf("Publish(%v) failed: %v", name, err)
+ return
+ }
+ vlog.Infof("Binary repository published at %v/%v", endpoint, name)
+
+ // Wait until shutdown.
+ <-signals.ShutdownOnSignals()
+}
diff --git a/services/mgmt/binary/impl/dispatcher.go b/services/mgmt/binary/impl/dispatcher.go
new file mode 100644
index 0000000..de10a33
--- /dev/null
+++ b/services/mgmt/binary/impl/dispatcher.go
@@ -0,0 +1,56 @@
+package impl
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "veyron2/ipc"
+ "veyron2/security"
+ "veyron2/services/mgmt/repository"
+)
+
+const (
+ VersionFile = "VERSION"
+ Version = "1.0"
+)
+
+// dispatcher holds the state of the binary repository dispatcher.
+type dispatcher struct {
+ auth security.Authorizer
+ state *state
+}
+
+// newDispatcher is the dispatcher factory.
+func NewDispatcher(root string, depth int, authorizer security.Authorizer) (*dispatcher, error) {
+ if min, max := 0, md5.Size-1; min > depth || depth > max {
+ return nil, fmt.Errorf("Unexpected depth, expected a value between %v and %v, got %v", min, max, depth)
+ }
+ if _, err := os.Stat(root); err != nil {
+ return nil, fmt.Errorf("Stat(%v) failed: %v", root, err)
+ }
+ path := filepath.Join(root, VersionFile)
+ output, err := ioutil.ReadFile(path)
+ if err != nil {
+ return nil, fmt.Errorf("ReadFile(%v) failed: %v", path, err)
+ }
+ if expected, got := Version, string(output); expected != got {
+ return nil, fmt.Errorf("Unexpected version: expected %v, got %v", expected, got)
+ }
+ return &dispatcher{
+ auth: authorizer,
+ state: &state{
+ depth: depth,
+ root: root,
+ },
+ }, nil
+}
+
+// DISPATCHER INTERFACE IMPLEMENTATION
+
+func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+ invoker := ipc.ReflectInvoker(repository.NewServerBinary(newInvoker(d.state, suffix)))
+ return invoker, d.auth, nil
+}
diff --git a/services/mgmt/binary/impl/impl_test.go b/services/mgmt/binary/impl/impl_test.go
new file mode 100644
index 0000000..b547171
--- /dev/null
+++ b/services/mgmt/binary/impl/impl_test.go
@@ -0,0 +1,371 @@
+package impl
+
+import (
+ "bytes"
+ "crypto/md5"
+ "encoding/hex"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "strconv"
+ "testing"
+ "time"
+
+ "veyron2/naming"
+ "veyron2/rt"
+ "veyron2/services/mgmt/repository"
+ "veyron2/vlog"
+)
+
+const (
+ seedEnv = "VEYRON_RNG_SEED"
+ veyronPrefix = "veyron_binary_repository"
+)
+
+var (
+ random []byte
+ rnd *rand.Rand
+)
+
+func init() {
+ rt.Init()
+ // Initialize pseudo-random number generator.
+ seed := time.Now().UnixNano()
+ seedString := os.Getenv(seedEnv)
+ if seedString != "" {
+ var err error
+ base, bitSize := 0, 64
+ seed, err = strconv.ParseInt(seedString, 0, 64)
+ if err != nil {
+ vlog.Fatalf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err)
+ }
+ }
+ vlog.VI(0).Infof("Using pseudo-random number generator seed = %v", seed)
+ rnd = rand.New(rand.NewSource(seed))
+}
+
+// invokeUpload invokes the Upload RPC using the given client binary
+// <binary> and streams the given binary <binary> to it.
+func invokeUpload(t *testing.T, binary repository.Binary, data []byte, part int32) error {
+ stream, err := binary.Upload(rt.R().NewContext(), part)
+ if err != nil {
+ t.Errorf("Upload() failed: %v", err)
+ return err
+ }
+ if err := stream.Send(data); err != nil {
+ if err := stream.Finish(); err != nil {
+ t.Logf("Finish() failed: %v", err)
+ }
+ t.Logf("Send() failed: %v", err)
+ return err
+ }
+ if err := stream.CloseSend(); err != nil {
+ if err := stream.Finish(); err != nil {
+ t.Logf("Finish() failed: %v", err)
+ }
+ t.Logf("CloseSend() failed: %v", err)
+ return err
+ }
+ if err := stream.Finish(); err != nil {
+ t.Logf("Finish() failed: %v", err)
+ return err
+ }
+ return nil
+}
+
+// invokeDownload invokes the Download RPC using the given client binary
+// <binary> and streams binary from to it.
+func invokeDownload(t *testing.T, binary repository.Binary, part int32) ([]byte, error) {
+ stream, err := binary.Download(rt.R().NewContext(), part)
+ if err != nil {
+ t.Errorf("Download() failed: %v", err)
+ return nil, err
+ }
+ output := make([]byte, 0)
+ for {
+ bytes, err := stream.Recv()
+ if err != nil && err != io.EOF {
+ if err := stream.Finish(); err != nil {
+ t.Logf("Finish() failed: %v", err)
+ }
+ t.Logf("Recv() failed: %v", err)
+ return nil, err
+ }
+ if err == io.EOF {
+ break
+ }
+ output = append(output, bytes...)
+ }
+ if err := stream.Finish(); err != nil {
+ t.Logf("Finish() failed: %v", err)
+ return nil, err
+ }
+ return output, nil
+}
+
+func generateBits(size int) []byte {
+ buffer := make([]byte, size)
+ offset := 0
+ for {
+ bits := int64(rnd.Int63())
+ for i := 0; i < 8; i++ {
+ buffer[offset] = byte(bits & 0xff)
+ size--
+ if size == 0 {
+ return buffer
+ }
+ offset++
+ bits >>= 8
+ }
+ }
+}
+
+func randomBytes(size int) []byte {
+ buffer := make([]byte, size)
+ // Generate a 4MB of random bytes since that is a value commonly
+ // used in this test.
+ if len(random) == 0 {
+ random = generateBits(4 << 20)
+ }
+ if size > len(random) {
+ extra := generateBits(size - len(random))
+ random = append(random, extra...)
+ }
+ start := rnd.Intn(len(random) - size + 1)
+ copy(buffer, random[start:start+size])
+ return buffer
+}
+
+// startServer starts the binary repository server.
+func startServer(t *testing.T, depth int) (repository.Binary, func()) {
+ // Setup the root of the binary repository.
+ root, err := ioutil.TempDir("", veyronPrefix)
+ if err != nil {
+ t.Fatalf("TempDir() failed: %v", err)
+ }
+ path, perm := filepath.Join(root, VersionFile), os.FileMode(0600)
+ if err := ioutil.WriteFile(path, []byte(Version), perm); err != nil {
+ vlog.Fatalf("WriteFile(%v, %v, %v) failed: %v", path, Version, perm, err)
+ }
+ // Setup and start the binary repository server.
+ server, err := rt.R().NewServer()
+ if err != nil {
+ t.Fatalf("NewServer() failed: %v", err)
+ }
+ dispatcher, err := NewDispatcher(root, depth, nil)
+ if err != nil {
+ t.Fatalf("NewDispatcher(%v, %v, %v) failed: %v", root, depth, nil, err)
+ }
+ suffix := ""
+ if err := server.Register(suffix, dispatcher); err != nil {
+ t.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
+ }
+ protocol, hostname := "tcp", "localhost:0"
+ endpoint, err := server.Listen(protocol, hostname)
+ if err != nil {
+ t.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
+ }
+ name := naming.JoinAddressName(endpoint.String(), "//test")
+ binary, err := repository.BindBinary(name)
+ if err != nil {
+ t.Fatalf("BindRepository() failed: %v", err)
+ }
+ return binary, func() {
+ // Shutdown the binary repository server.
+ if err := server.Stop(); err != nil {
+ t.Fatalf("Stop() failed: %v", err)
+ }
+ if err := os.Remove(path); err != nil {
+ t.Fatalf("Remove(%v) failed: %v", path, err)
+ }
+ // Check that any directories and files that were created to
+ // represent the binary objects have been garbage collected.
+ if err := os.Remove(root); err != nil {
+ t.Fatalf("Remove(%v) failed: %v", root, err)
+ }
+ }
+}
+
+// TestHierarchy checks that the binary repository works correctly for
+// all possible valid values of the depth used for the directory
+// hierarchy that stores binary objects in the local file system.
+func TestHierarchy(t *testing.T) {
+ for i := 0; i < md5.Size; i++ {
+ binary, cleanup := startServer(t, i)
+ defer cleanup()
+ // Create up to 4MB of random bytes.
+ size := rnd.Intn(1000 * bufferLength)
+ data := randomBytes(size)
+ // Test the binary repository interface.
+ if err := binary.Create(rt.R().NewContext(), 1); err != nil {
+ t.Fatalf("Create() failed: %v", err)
+ }
+ if err := invokeUpload(t, binary, data, 0); err != nil {
+ t.FailNow()
+ }
+ parts, err := binary.Stat(rt.R().NewContext())
+ if err != nil {
+ t.Fatalf("Stat() failed: %v", err)
+ }
+ h := md5.New()
+ h.Write(data)
+ checksum := hex.EncodeToString(h.Sum(nil))
+ if expected, got := checksum, parts[0].Checksum; expected != got {
+ t.Fatalf("Unexpected checksum: expected %v, got %v", expected, got)
+ }
+ if expected, got := len(data), int(parts[0].Size); expected != got {
+ t.Fatalf("Unexpected size: expected %v, got %v", expected, got)
+ }
+ output, err := invokeDownload(t, binary, 0)
+ if err != nil {
+ t.FailNow()
+ }
+ if bytes.Compare(output, data) != 0 {
+ t.Fatalf("Unexpected output: expected %v, got %v", data, output)
+ }
+ if err := binary.Delete(rt.R().NewContext()); err != nil {
+ t.Fatalf("Delete() failed: %v", err)
+ }
+ }
+}
+
+// TestMultiPart checks that the binary repository supports multi-part
+// uploads and downloads ranging the number of parts the test binary
+// consists of.
+func TestMultiPart(t *testing.T) {
+ for length := 2; length < 5; length++ {
+ binary, cleanup := startServer(t, 2)
+ defer cleanup()
+ // Create <length> chunks of up to 4MB of random bytes.
+ data := make([][]byte, length)
+ for i := 0; i < length; i++ {
+ size := rnd.Intn(1000 * bufferLength)
+ data[i] = randomBytes(size)
+ }
+ // Test the binary repository interface.
+ if err := binary.Create(rt.R().NewContext(), int32(length)); err != nil {
+ t.Fatalf("Create() failed: %v", err)
+ }
+ for i := 0; i < length; i++ {
+ if err := invokeUpload(t, binary, data[i], int32(i)); err != nil {
+ t.FailNow()
+ }
+ }
+ parts, err := binary.Stat(rt.R().NewContext())
+ if err != nil {
+ t.Fatalf("Stat() failed: %v", err)
+ }
+ h := md5.New()
+ for i := 0; i < length; i++ {
+ hpart := md5.New()
+ output, err := invokeDownload(t, binary, int32(i))
+ if err != nil {
+ t.FailNow()
+ }
+ if bytes.Compare(output, data[i]) != 0 {
+ t.Fatalf("Unexpected output: expected %v, got %v", data[i], output)
+ }
+ h.Write(data[i])
+ hpart.Write(data[i])
+ checksum := hex.EncodeToString(hpart.Sum(nil))
+ if expected, got := checksum, parts[i].Checksum; expected != got {
+ t.Fatalf("Unexpected checksum: expected %v, got %v", expected, got)
+ }
+ if expected, got := len(data[i]), int(parts[i].Size); expected != got {
+ t.Fatalf("Unexpected size: expected %v, got %v", expected, got)
+ }
+ }
+ if err := binary.Delete(rt.R().NewContext()); err != nil {
+ t.Fatalf("Delete() failed: %v", err)
+ }
+ }
+}
+
+// TestResumption checks that the binary interface supports upload
+// resumption ranging the number of parts the uploaded binary consists
+// of.
+func TestResumption(t *testing.T) {
+ for length := 2; length < 5; length++ {
+ binary, cleanup := startServer(t, 2)
+ defer cleanup()
+ // Create <length> chunks of up to 4MB of random bytes.
+ data := make([][]byte, length)
+ for i := 0; i < length; i++ {
+ size := rnd.Intn(1000 * bufferLength)
+ data[i] = randomBytes(size)
+ }
+ if err := binary.Create(rt.R().NewContext(), int32(length)); err != nil {
+ t.Fatalf("Create() failed: %v", err)
+ }
+ // Simulate a flaky upload client that keeps uploading parts until
+ // finished.
+ for {
+ parts, err := binary.Stat(rt.R().NewContext())
+ if err != nil {
+ t.Fatalf("Stat() failed: %v", err)
+ }
+ finished := true
+ for _, part := range parts {
+ finished = finished && (part != MissingPart)
+ }
+ if finished {
+ break
+ }
+ for i := 0; i < length; i++ {
+ fail := rnd.Intn(2)
+ if parts[i] == MissingPart && fail != 0 {
+ if err := invokeUpload(t, binary, data[i], int32(i)); err != nil {
+ t.FailNow()
+ }
+ }
+ }
+ }
+ if err := binary.Delete(rt.R().NewContext()); err != nil {
+ t.Fatalf("Delete() failed: %v", err)
+ }
+ }
+}
+
+// TestErrors checks that the binary interface correctly reports errors.
+func TestErrors(t *testing.T) {
+ binary, cleanup := startServer(t, 2)
+ defer cleanup()
+ length := 2
+ data := make([][]byte, length)
+ for i := 0; i < length; i++ {
+ size := rnd.Intn(1000 * bufferLength)
+ data[i] = make([]byte, size)
+ for j := 0; j < size; j++ {
+ data[i][j] = byte(rnd.Int())
+ }
+ }
+ if err := binary.Create(rt.R().NewContext(), int32(length)); err != nil {
+ t.Fatalf("Create() failed: %v", err)
+ }
+ if err := binary.Create(rt.R().NewContext(), int32(length)); err == nil {
+ t.Fatalf("Create() did not fail when it should have")
+ }
+ if err := invokeUpload(t, binary, data[0], 0); err != nil {
+ t.Fatalf("Upload() failed: %v", err)
+ }
+ if err := invokeUpload(t, binary, data[0], 0); err == nil {
+ t.Fatalf("Upload() did not fail when it should have")
+ }
+ if _, err := invokeDownload(t, binary, 1); err == nil {
+ t.Fatalf("Download() did not fail when it should have")
+ }
+ if err := invokeUpload(t, binary, data[1], 1); err != nil {
+ t.Fatalf("Upload() failed: %v", err)
+ }
+ if _, err := invokeDownload(t, binary, 0); err != nil {
+ t.Fatalf("Download() failed: %v", err)
+ }
+ if err := binary.Delete(rt.R().NewContext()); err != nil {
+ t.Fatalf("Delete() failed: %v", err)
+ }
+ if err := binary.Delete(rt.R().NewContext()); err == nil {
+ t.Fatalf("Delete() did not fail when it should have")
+ }
+}
diff --git a/services/mgmt/binary/impl/invoker.go b/services/mgmt/binary/impl/invoker.go
new file mode 100644
index 0000000..05c91ec
--- /dev/null
+++ b/services/mgmt/binary/impl/invoker.go
@@ -0,0 +1,481 @@
+// The implementation of the binary repository interface stores
+// objects identified by veyron name suffixes using the local file
+// system. Given a veyron name suffix, the implementation computes an
+// MD5 hash of the suffix and generates the following path in the
+// local filesystem: /<root>/<dir_1>/.../<dir_n>/<hash>. The root and
+// the directory depth are parameters of the implementation. The
+// contents of the directory include the checksum and data for the
+// object and each of its individual parts:
+//
+// checksum
+// data
+// <part_1>/checksum
+// <part_1>/data
+// ...
+// <part_n>/checksum
+// <part_n>/data
+//
+// TODO(jsimsa): Add an "fsck" method that cleans up existing on-disk
+// repository and provide a command-line flag that identifies whether
+// fsck should run when new repository server process starts up.
+package impl
+
+import (
+ "crypto/md5"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strconv"
+ "syscall"
+
+ "veyron2/ipc"
+ "veyron2/services/mgmt/binary"
+ "veyron2/services/mgmt/repository"
+ "veyron2/vlog"
+)
+
+// state holds the state shared across different binary repository
+// invocations.
+type state struct {
+ // depth determines the depth of the directory hierarchy that the
+ // binary repository uses to organize binaries in the local file
+ // system. There is a trade-off here: smaller values lead to faster
+ // access, while higher values allow the performance to scale to
+ // larger collections of binaries. The number should be a value
+ // between 0 and (md5.Size - 1).
+ //
+ // Note that the cardinality of each level (except the leaf level)
+ // is at most 256. If you expect to have X total binary items, and
+ // you want to limit directories to at most Y entries (because of
+ // filesystem limitations), then you should set depth to at least
+ // log_256(X/Y). For example, using hierarchyDepth = 3 with a local
+ // filesystem that can handle up to 1,000 entries per directory
+ // before its performance degrades allows the binary repository to
+ // store 16B objects.
+ depth int
+ // root identifies the local filesystem directory in which the
+ // binary repository stores its objects.
+ root string
+}
+
+// invoker holds the state of a binary repository invocation.
+type invoker struct {
+ // path is the local filesystem path to the object identified by the
+ // veyron name suffix.
+ path string
+ // state holds the state shared across different binary repository
+ // invocations.
+ state *state
+ // suffix is the suffix of the current invocation that is assumed to
+ // be used as a relative veyron name to identify a binary.
+ suffix string
+}
+
+const (
+ checksum = "checksum"
+ data = "data"
+ lock = "lock"
+)
+
+var (
+ errExist = errors.New("binary already exists")
+ errNotExist = errors.New("binary does not exist")
+ errInProgress = errors.New("identical upload already in progress")
+ errInvalidParts = errors.New("invalid number of parts")
+ errOperationFailed = errors.New("operation failed")
+)
+
+// TODO(jsimsa): When VDL supports composite literal constants, remove
+// this definition.
+var MissingPart = binary.PartInfo{
+ Checksum: binary.MissingChecksum,
+ Size: binary.MissingSize,
+}
+
+// newInvoker is the invoker factory.
+func newInvoker(state *state, suffix string) *invoker {
+ // Generate the local filesystem path for the object identified by
+ // the veyron name suffix.
+ h := md5.New()
+ h.Write([]byte(suffix))
+ hash := hex.EncodeToString(h.Sum(nil))
+ dir := ""
+ for j := 0; j < state.depth; j++ {
+ dir = filepath.Join(dir, hash[j*2:(j+1)*2])
+ }
+ path := filepath.Join(state.root, dir, hash)
+ return &invoker{
+ path: path,
+ state: state,
+ suffix: suffix,
+ }
+}
+
+// BINARY INTERFACE IMPLEMENTATION
+
+const bufferLength = 4096
+
+// consolidate checks if all parts of a binary have been uploaded and
+// if so, creates the aggregate binary and checksum.
+func (i *invoker) consolidate() error {
+ // Coordinate concurrent uploaders to make sure the aggregate binary
+ // and checksum is created only once.
+ err := i.checksumExists(i.path)
+ switch err {
+ case nil:
+ return nil
+ case errNotExist:
+ default:
+ return err
+ }
+ // Check if all parts have been uploaded.
+ parts, err := i.getParts()
+ if err != nil {
+ return err
+ }
+ for _, part := range parts {
+ err := i.checksumExists(part)
+ switch err {
+ case errNotExist:
+ return nil
+ case nil:
+ default:
+ return err
+ }
+ }
+ // Create the aggregate binary and its checksum.
+ suffix := ""
+ output, err := ioutil.TempFile(i.path, suffix)
+ if err != nil {
+ vlog.Errorf("TempFile(%v, %v) failed: %v", i.path, suffix, err)
+ return errOperationFailed
+ }
+ defer output.Close()
+ buffer, h := make([]byte, bufferLength), md5.New()
+ for _, part := range parts {
+ input, err := os.Open(filepath.Join(part, data))
+ if err != nil {
+ vlog.Errorf("Open(%v) failed: %v", filepath.Join(part, data), err)
+ if err := os.Remove(output.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
+ }
+ return errOperationFailed
+ }
+ defer input.Close()
+ for {
+ n, err := input.Read(buffer)
+ if err != nil && err != io.EOF {
+ vlog.Errorf("Read() failed: %v", err)
+ if err := os.Remove(output.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
+ }
+ return errOperationFailed
+ }
+ if n == 0 {
+ break
+ }
+ if _, err := output.Write(buffer[:n]); err != nil {
+ vlog.Errorf("Write() failed: %v", err)
+ if err := os.Remove(output.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
+ }
+ return errOperationFailed
+ }
+ h.Write(buffer[:n])
+ }
+ }
+ hash := hex.EncodeToString(h.Sum(nil))
+ checksumFile, perm := filepath.Join(i.path, checksum), os.FileMode(0600)
+ if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
+ vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
+ if err := os.Remove(output.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
+ }
+ return errOperationFailed
+ }
+ dataFile := filepath.Join(i.path, data)
+ if err := os.Rename(output.Name(), dataFile); err != nil {
+ vlog.Errorf("Rename(%v, %v) failed: %v", output.Name(), dataFile, err)
+ if err := os.Remove(output.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
+ }
+ return errOperationFailed
+ }
+ return nil
+}
+
+// checksumExists checks whether the given path contains a
+// checksum. The implementation uses the existence of checksum to
+// determine whether the binary (part) identified by the given path
+// exists.
+func (i *invoker) checksumExists(path string) error {
+ checksumFile := filepath.Join(path, checksum)
+ _, err := os.Stat(checksumFile)
+ switch {
+ case os.IsNotExist(err):
+ return errNotExist
+ case err != nil:
+ vlog.Errorf("Stat(%v) failed: %v", path, err)
+ return errOperationFailed
+ default:
+ return nil
+ }
+}
+
+// generatePartPath generates a path for the given binary part.
+func (i *invoker) generatePartPath(part int) string {
+ return filepath.Join(i.path, fmt.Sprintf("%d", part))
+}
+
+// getParts returns a collection of paths to the parts of the binary.
+func (i *invoker) getParts() ([]string, error) {
+ infos, err := ioutil.ReadDir(i.path)
+ if err != nil {
+ vlog.Errorf("ReadDir(%v) failed: %v", i.path, err)
+ return []string{}, errOperationFailed
+ }
+ n := 0
+ result := make([]string, len(infos))
+ for _, info := range infos {
+ if info.IsDir() {
+ idx, err := strconv.Atoi(info.Name())
+ if err != nil {
+ vlog.Errorf("Atoi(%v) failed: %v", info.Name(), err)
+ return []string{}, errOperationFailed
+ }
+ result[idx] = filepath.Join(i.path, info.Name())
+ n++
+ }
+ }
+ result = result[:n]
+ return result, nil
+}
+
+func (i *invoker) Create(_ ipc.ServerContext, nparts int32) error {
+ vlog.Infof("%v.Create(%v)", i.suffix, nparts)
+ if nparts < 1 {
+ return errInvalidParts
+ }
+ parent, perm := filepath.Dir(i.path), os.FileMode(0700)
+ if err := os.MkdirAll(parent, perm); err != nil {
+ vlog.Errorf("MkdirAll(%v, %v) failed: %v", parent, perm, err)
+ return errOperationFailed
+ }
+ prefix := "creating-"
+ tmpDir, err := ioutil.TempDir(parent, prefix)
+ if err != nil {
+ vlog.Errorf("TempDir(%v, %v) failed: %v", parent, prefix, err)
+ return errOperationFailed
+ }
+ for j := 0; j < int(nparts); j++ {
+ partPath, partPerm := filepath.Join(tmpDir, fmt.Sprintf("%d", j)), os.FileMode(0700)
+ if err := os.MkdirAll(partPath, partPerm); err != nil {
+ vlog.Errorf("MkdirAll(%v, %v) failed: %v", partPath, partPerm, err)
+ if err := os.RemoveAll(tmpDir); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
+ }
+ return errOperationFailed
+ }
+ }
+ // Use os.Rename() to atomically create the binary directory
+ // structure.
+ if err := os.Rename(tmpDir, i.path); err != nil {
+ defer func() {
+ if err := os.RemoveAll(tmpDir); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
+ }
+ }()
+ if os.IsExist(err) {
+ return errExist
+ }
+ vlog.Errorf("Rename(%v, %v) failed: %v", tmpDir, i.path, err)
+ return errOperationFailed
+ }
+ return nil
+}
+
+func (i *invoker) Delete(context ipc.ServerContext) error {
+ vlog.Infof("%v.Delete()", i.suffix)
+ if _, err := os.Stat(i.path); err != nil {
+ if os.IsNotExist(err) {
+ return errNotExist
+ }
+ vlog.Errorf("Stat(%v) failed: %v", i.path, err)
+ return errOperationFailed
+ }
+ // Use os.Rename() to atomically remove the binary directory
+ // structure.
+ path := filepath.Join(filepath.Dir(i.path), "removing-"+filepath.Base(i.path))
+ vlog.Infof("from %v to %v", i.path, path)
+ if err := os.Rename(i.path, path); err != nil {
+ vlog.Errorf("Rename(%v, %v) failed: %v", i.path, path, err)
+ return errOperationFailed
+ }
+ if err := os.RemoveAll(path); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", path, err)
+ return errOperationFailed
+ }
+ for {
+ // Remove the binary and all directories on the path back to the
+ // root that are left empty after the binary is removed.
+ path = filepath.Dir(path)
+ if i.state.root == path {
+ break
+ }
+ if err := os.Remove(path); err != nil {
+ if err.(*os.PathError).Err.Error() == syscall.ENOTEMPTY.Error() {
+ break
+ }
+ vlog.Errorf("Remove(%v) failed: %v", path, err)
+ return errOperationFailed
+ }
+ }
+ return nil
+}
+
+func (i *invoker) Download(context ipc.ServerContext, part int32, stream repository.BinaryServiceDownloadStream) error {
+ vlog.Infof("%v.Download(%v)", i.suffix, part)
+ path := i.generatePartPath(int(part))
+ if err := i.checksumExists(path); err != nil {
+ return err
+ }
+ dataPath := filepath.Join(path, data)
+ file, err := os.Open(dataPath)
+ if err != nil {
+ vlog.Errorf("Open(%v) failed: %v", path, err)
+ return errOperationFailed
+ }
+ defer file.Close()
+ buffer := make([]byte, bufferLength)
+ for {
+ n, err := file.Read(buffer)
+ if err != nil && err != io.EOF {
+ vlog.Errorf("Read() failed: %v", err)
+ return errOperationFailed
+ }
+ if n == 0 {
+ break
+ }
+ if err := stream.Send(buffer[:n]); err != nil {
+ vlog.Errorf("Send() failed: %v", err)
+ return errOperationFailed
+ }
+ }
+ return nil
+}
+
+func (i *invoker) DownloadURL(ipc.ServerContext) (string, int64, error) {
+ vlog.Infof("%v.DownloadURL()", i.suffix)
+ // TODO(jsimsa): Implement.
+ return "", 0, nil
+}
+
+func (i *invoker) Stat(ipc.ServerContext) ([]binary.PartInfo, error) {
+ vlog.Infof("%v.Stat()", i.suffix)
+ result := make([]binary.PartInfo, 0)
+ parts, err := i.getParts()
+ if err != nil {
+ return []binary.PartInfo{}, err
+ }
+ for _, part := range parts {
+ checksumFile := filepath.Join(part, checksum)
+ bytes, err := ioutil.ReadFile(checksumFile)
+ if err != nil {
+ if os.IsNotExist(err) {
+ result = append(result, MissingPart)
+ continue
+ }
+ vlog.Errorf("ReadFile(%v) failed: %v", checksumFile, err)
+ return []binary.PartInfo{}, errOperationFailed
+ }
+ dataFile := filepath.Join(part, data)
+ fi, err := os.Stat(dataFile)
+ if err != nil {
+ if os.IsNotExist(err) {
+ result = append(result, MissingPart)
+ continue
+ }
+ vlog.Errorf("Stat(%v) failed: %v", dataFile, err)
+ return []binary.PartInfo{}, errOperationFailed
+ }
+ result = append(result, binary.PartInfo{Checksum: string(bytes), Size: fi.Size()})
+ }
+ return result, nil
+}
+
+func (i *invoker) Upload(context ipc.ServerContext, part int32, stream repository.BinaryServiceUploadStream) error {
+ vlog.Infof("%v.Upload(%v)", i.suffix, part)
+ path, suffix := i.generatePartPath(int(part)), ""
+ err := i.checksumExists(path)
+ switch err {
+ case nil:
+ return errExist
+ case errNotExist:
+ default:
+ return err
+ }
+ // Use os.OpenFile() to resolve races.
+ lockPath, flags, perm := filepath.Join(path, lock), os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(0600)
+ lockFile, err := os.OpenFile(lockPath, flags, perm)
+ if err != nil {
+ if os.IsExist(err) {
+ return errInProgress
+ }
+ vlog.Errorf("OpenFile(%v, %v, %v) failed: %v", lockPath, flags, suffix, err)
+ return errOperationFailed
+ }
+ defer os.Remove(lockFile.Name())
+ defer lockFile.Close()
+ file, err := ioutil.TempFile(path, suffix)
+ if err != nil {
+ vlog.Errorf("TempFile(%v, %v) failed: %v", path, suffix, err)
+ return errOperationFailed
+ }
+ defer file.Close()
+ h := md5.New()
+ for {
+ bytes, err := stream.Recv()
+ if err != nil && err != io.EOF {
+ vlog.Errorf("Recv() failed: %v", err)
+ if err := os.Remove(file.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
+ }
+ return errOperationFailed
+ }
+ if err == io.EOF {
+ break
+ }
+ if _, err := file.Write(bytes); err != nil {
+ vlog.Errorf("Write() failed: %v", err)
+ if err := os.Remove(file.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
+ }
+ return errOperationFailed
+ }
+ h.Write(bytes)
+ }
+ hash := hex.EncodeToString(h.Sum(nil))
+ checksumFile, perm := filepath.Join(path, checksum), os.FileMode(0600)
+ if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
+ vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
+ if err := os.Remove(file.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
+ }
+ return errOperationFailed
+ }
+ vlog.Infof("%v", checksumFile)
+ dataFile := filepath.Join(path, data)
+ if err := os.Rename(file.Name(), dataFile); err != nil {
+ vlog.Errorf("Rename(%v, %v) failed: %v", file.Name(), dataFile, err)
+ if err := os.Remove(file.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
+ }
+ return errOperationFailed
+ }
+ vlog.Infof("%v", dataFile)
+ return nil
+}
diff --git a/services/mgmt/content/contentd/main.go b/services/mgmt/content/contentd/main.go
deleted file mode 100644
index d176ae8..0000000
--- a/services/mgmt/content/contentd/main.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package main
-
-import (
- "flag"
- "io/ioutil"
- "os"
-
- "veyron/lib/signals"
- vflag "veyron/security/flag"
-
- "veyron/services/mgmt/content/impl"
-
- "veyron2/rt"
- "veyron2/vlog"
-)
-
-const (
- defaultDepth = 3
- defaultRootPrefix = "veyron_content_manager"
-)
-
-func main() {
- var address, protocol, name, root string
- // TODO(rthellend): Remove the address and protocol flags when the config manager is working.
- flag.StringVar(&address, "address", "localhost:0", "network address to listen on")
- flag.StringVar(&name, "name", "", "name to mount the content manager as")
- flag.StringVar(&protocol, "protocol", "tcp", "network type to listen on")
- flag.StringVar(&root, "root", "", "root directory for the content server")
- flag.Parse()
- if root == "" {
- var err error
- if root, err = ioutil.TempDir("", defaultRootPrefix); err != nil {
- vlog.Errorf("TempDir() failed: %v\n", err)
- return
- }
- } else {
- if _, err := os.Stat(root); os.IsNotExist(err) {
- if err := os.MkdirAll(root, 0700); err != nil {
- vlog.Errorf("Directory %v does not exist and cannot be created.", root)
- return
- }
- }
- }
- vlog.VI(0).Infof("Content manager mounted at %v", root)
- runtime := rt.Init()
- defer runtime.Shutdown()
- server, err := runtime.NewServer()
- if err != nil {
- vlog.Errorf("NewServer() failed: %v", err)
- return
- }
- defer server.Stop()
-
- dispatcher := impl.NewDispatcher(root, defaultDepth, vflag.NewAuthorizerOrDie())
- suffix := ""
- if err := server.Register(suffix, dispatcher); err != nil {
- vlog.Errorf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
- return
- }
- endpoint, err := server.Listen(protocol, address)
- if err != nil {
- vlog.Errorf("Listen(%v, %v) failed: %v", protocol, address, err)
- return
- }
- if err := server.Publish(name); err != nil {
- vlog.Errorf("Publish(%v) failed: %v", name, err)
- return
- }
- vlog.VI(0).Infof("Content manager published at %v/%v", endpoint, name)
-
- // Wait until shutdown.
- <-signals.ShutdownOnSignals()
-}
diff --git a/services/mgmt/content/impl/dispatcher.go b/services/mgmt/content/impl/dispatcher.go
deleted file mode 100644
index 065c101..0000000
--- a/services/mgmt/content/impl/dispatcher.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package impl
-
-import (
- "sync"
-
- "veyron2/ipc"
- "veyron2/security"
- "veyron2/services/mgmt/repository"
-)
-
-// dispatcher holds the state of the content repository dispatcher.
-type dispatcher struct {
- root string
- depth int
- fs sync.Mutex
- auth security.Authorizer
-}
-
-// newDispatcher is the dispatcher factory.
-func NewDispatcher(root string, depth int, authorizer security.Authorizer) *dispatcher {
- return &dispatcher{root: root, auth: authorizer}
-}
-
-// DISPATCHER INTERFACE IMPLEMENTATION
-
-func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerContent(newInvoker(d.root, d.depth, &d.fs, suffix)))
- return invoker, d.auth, nil
-}
diff --git a/services/mgmt/content/impl/impl_test.go b/services/mgmt/content/impl/impl_test.go
deleted file mode 100644
index 4d04924..0000000
--- a/services/mgmt/content/impl/impl_test.go
+++ /dev/null
@@ -1,151 +0,0 @@
-package impl
-
-import (
- "bytes"
- "crypto/md5"
- "encoding/hex"
- "io/ioutil"
- "os"
- "testing"
-
- "veyron2"
- "veyron2/naming"
- "veyron2/rt"
- "veyron2/services/mgmt/repository"
-)
-
-const (
- veyronPrefix = "veyron_content_server"
-)
-
-// quote is a content example used in this test.
-var (
- quote = []byte("Everything should be made as simple as possible, but not simpler.")
-)
-
-// invokeUpload invokes the Upload RPC using the given client stub
-// <stub> and streams the given content <content> to it.
-func invokeUpload(t *testing.T, stub repository.Content, content []byte) (string, error) {
- stream, err := stub.Upload(rt.R().NewContext())
- if err != nil {
- return "", err
- }
- if err := stream.Send(content); err != nil {
- stream.Finish()
- return "", err
- }
- if err := stream.CloseSend(); err != nil {
- stream.Finish()
- return "", err
- }
- name, err := stream.Finish()
- if err != nil {
- return "", err
- }
- return name, nil
-}
-
-// invokeDownload invokes the Download RPC using the given client stub
-// <stub> and streams content from to it.
-func invokeDownload(t *testing.T, stub repository.Content) ([]byte, error) {
- stream, err := stub.Download(rt.R().NewContext())
- if err != nil {
- return nil, err
- }
- defer stream.Finish()
- output, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- return output, nil
-}
-
-// invokeDelete invokes the Delete RPC using the given client stub
-// <stub>.
-func invokeDelete(t *testing.T, stub repository.Content) error {
- return stub.Delete(rt.R().NewContext())
-}
-
-// testInterface tests the content repository interface using the
-// given depth for hierarchy of content objects.
-func testInterface(t *testing.T, runtime veyron2.Runtime, depth int) {
- root, err := ioutil.TempDir("", veyronPrefix)
- if err != nil {
- t.Fatalf("TempDir() failed: %v", err)
- }
-
- // Setup and start the content repository server.
- server, err := runtime.NewServer()
- if err != nil {
- t.Fatalf("NewServer() failed: %v", err)
- }
- dispatcher := NewDispatcher(root, depth, nil)
- suffix := ""
- if err := server.Register(suffix, dispatcher); err != nil {
- t.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
- }
- protocol, hostname := "tcp", "localhost:0"
- endpoint, err := server.Listen(protocol, hostname)
- if err != nil {
- t.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
- }
-
- // Create client stubs for talking to the server.
- name := naming.JoinAddressName(endpoint.String(), "//")
- stub, err := repository.BindContent(name)
- if err != nil {
- t.Fatalf("BindRepository() failed: %v", err)
- }
-
- // Upload
- contentName, err := invokeUpload(t, stub, quote)
- if err != nil {
- t.Fatalf("invokeUpload() failed: %v", err)
- }
- h := md5.New()
- h.Write(quote)
- checksum := hex.EncodeToString(h.Sum(nil))
- if contentName != checksum {
- t.Fatalf("Unexpected checksum: expected %v, got %v", checksum, contentName)
- }
-
- // Download
- stub, err = repository.BindContent(naming.Join(name, checksum))
- if err != nil {
- t.Fatalf("BindRepository() failed: %v", err)
- }
- output, err := invokeDownload(t, stub)
- if err != nil {
- t.Fatalf("invokedDownload() failed: %v", err)
- }
- if bytes.Compare(output, quote) != 0 {
- t.Fatalf("Unexpected output: expected %v, got %v", quote, output)
- }
-
- // Delete
- if err := invokeDelete(t, stub); err != nil {
- t.Fatalf("invokedDelete() failed: %v", err)
- }
-
- // Check that any directories and files that were created to
- // represent the content objects have been garbage collected.
- if err := os.Remove(root); err != nil {
- t.Fatalf("Remove() failed: %v", err)
- }
-
- // Shutdown the content repository server.
- if err := server.Stop(); err != nil {
- t.Fatalf("Stop() failed: %v", err)
- }
-}
-
-// TestHierarchy checks that the content repository works correctly for
-// all possible valid values of the depth used for the directory
-// hierarchy that stores content objects in the local file system.
-func TestHierarchy(t *testing.T) {
- runtime := rt.Init()
- defer runtime.Shutdown()
- for i := 0; i < md5.Size; i++ {
- testInterface(t, runtime, i)
- }
-}
diff --git a/services/mgmt/content/impl/invoker.go b/services/mgmt/content/impl/invoker.go
deleted file mode 100644
index 2f5509d..0000000
--- a/services/mgmt/content/impl/invoker.go
+++ /dev/null
@@ -1,194 +0,0 @@
-package impl
-
-import (
- "crypto/md5"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "path/filepath"
- "regexp"
- "sync"
- "syscall"
-
- "veyron2/ipc"
- "veyron2/services/mgmt/repository"
- "veyron2/vlog"
-)
-
-// invoker holds the state of a content repository invocation.
-type invoker struct {
- // root is the directory at which the content repository namespace is
- // mounted.
- root string
- // depth determines the depth of the directory hierarchy that the
- // content repository uses to organize the content in the local file
- // system. There is a trade-off here: smaller values lead to faster
- // access, while higher values allow the performance to scale to
- // large content collections. The number should be a value in
- // between 0 and (md5.Size - 1).
- //
- // Note that the cardinality of each level (except the leaf level)
- // is at most 256. If you expect to have X total content items, and
- // you want to limit directories to at most Y entries (because of
- // filesystem limitations), then you should set depth to at least
- // log_256(X/Y). For example, using hierarchyDepth = 3 with a local
- // filesystem that can handle up to 1,000 entries per directory
- // before its performance degrades allows the content repository to
- // store 16B objects.
- depth int
- // fs is a lock used to atomatically modify the contents of the
- // local filesystem used for storing the content.
- fs *sync.Mutex
- // suffix is the suffix of the current invocation that is assumed to
- // be used as a relative veyron name to identify content.
- suffix string
-}
-
-var (
- errContentExists = errors.New("content already exists")
- errMissingChecksum = errors.New("missing checksum")
- errInvalidChecksum = errors.New("invalid checksum")
- errInvalidSuffix = errors.New("invalid suffix")
- errOperationFailed = errors.New("operation failed")
-)
-
-// newInvoker is the invoker factory.
-func newInvoker(root string, depth int, fs *sync.Mutex, suffix string) *invoker {
- if min, max := 0, md5.Size-1; min > depth || depth > max {
- panic(fmt.Sprintf("Depth should be a value between %v and %v, got %v.", min, max, depth))
- }
- return &invoker{
- root: root,
- depth: depth,
- fs: fs,
- suffix: suffix,
- }
-}
-
-var (
- reg = regexp.MustCompile("[a-f0-9]+")
-)
-
-func isValid(suffix string) bool {
- return reg.Match([]byte(suffix))
-}
-
-// CONTENT INTERFACE IMPLEMENTATION
-
-const bufferLength = 1024
-
-func (i *invoker) generateDir(checksum string) string {
- dir := ""
- for j := 0; j < i.depth; j++ {
- dir = filepath.Join(dir, checksum[j*2:(j+1)*2])
- }
- return dir
-}
-
-func (i *invoker) Delete(context ipc.ServerContext) error {
- vlog.VI(0).Infof("%v.Delete()", i.suffix)
- if !isValid(i.suffix) {
- return errInvalidSuffix
- }
- filename := filepath.Join(i.root, i.generateDir(i.suffix), i.suffix)
- i.fs.Lock()
- defer i.fs.Unlock()
- // Remove the content and all directories on the path back to the
- // root that are left empty after the content is removed.
- if err := os.Remove(filename); err != nil {
- return errOperationFailed
- }
- for {
- filename = filepath.Dir(filename)
- if i.root == filename {
- break
- }
- if err := os.Remove(filename); err != nil {
- if err.(*os.PathError).Err.Error() == syscall.ENOTEMPTY.Error() {
- break
- }
- return errOperationFailed
- }
- }
- return nil
-}
-
-func (i *invoker) Download(context ipc.ServerContext, stream repository.ContentServiceDownloadStream) error {
- vlog.VI(0).Infof("%v.Download()", i.suffix)
- if !isValid(i.suffix) {
- return errInvalidSuffix
- }
- h := md5.New()
- file, err := os.Open(filepath.Join(i.root, i.generateDir(i.suffix), i.suffix))
- if err != nil {
- return errOperationFailed
- }
- defer file.Close()
- buffer := make([]byte, bufferLength)
- for {
- n, err := file.Read(buffer)
- if err != nil && err != io.EOF {
- return errOperationFailed
- }
- if n == 0 {
- break
- }
- if err := stream.Send(buffer[:n]); err != nil {
- return errOperationFailed
- }
- h.Write(buffer[:n])
- }
- if hex.EncodeToString(h.Sum(nil)) != i.suffix {
- return errInvalidChecksum
- }
- return nil
-}
-
-func (i *invoker) Upload(context ipc.ServerContext, stream repository.ContentServiceUploadStream) (string, error) {
- vlog.VI(0).Infof("%v.Upload()", i.suffix)
- if i.suffix != "" {
- return "", errInvalidSuffix
- }
- h := md5.New()
- file, err := ioutil.TempFile(i.root, "")
- if err != nil {
- return "", errOperationFailed
- }
- defer file.Close()
- for {
- bytes, err := stream.Recv()
- if err != nil && err != io.EOF {
- return "", errOperationFailed
- }
- if err == io.EOF {
- break
- }
- if _, err := file.Write(bytes); err != nil {
- return "", errOperationFailed
- }
- h.Write(bytes)
- }
- checksum := hex.EncodeToString(h.Sum(nil))
- dir := filepath.Join(i.root, i.generateDir(checksum))
- i.fs.Lock()
- defer i.fs.Unlock()
- if err := os.MkdirAll(dir, 0700); err != nil {
- return "", errOperationFailed
- }
- filename := filepath.Join(dir, checksum)
- if _, err := os.Stat(filename); err != nil && !os.IsNotExist(err) {
- os.Remove(file.Name())
- return "", errContentExists
- }
- // Note that this rename operation requires only a metadata update
- // (as opposed to copy and delete) because file.Name() and filename
- // share the same mount point.
- if err := os.Rename(file.Name(), filename); err != nil {
- os.Remove(file.Name())
- return "", errOperationFailed
- }
- return checksum, nil
-}
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index 57032a2..c1f18fd 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -26,6 +26,7 @@
"veyron2/naming"
"veyron2/rt"
"veyron2/services/mgmt/application"
+ "veyron2/services/mgmt/binary"
"veyron2/services/mgmt/repository"
"veyron2/vlog"
)
@@ -56,17 +57,22 @@
return *envelope, nil
}
-// crInvoker holds the state of a content repository invocation mock.
+// crInvoker holds the state of a binary repository invocation mock.
type crInvoker struct{}
-// CONTENT REPOSITORY INTERFACE IMPLEMENTATION
+// BINARY REPOSITORY INTERFACE IMPLEMENTATION
+
+func (*crInvoker) Create(ipc.ServerContext, int32) error {
+ vlog.VI(0).Infof("Create()")
+ return nil
+}
func (i *crInvoker) Delete(ipc.ServerContext) error {
vlog.VI(0).Infof("Delete()")
return nil
}
-func (i *crInvoker) Download(_ ipc.ServerContext, stream repository.ContentServiceDownloadStream) error {
+func (i *crInvoker) Download(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceDownloadStream) error {
vlog.VI(0).Infof("Download()")
file, err := os.Open(os.Args[0])
if err != nil {
@@ -93,9 +99,19 @@
}
}
-func (i *crInvoker) Upload(ipc.ServerContext, repository.ContentServiceUploadStream) (string, error) {
+func (*crInvoker) DownloadURL(ipc.ServerContext) (string, int64, error) {
+ vlog.VI(0).Infof("DownloadURL()")
+ return "", 0, nil
+}
+
+func (*crInvoker) Stat(ipc.ServerContext) ([]binary.PartInfo, error) {
+ vlog.VI(0).Infof("Stat()")
+ return make([]binary.PartInfo, 1), nil
+}
+
+func (i *crInvoker) Upload(ipc.ServerContext, int32, repository.BinaryServiceUploadStream) error {
vlog.VI(0).Infof("Upload()")
- return "", nil
+ return nil
}
func generateBinary(workspace string) string {
@@ -203,8 +219,8 @@
case "parent":
runtime := rt.Init()
defer runtime.Shutdown()
- // Set up a mock content repository, a mock application repository, and a node manager.
- _, crCleanup := startContentRepository()
+ // Set up a mock binary repository, a mock application repository, and a node manager.
+ _, crCleanup := startBinaryRepository()
defer crCleanup()
_, arCleanup := startApplicationRepository()
defer arCleanup()
@@ -266,12 +282,12 @@
}
}
-func startContentRepository() (string, func()) {
+func startBinaryRepository() (string, func()) {
server, err := rt.R().NewServer()
if err != nil {
vlog.Fatalf("NewServer() failed: %v", err)
}
- suffix, dispatcher := "", ipc.SoloDispatcher(repository.NewServerContent(&crInvoker{}), nil)
+ suffix, dispatcher := "", ipc.SoloDispatcher(repository.NewServerBinary(&crInvoker{}), nil)
if err := server.Register(suffix, dispatcher); err != nil {
vlog.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
}
@@ -280,7 +296,7 @@
if err != nil {
vlog.Fatalf("Listen(%v, %v) failed: %v", protocol, hostname, err)
}
- vlog.VI(1).Infof("Content repository running at endpoint: %s", endpoint)
+ vlog.VI(1).Infof("Binary repository running at endpoint: %s", endpoint)
name := "cr"
if err := server.Publish(name); err != nil {
vlog.Fatalf("Publish(%v) failed: %v", name, err)
@@ -366,13 +382,13 @@
// restarts itself using syscall.Exec(), effectively becoming the
// process described next.
//
-// 2) The "parent" branch sets up a mock application and content
+// 2) The "parent" branch sets up a mock application and binary
// repository and a node manager that is pointed to the mock
// application repository for updates. When all three services start,
// the TestUpdate() method is notified and it proceeds to invoke
// Update() on the node manager. This in turn results in the node
// manager downloading an application envelope from the mock
-// application repository and a binary from the mock content
+// application repository and a binary from the mock binary
// repository. These are identical to the application envelope of the
// "parent" node manager, except for the VEYRON_NM_TEST variable,
// which is set to "child". The Update() method then spawns the child
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index bcd0e5a..5a01403 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -329,14 +329,9 @@
// APPLICATION INTERFACE IMPLEMENTATION
func downloadBinary(workspace, binary string) error {
- stub, err := repository.BindContent(binary)
+ stub, err := repository.BindBinary(binary)
if err != nil {
- vlog.Errorf("BindContent(%q) failed: %v", binary, err)
- return errOperationFailed
- }
- stream, err := stub.Download(rt.R().NewContext())
- if err != nil {
- vlog.Errorf("Download() failed: %v", err)
+ vlog.Errorf("BindBinary(%q) failed: %v", binary, err)
return errOperationFailed
}
path := filepath.Join(workspace, "noded")
@@ -346,24 +341,39 @@
return errOperationFailed
}
defer file.Close()
- for {
- bytes, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- vlog.Errorf("Recv() failed: %v", err)
- return errOperationFailed
- }
- if _, err := file.Write(bytes); err != nil {
- vlog.Errorf("Write() failed: %v", err)
- return errOperationFailed
- }
- }
- if err := stream.Finish(); err != nil {
- vlog.Errorf("Finish() failed: %v", err)
+ parts, err := stub.Stat(rt.R().NewContext())
+ if err != nil {
+ vlog.Errorf("Stat() failed: %v", err)
return errOperationFailed
}
+ // TODO(jsimsa): Replace the code below with a call to a client-side
+ // binary library once this library exists. In particular, we should
+ // take care of resumption and consistency checking.
+ for i := 0; i < len(parts); i++ {
+ stream, err := stub.Download(rt.R().NewContext(), int32(i))
+ if err != nil {
+ vlog.Errorf("Download() failed: %v", err)
+ return errOperationFailed
+ }
+ for {
+ bytes, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ vlog.Errorf("Recv() failed: %v", err)
+ return errOperationFailed
+ }
+ if _, err := file.Write(bytes); err != nil {
+ vlog.Errorf("Write() failed: %v", err)
+ return errOperationFailed
+ }
+ }
+ if err := stream.Finish(); err != nil {
+ vlog.Errorf("Finish() failed: %v", err)
+ return errOperationFailed
+ }
+ }
mode := os.FileMode(0755)
if err := file.Chmod(mode); err != nil {
vlog.Errorf("Chmod(%v) failed: %v", mode, err)
diff --git a/tools/binary/impl/impl.go b/tools/binary/impl/impl.go
new file mode 100644
index 0000000..9075b72
--- /dev/null
+++ b/tools/binary/impl/impl.go
@@ -0,0 +1,173 @@
+package impl
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "veyron/lib/cmdline"
+
+ "veyron2/rt"
+ "veyron2/services/mgmt/repository"
+)
+
+var cmdDelete = &cmdline.Command{
+ Run: runDelete,
+ Name: "delete",
+ Short: "Delete binary",
+ Long: "Delete connects to the binary repository and deletes the specified binary",
+ ArgsName: "<binary>",
+ ArgsLong: "<binary> is the veyron name of the binary to delete",
+}
+
+func runDelete(cmd *cmdline.Command, args []string) error {
+ if expected, got := 1, len(args); expected != got {
+ return cmd.Errorf("delete: incorrect number of arguments, expected %d, got %d", expected, got)
+ }
+ binary := args[0]
+
+ c, err := repository.BindBinary(binary)
+ if err != nil {
+ return fmt.Errorf("bind error: %v", err)
+ }
+ if err = c.Delete(rt.R().NewContext()); err != nil {
+ return err
+ }
+ fmt.Fprintf(cmd.Stdout(), "Binary deleted successfully\n")
+ return nil
+}
+
+var cmdDownload = &cmdline.Command{
+ Run: runDownload,
+ Name: "download",
+ Short: "Download binary",
+ Long: `
+Download connects to the binary repository, downloads the specified binary, and
+writes it to a file.
+`,
+ ArgsName: "<binary> <filename>",
+ ArgsLong: `
+<binary> is the veyron name of the binary to download
+<filename> is the name of the file where the binary will be written
+`,
+}
+
+func runDownload(cmd *cmdline.Command, args []string) error {
+ if expected, got := 2, len(args); expected != got {
+ return cmd.Errorf("download: incorrect number of arguments, expected %d, got %d", expected, got)
+ }
+ binary, filename := args[0], args[1]
+ f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
+ if err != nil {
+ return fmt.Errorf("failed to open %q: %v", filename, err)
+ }
+ defer f.Close()
+
+ c, err := repository.BindBinary(binary)
+ if err != nil {
+ return fmt.Errorf("bind error: %v", err)
+ }
+
+ // TODO(jsimsa): Replace the code below with a call to a client-side
+ // binary library once this library exists. In particular, we should
+ // take care of resumption and consistency checking.
+ stream, err := c.Download(rt.R().NewContext(), 0)
+ if err != nil {
+ return err
+ }
+
+ for {
+ buf, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return fmt.Errorf("recv error: %v", err)
+ }
+ if _, err = f.Write(buf); err != nil {
+ return fmt.Errorf("write error: %v", err)
+ }
+ }
+
+ err = stream.Finish()
+ if err != nil {
+ return fmt.Errorf("finish error: %v", err)
+ }
+
+ fmt.Fprintf(cmd.Stdout(), "Binary downloaded to file %s\n", filename)
+ return nil
+}
+
+var cmdUpload = &cmdline.Command{
+ Run: runUpload,
+ Name: "upload",
+ Short: "Upload binary",
+ Long: `
+Upload connects to the binary repository and uploads the binary of the specified
+file. When successful, it writes the name of the new binary to stdout.
+`,
+ ArgsName: "<binary> <filename>",
+ ArgsLong: `
+<binary> is the veyron name of the binary to upload
+<filename> is the name of the file to upload
+`,
+}
+
+func runUpload(cmd *cmdline.Command, args []string) error {
+ if expected, got := 2, len(args); expected != got {
+ return cmd.Errorf("upload: incorrect number of arguments, expected %d, got %d", expected, got)
+ }
+ binary, filename := args[0], args[1]
+ f, err := os.Open(filename)
+ if err != nil {
+ return fmt.Errorf("failed to open %q: %v", filename, err)
+ }
+ defer f.Close()
+
+ c, err := repository.BindBinary(binary)
+ if err != nil {
+ return fmt.Errorf("bind error: %v", err)
+ }
+
+ // TODO(jsimsa): Add support for uploading multi-part binaries.
+ if err := c.Create(rt.R().NewContext(), 1); err != nil {
+ return err
+ }
+
+ stream, err := c.Upload(rt.R().NewContext(), 0)
+ if err != nil {
+ return err
+ }
+
+ var buf [4096]byte
+ for {
+ n, err := f.Read(buf[:])
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return fmt.Errorf("read error: %v", err)
+ }
+ if err := stream.Send(buf[:n]); err != nil {
+ return fmt.Errorf("send error: %v", err)
+ }
+ }
+ if err := stream.CloseSend(); err != nil {
+ return fmt.Errorf("closesend error: %v", err)
+ }
+
+ if err := stream.Finish(); err != nil {
+ return fmt.Errorf("finish error: %v", err)
+ }
+
+ return nil
+}
+
+func Root() *cmdline.Command {
+ return &cmdline.Command{
+ Name: "binary",
+ Short: "Command-line tool for interacting with the veyron binary repository",
+ Long: "Command-line tool for interacting with the veyron binary repository",
+ Children: []*cmdline.Command{cmdDelete, cmdDownload, cmdUpload},
+ }
+}
diff --git a/tools/content/impl/impl_test.go b/tools/binary/impl/impl_test.go
similarity index 68%
rename from tools/content/impl/impl_test.go
rename to tools/binary/impl/impl_test.go
index ed3f5a5..86595e1 100644
--- a/tools/content/impl/impl_test.go
+++ b/tools/binary/impl/impl_test.go
@@ -9,13 +9,14 @@
"strings"
"testing"
- "veyron/tools/content/impl"
+ "veyron/tools/binary/impl"
"veyron2"
"veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
"veyron2/security"
+ "veyron2/services/mgmt/binary"
"veyron2/services/mgmt/repository"
"veyron2/vlog"
)
@@ -24,29 +25,44 @@
suffix string
}
+func (s *server) Create(ipc.ServerContext, int32) error {
+ vlog.VI(2).Infof("Create() was called. suffix=%v", s.suffix)
+ return nil
+}
+
func (s *server) Delete(ipc.ServerContext) error {
vlog.VI(2).Infof("Delete() was called. suffix=%v", s.suffix)
if s.suffix != "exists" {
- return fmt.Errorf("content doesn't exist: %v", s.suffix)
+ return fmt.Errorf("binary doesn't exist: %v", s.suffix)
}
return nil
}
-func (s *server) Download(_ ipc.ServerContext, stream repository.ContentServiceDownloadStream) error {
+func (s *server) Download(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceDownloadStream) error {
vlog.VI(2).Infof("Download() was called. suffix=%v", s.suffix)
stream.Send([]byte("Hello"))
stream.Send([]byte("World"))
return nil
}
-func (s *server) Upload(_ ipc.ServerContext, stream repository.ContentServiceUploadStream) (string, error) {
+func (s *server) DownloadURL(ipc.ServerContext) (string, int64, error) {
+ vlog.VI(2).Infof("DownloadURL() was called. suffix=%v", s.suffix)
+ return "", 0, nil
+}
+
+func (s *server) Stat(ipc.ServerContext) ([]binary.PartInfo, error) {
+ vlog.VI(2).Infof("Stat() was called. suffix=%v", s.suffix)
+ return []binary.PartInfo{}, nil
+}
+
+func (s *server) Upload(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceUploadStream) error {
vlog.VI(2).Infof("Upload() was called. suffix=%v", s.suffix)
for {
if _, err := stream.Recv(); err != nil {
break
}
}
- return "newcontentid", nil
+ return nil
}
type dispatcher struct {
@@ -57,7 +73,7 @@
}
func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerContent(&server{suffix: suffix}))
+ invoker := ipc.ReflectInvoker(repository.NewServerBinary(&server{suffix: suffix}))
return invoker, nil, nil
}
@@ -86,7 +102,7 @@
}
}
-func TestContentClient(t *testing.T) {
+func TestBinaryClient(t *testing.T) {
runtime := rt.Init()
server, endpoint, err := startServer(t, runtime)
if err != nil {
@@ -102,13 +118,13 @@
if err := cmd.Execute([]string{"delete", naming.JoinAddressName(endpoint.String(), "//exists")}); err != nil {
t.Fatalf("%v", err)
}
- if expected, got := "Content deleted successfully", strings.TrimSpace(stdout.String()); got != expected {
+ if expected, got := "Binary deleted successfully", strings.TrimSpace(stdout.String()); got != expected {
t.Errorf("Got %q, expected %q", got, expected)
}
stdout.Reset()
// Test the 'download' command.
- dir, err := ioutil.TempDir("", "contentimpltest")
+ dir, err := ioutil.TempDir("", "binaryimpltest")
if err != nil {
t.Fatalf("%v", err)
}
@@ -118,7 +134,7 @@
if err := cmd.Execute([]string{"download", naming.JoinAddressName(endpoint.String(), "//exists"), file}); err != nil {
t.Fatalf("%v", err)
}
- if expected, got := "Content downloaded to file "+file, strings.TrimSpace(stdout.String()); got != expected {
+ if expected, got := "Binary downloaded to file "+file, strings.TrimSpace(stdout.String()); got != expected {
t.Errorf("Got %q, expected %q", got, expected)
}
buf, err := ioutil.ReadFile(file)
@@ -131,10 +147,7 @@
stdout.Reset()
// Test the 'upload' command.
- if err := cmd.Execute([]string{"upload", naming.JoinAddressName(endpoint.String(), ""), file}); err != nil {
+ if err := cmd.Execute([]string{"upload", naming.JoinAddressName(endpoint.String(), "//exists"), file}); err != nil {
t.Fatalf("%v", err)
}
- if expected, got := "newcontentid", strings.TrimSpace(stdout.String()); got != expected {
- t.Errorf("Got %q, expected %q", got, expected)
- }
}
diff --git a/tools/content/main.go b/tools/binary/main.go
similarity index 79%
rename from tools/content/main.go
rename to tools/binary/main.go
index 40f653b..de795aa 100644
--- a/tools/content/main.go
+++ b/tools/binary/main.go
@@ -1,7 +1,7 @@
package main
import (
- "veyron/tools/content/impl"
+ "veyron/tools/binary/impl"
"veyron2/rt"
)
diff --git a/tools/content/impl/impl.go b/tools/content/impl/impl.go
deleted file mode 100644
index 970de79..0000000
--- a/tools/content/impl/impl.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package impl
-
-import (
- "fmt"
- "io"
- "os"
-
- "veyron/lib/cmdline"
-
- "veyron2/rt"
- "veyron2/services/mgmt/repository"
-)
-
-var cmdDelete = &cmdline.Command{
- Run: runDelete,
- Name: "delete",
- Short: "Delete content",
- Long: "Delete connects to the content repository and deletes the specified content",
- ArgsName: "<content>",
- ArgsLong: "<content> is the full name of the content to delete.",
-}
-
-func runDelete(cmd *cmdline.Command, args []string) error {
- if expected, got := 1, len(args); expected != got {
- return cmd.Errorf("delete: incorrect number of arguments, expected %d, got %d", expected, got)
- }
- c, err := repository.BindContent(args[0])
- if err != nil {
- return fmt.Errorf("bind error: %v", err)
- }
- if err = c.Delete(rt.R().NewContext()); err != nil {
- return err
- }
- fmt.Fprintf(cmd.Stdout(), "Content deleted successfully\n")
- return nil
-}
-
-var cmdDownload = &cmdline.Command{
- Run: runDownload,
- Name: "download",
- Short: "Download content",
- Long: `
-Download connects to the content repository, downloads the specified content, and
-writes it to a file.
-`,
- ArgsName: "<content> <filename>",
- ArgsLong: `
-<content> is the full name of the content to download
-<filename> is the name of the file where the content will be written
-`,
-}
-
-func runDownload(cmd *cmdline.Command, args []string) error {
- if expected, got := 2, len(args); expected != got {
- return cmd.Errorf("download: incorrect number of arguments, expected %d, got %d", expected, got)
- }
-
- f, err := os.OpenFile(args[1], os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
- if err != nil {
- return fmt.Errorf("failed to open %q: %v", args[0], err)
- }
- defer f.Close()
-
- c, err := repository.BindContent(args[0])
- if err != nil {
- return fmt.Errorf("bind error: %v", err)
- }
-
- stream, err := c.Download(rt.R().NewContext())
- if err != nil {
- return err
- }
-
- for {
- buf, err := stream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("recv error: %v", err)
- }
- if _, err = f.Write(buf); err != nil {
- return fmt.Errorf("write error: %v", err)
- }
- }
-
- err = stream.Finish()
- if err != nil {
- return fmt.Errorf("finish error: %v", err)
- }
-
- fmt.Fprintf(cmd.Stdout(), "Content downloaded to file %s\n", args[1])
- return nil
-}
-
-var cmdUpload = &cmdline.Command{
- Run: runUpload,
- Name: "upload",
- Short: "Upload content",
- Long: `
-Upload connects to the content repository and uploads the content of the specified
-file. When successful, it writes the name of the new content to stdout.
-`,
- ArgsName: "<server> <filename>",
- ArgsLong: `
-<server> is the veyron name or endpoint of the content repository.
-<filename> is the name of the file to upload.
-`,
-}
-
-func runUpload(cmd *cmdline.Command, args []string) error {
- if expected, got := 2, len(args); expected != got {
- return cmd.Errorf("upload: incorrect number of arguments, expected %d, got %d", expected, got)
- }
-
- f, err := os.Open(args[1])
- if err != nil {
- return fmt.Errorf("failed to open %q: %v", args[1], err)
- }
- defer f.Close()
-
- c, err := repository.BindContent(args[0])
- if err != nil {
- return fmt.Errorf("bind error: %v", err)
- }
-
- stream, err := c.Upload(rt.R().NewContext())
- if err != nil {
- return err
- }
-
- var buf [4096]byte
- for {
- n, err := f.Read(buf[:])
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("read error: %v", err)
- }
- if err = stream.Send(buf[:n]); err != nil {
- return fmt.Errorf("send error: %v", err)
- }
- }
- if err = stream.CloseSend(); err != nil {
- return fmt.Errorf("closesend error: %v", err)
- }
-
- name, err := stream.Finish()
- if err != nil {
- return fmt.Errorf("finish error: %v", err)
- }
-
- fmt.Fprintf(cmd.Stdout(), "%s\n", name)
- return nil
-}
-
-func Root() *cmdline.Command {
- return &cmdline.Command{
- Name: "content",
- Short: "Command-line tool for interacting with the veyron content repository",
- Long: "Command-line tool for interacting with the veyron content repository",
- Children: []*cmdline.Command{cmdDelete, cmdDownload, cmdUpload},
- }
-}