veyron/services/mgmt/binary: HTTP download for binary service.

The implementation uses the http file server functionality, and we provide
an implementation of http.File that wraps the content parts we have in the
binary server.

For now, make any content available via http (DownloadURL is still not
implemented).  We'll revisit that later.

As part of this change, factor out some code from dispatcher and invoker
that was also needed for the http implementation.

While at it, fix some issues discovered while working with this codebase.
Among them, how Upload/Download handle invalid part indices.

Change-Id: Ief4153b68930d5edbf8b83e36c3b1396369d5f3c
diff --git a/services/mgmt/binary/binaryd/main.go b/services/mgmt/binary/binaryd/main.go
index 3a74c7c..28bb8ad 100644
--- a/services/mgmt/binary/binaryd/main.go
+++ b/services/mgmt/binary/binaryd/main.go
@@ -3,6 +3,7 @@
 import (
 	"flag"
 	"io/ioutil"
+	"net/http"
 	"os"
 	"path/filepath"
 
@@ -59,6 +60,21 @@
 		}
 	}
 	vlog.Infof("Binary repository rooted at %v", *root)
+
+	state, err := impl.NewState(*root, defaultDepth)
+	if err != nil {
+		vlog.Errorf("NewState(%v, %v) failed: %v", *root, defaultDepth, err)
+		return
+	}
+
+	// TODO(caprita): Flagify port.
+	go func() {
+		if err := http.ListenAndServe(":8080", http.FileServer(impl.NewHTTPRoot(state))); err != nil {
+			vlog.Errorf("ListenAndServe() failed: %v", err)
+			os.Exit(1)
+		}
+	}()
+
 	runtime := rt.Init()
 	defer runtime.Cleanup()
 	server, err := runtime.NewServer()
@@ -68,17 +84,12 @@
 	}
 	defer server.Stop()
 	auth := vflag.NewAuthorizerOrDie()
-	dispatcher, err := impl.NewDispatcher(*root, defaultDepth, auth)
-	if err != nil {
-		vlog.Errorf("NewDispatcher(%v, %v, %v) failed: %v", *root, defaultDepth, auth, err)
-		return
-	}
 	endpoint, err := server.Listen(roaming.ListenSpec)
 	if err != nil {
 		vlog.Errorf("Listen(%s) failed: %v", roaming.ListenSpec, err)
 		return
 	}
-	if err := server.Serve(*name, dispatcher); err != nil {
+	if err := server.Serve(*name, impl.NewDispatcher(state, auth)); err != nil {
 		vlog.Errorf("Serve(%v) failed: %v", *name, err)
 		return
 	}
diff --git a/services/mgmt/binary/binaryd/test.sh b/services/mgmt/binary/binaryd/test.sh
index dae104b..e3547e9 100755
--- a/services/mgmt/binary/binaryd/test.sh
+++ b/services/mgmt/binary/binaryd/test.sh
@@ -27,7 +27,8 @@
     || shell_test::fail "line ${LINENO} failed to start binaryd"
 
   # Create a binary file.
-  local -r BINARY="${REPO}/test-binary"
+  local -r BINARY_SUFFIX="test-binary"
+  local -r BINARY="${REPO}/${BINARY_SUFFIX}"
   local -r BINARY_FILE=$(shell::tmp_file)
   dd if=/dev/urandom of="${BINARY_FILE}" bs=1000000 count=16 \
     || shell_test::fail "line ${LINENO}: faile to create a random binary file"
@@ -35,9 +36,15 @@
 
   # Download the binary file.
   local -r BINARY_FILE2=$(shell::tmp_file)
-  "${BINARY_BIN}" download "${BINARY}" "${BINARY_FILE2}" || shell_test::fail "line ${LINENO}: 'download' failed"
+  "${BINARY_BIN}" download "${BINARY}" "${BINARY_FILE2}" || shell_test::fail "line ${LINENO}: 'RPC download' failed"
   if [[ $(cmp "${BINARY_FILE}" "${BINARY_FILE2}" &> /dev/null) ]]; then
-    shell_test::fail "mismatching binary files"
+    shell_test::fail "mismatching binary file downloaded via RPC"
+  fi
+
+  local -r BINARY_FILE3=$(shell::tmp_file)
+  curl -f -o "${BINARY_FILE3}" http://localhost:8080/"${BINARY_SUFFIX}" || shell_test::fail "line ${LINENO}: 'HTTP download' failed"
+  if [[ $(cmp "${BINARY_FILE}" "${BINARY_FILE3}" &> /dev/null) ]]; then
+    shell_test::fail "mismatching binary file downloaded via HTTP"
   fi
 
   # Remove the binary file.
diff --git a/services/mgmt/binary/impl/dispatcher.go b/services/mgmt/binary/impl/dispatcher.go
index 7f54fb7..465b0a6 100644
--- a/services/mgmt/binary/impl/dispatcher.go
+++ b/services/mgmt/binary/impl/dispatcher.go
@@ -24,8 +24,11 @@
 	state *state
 }
 
-// newDispatcher is the dispatcher factory.
-func NewDispatcher(root string, depth int, authorizer security.Authorizer) (*dispatcher, error) {
+// TODO(caprita): Move this together with state into a new file, state.go.
+
+// NewState creates a new state object for the binary service.  This
+// should be passed into both NewDispatcher and NewHTTPRoot.
+func NewState(root string, depth int) (*state, error) {
 	if min, max := 0, md5.Size-1; min > depth || depth > max {
 		return nil, fmt.Errorf("Unexpected depth, expected a value between %v and %v, got %v", min, max, depth)
 	}
@@ -40,15 +43,20 @@
 	if expected, got := Version, strings.TrimSpace(string(output)); expected != got {
 		return nil, fmt.Errorf("Unexpected version: expected %v, got %v", expected, got)
 	}
-	return &dispatcher{
-		auth: authorizer,
-		state: &state{
-			depth: depth,
-			root:  root,
-		},
+	return &state{
+		depth: depth,
+		root:  root,
 	}, nil
 }
 
+// NewDispatcher is the dispatcher factory.
+func NewDispatcher(state *state, authorizer security.Authorizer) ipc.Dispatcher {
+	return &dispatcher{
+		auth:  authorizer,
+		state: state,
+	}
+}
+
 // DISPATCHER INTERFACE IMPLEMENTATION
 
 func (d *dispatcher) Lookup(suffix, method string) (interface{}, security.Authorizer, error) {
diff --git a/services/mgmt/binary/impl/http.go b/services/mgmt/binary/impl/http.go
new file mode 100644
index 0000000..2c189b3
--- /dev/null
+++ b/services/mgmt/binary/impl/http.go
@@ -0,0 +1,49 @@
+package impl
+
+import (
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"veyron.io/veyron/veyron2/vlog"
+
+	"veyron.io/veyron/veyron/services/mgmt/binary/impl/merge_file"
+)
+
+// NewHTTPRoot returns an implementation of http.FileSystem that can be used
+// to serve the content in the binary service.
+func NewHTTPRoot(state *state) http.FileSystem {
+	return &httpRoot{state}
+}
+
+type httpRoot struct {
+	state *state
+}
+
+// TODO(caprita): Tie this in with DownloadURL, to control which binaries
+// are downloadable via url.
+
+// Open implements http.FileSystem.  It uses the merge file implementation
+// to wrap the content parts into one logical file.
+func (r httpRoot) Open(name string) (http.File, error) {
+	name = strings.TrimPrefix(name, "/")
+	vlog.Infof("HTTP handler opening %s", name)
+	parts, err := getParts(dir(name, r.state))
+	if err != nil {
+		return nil, err
+	}
+	partFiles := make([]*os.File, len(parts))
+	for i, part := range parts {
+		if err := checksumExists(part); err != nil {
+			return nil, err
+		}
+		dataPath := filepath.Join(part, data)
+		var err error
+		if partFiles[i], err = os.Open(dataPath); err != nil {
+			vlog.Errorf("Open(%v) failed: %v", dataPath, err)
+			return nil, errOperationFailed
+		}
+	}
+	return merge_file.NewMergeFile(name, partFiles)
+}
diff --git a/services/mgmt/binary/impl/http_test.go b/services/mgmt/binary/impl/http_test.go
new file mode 100644
index 0000000..5b31d0f
--- /dev/null
+++ b/services/mgmt/binary/impl/http_test.go
@@ -0,0 +1,75 @@
+package impl
+
+import (
+	"bytes"
+	"crypto/md5"
+	"encoding/hex"
+	"io/ioutil"
+	"net/http"
+	"testing"
+
+	"veyron.io/veyron/veyron2/rt"
+
+	"veyron.io/veyron/veyron/lib/testutil"
+)
+
+// TestHTTP checks that HTTP download works.
+func TestHTTP(t *testing.T) {
+	// TODO(caprita): This is based on TestMultiPart (impl_test.go).  Share
+	// the code where possible.
+	for length := 2; length < 5; length++ {
+		binary, url, cleanup := startServer(t, 2)
+		defer cleanup()
+		// Create <length> chunks of up to 4MB of random bytes.
+		data := make([][]byte, length)
+		for i := 0; i < length; i++ {
+			// Random size, but at least 1 (avoid empty parts).
+			size := testutil.Rand.Intn(1000*bufferLength) + 1
+			data[i] = testutil.RandomBytes(size)
+		}
+		if err := binary.Create(rt.R().NewContext(), int32(length)); err != nil {
+			t.Fatalf("Create() failed: %v", err)
+		}
+		for i := 0; i < length; i++ {
+			if streamErr, err := invokeUpload(t, binary, data[i], int32(i)); streamErr != nil || err != nil {
+				t.FailNow()
+			}
+		}
+		parts, err := binary.Stat(rt.R().NewContext())
+		if err != nil {
+			t.Fatalf("Stat() failed: %v", err)
+		}
+		response, err := http.Get(url)
+		if err != nil {
+			t.Fatal(err)
+		}
+		downloaded, err := ioutil.ReadAll(response.Body)
+		if err != nil {
+			t.Fatal(err)
+		}
+		from, to := 0, 0
+		for i := 0; i < length; i++ {
+			hpart := md5.New()
+			to += len(data[i])
+			if ld := len(downloaded); to > ld {
+				t.Fatalf("Download falls short: len(downloaded):%d, need:%d (i:%d, length:%d)", ld, to, i, length)
+			}
+			output := downloaded[from:to]
+			from = to
+			if bytes.Compare(output, data[i]) != 0 {
+				t.Fatalf("Unexpected output: expected %v, got %v", data[i], output)
+			}
+			hpart.Write(data[i])
+			checksum := hex.EncodeToString(hpart.Sum(nil))
+			if expected, got := checksum, parts[i].Checksum; expected != got {
+				t.Fatalf("Unexpected checksum: expected %v, got %v", expected, got)
+			}
+			if expected, got := len(data[i]), int(parts[i].Size); expected != got {
+				t.Fatalf("Unexpected size: expected %v, got %v", expected, got)
+			}
+		}
+		if err := binary.Delete(rt.R().NewContext()); err != nil {
+			t.Fatalf("Delete() failed: %v", err)
+		}
+	}
+}
diff --git a/services/mgmt/binary/impl/impl_test.go b/services/mgmt/binary/impl/impl_test.go
index ba13d5d..8212e1e 100644
--- a/services/mgmt/binary/impl/impl_test.go
+++ b/services/mgmt/binary/impl/impl_test.go
@@ -4,7 +4,10 @@
 	"bytes"
 	"crypto/md5"
 	"encoding/hex"
+	"fmt"
 	"io/ioutil"
+	"net"
+	"net/http"
 	"os"
 	"path/filepath"
 	"testing"
@@ -92,7 +95,7 @@
 }
 
 // startServer starts the binary repository server.
-func startServer(t *testing.T, depth int) (repository.Binary, func()) {
+func startServer(t *testing.T, depth int) (repository.Binary, string, func()) {
 	// Setup the root of the binary repository.
 	root, err := ioutil.TempDir("", veyronPrefix)
 	if err != nil {
@@ -107,10 +110,20 @@
 	if err != nil {
 		t.Fatalf("NewServer() failed: %v", err)
 	}
-	dispatcher, err := NewDispatcher(root, depth, nil)
+	state, err := NewState(root, depth)
 	if err != nil {
-		t.Fatalf("NewDispatcher(%v, %v, %v) failed: %v", root, depth, nil, err)
+		t.Fatalf("NewState(%v, %v) failed: %v", root, depth, err)
 	}
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	go func() {
+		if err := http.Serve(listener, http.FileServer(NewHTTPRoot(state))); err != nil {
+			vlog.Fatalf("Serve() failed: %v", err)
+		}
+	}()
+	dispatcher := NewDispatcher(state, nil)
 	endpoint, err := server.Listen(profiles.LocalListenSpec)
 	if err != nil {
 		t.Fatalf("Listen(%s) failed: %v", profiles.LocalListenSpec, err)
@@ -124,17 +137,17 @@
 	if err != nil {
 		t.Fatalf("BindBinary(%v) failed: %v", name, err)
 	}
-	return binary, func() {
+	return binary, fmt.Sprintf("http://%s/test", listener.Addr()), func() {
 		// Shutdown the binary repository server.
 		if err := server.Stop(); err != nil {
 			t.Fatalf("Stop() failed: %v", err)
 		}
-		if err := os.Remove(path); err != nil {
+		if err := os.RemoveAll(path); err != nil {
 			t.Fatalf("Remove(%v) failed: %v", path, err)
 		}
 		// Check that any directories and files that were created to
 		// represent the binary objects have been garbage collected.
-		if err := os.Remove(root); err != nil {
+		if err := os.RemoveAll(root); err != nil {
 			t.Fatalf("Remove(%v) failed: %v", root, err)
 		}
 	}
@@ -145,7 +158,7 @@
 // hierarchy that stores binary objects in the local file system.
 func TestHierarchy(t *testing.T) {
 	for i := 0; i < md5.Size; i++ {
-		binary, cleanup := startServer(t, i)
+		binary, _, cleanup := startServer(t, i)
 		defer cleanup()
 		// Create up to 4MB of random bytes.
 		size := testutil.Rand.Intn(1000 * bufferLength)
@@ -188,7 +201,7 @@
 // consists of.
 func TestMultiPart(t *testing.T) {
 	for length := 2; length < 5; length++ {
-		binary, cleanup := startServer(t, 2)
+		binary, _, cleanup := startServer(t, 2)
 		defer cleanup()
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
@@ -209,7 +222,6 @@
 		if err != nil {
 			t.Fatalf("Stat() failed: %v", err)
 		}
-		h := md5.New()
 		for i := 0; i < length; i++ {
 			hpart := md5.New()
 			output, streamErr, err := invokeDownload(t, binary, int32(i))
@@ -219,7 +231,6 @@
 			if bytes.Compare(output, data[i]) != 0 {
 				t.Fatalf("Unexpected output: expected %v, got %v", data[i], output)
 			}
-			h.Write(data[i])
 			hpart.Write(data[i])
 			checksum := hex.EncodeToString(hpart.Sum(nil))
 			if expected, got := checksum, parts[i].Checksum; expected != got {
@@ -240,7 +251,7 @@
 // of.
 func TestResumption(t *testing.T) {
 	for length := 2; length < 5; length++ {
-		binary, cleanup := startServer(t, 2)
+		binary, _, cleanup := startServer(t, 2)
 		defer cleanup()
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
@@ -282,9 +293,9 @@
 
 // TestErrors checks that the binary interface correctly reports errors.
 func TestErrors(t *testing.T) {
-	binary, cleanup := startServer(t, 2)
+	binary, _, cleanup := startServer(t, 2)
 	defer cleanup()
-	length := 2
+	const length = 2
 	data := make([][]byte, length)
 	for i := 0; i < length; i++ {
 		size := testutil.Rand.Intn(1000 * bufferLength)
@@ -320,6 +331,20 @@
 	if _, streamErr, err := invokeDownload(t, binary, 0); streamErr != nil || err != nil {
 		t.Fatalf("Download() failed: %v", err)
 	}
+	// Upload/Download on a part number that's outside the range set forth in
+	// Create should fail.
+	for _, part := range []int32{-1, length} {
+		if _, err := invokeUpload(t, binary, []byte("dummy"), part); err == nil {
+			t.Fatalf("Upload() did not fail when it should have")
+		} else if want := verror.BadArg; !verror.Is(err, want) {
+			t.Fatalf("Unexpected error: %v, expected error id %v", err, want)
+		}
+		if _, _, err := invokeDownload(t, binary, part); err == nil {
+			t.Fatalf("Download() did not fail when it should have")
+		} else if want := verror.BadArg; !verror.Is(err, want) {
+			t.Fatalf("Unexpected error: %v, expected error id %v", err, want)
+		}
+	}
 	if err := binary.Delete(rt.R().NewContext()); err != nil {
 		t.Fatalf("Delete() failed: %v", err)
 	}
diff --git a/services/mgmt/binary/impl/invoker.go b/services/mgmt/binary/impl/invoker.go
index 423f44f..893abcf 100644
--- a/services/mgmt/binary/impl/invoker.go
+++ b/services/mgmt/binary/impl/invoker.go
@@ -4,11 +4,9 @@
 // MD5 hash of the suffix and generates the following path in the
 // local filesystem: /<root>/<dir_1>/.../<dir_n>/<hash>. The root and
 // the directory depth are parameters of the implementation. The
-// contents of the directory include the checksum and data for the
-// object and each of its individual parts:
+// contents of the directory include the checksum and data for each of
+// the individual parts of the binary:
 //
-// checksum
-// data
 // <part_1>/checksum
 // <part_1>/data
 // ...
@@ -86,6 +84,7 @@
 	errNotFound        = verror.NoExistf("binary not found")
 	errInProgress      = verror.Internalf("identical upload already in progress")
 	errInvalidParts    = verror.BadArgf("invalid number of binary parts")
+	errInvalidPart     = verror.BadArgf("invalid binary part number")
 	errOperationFailed = verror.Internalf("operation failed")
 )
 
@@ -96,10 +95,8 @@
 	Size:     binary.MissingSize,
 }
 
-// newInvoker is the invoker factory.
-func newInvoker(state *state, suffix string) *invoker {
-	// Generate the local filesystem path for the object identified by
-	// the object name suffix.
+// dir generates the local filesystem path for the binary identified by suffix.
+func dir(suffix string, state *state) string {
 	h := md5.New()
 	h.Write([]byte(suffix))
 	hash := hex.EncodeToString(h.Sum(nil))
@@ -107,9 +104,13 @@
 	for j := 0; j < state.depth; j++ {
 		dir = filepath.Join(dir, hash[j*2:(j+1)*2])
 	}
-	path := filepath.Join(state.root, dir, hash)
+	return filepath.Join(state.root, dir, hash)
+}
+
+// newInvoker is the invoker factory.
+func newInvoker(state *state, suffix string) *invoker {
 	return &invoker{
-		path:   path,
+		path:   dir(suffix, state),
 		state:  state,
 		suffix: suffix,
 	}
@@ -119,18 +120,26 @@
 
 const bufferLength = 4096
 
-// checksumExists checks whether the given path contains a
-// checksum. The implementation uses the existence of checksum to
-// determine whether the binary (part) identified by the given path
+// checksumExists checks whether the given part path is valid and
+// contains a checksum. The implementation uses the existence of
+// the path dir to determine whether the part is valid, and the
+// existence of checksum to determine whether the binary part
 // exists.
-func (i *invoker) checksumExists(path string) error {
+func checksumExists(path string) error {
+	switch _, err := os.Stat(path); {
+	case os.IsNotExist(err):
+		return errInvalidPart
+	case err != nil:
+		vlog.Errorf("Stat(%v) failed: %v", path, err)
+		return errOperationFailed
+	}
 	checksumFile := filepath.Join(path, checksum)
 	_, err := os.Stat(checksumFile)
 	switch {
 	case os.IsNotExist(err):
 		return errNotFound
 	case err != nil:
-		vlog.Errorf("Stat(%v) failed: %v", path, err)
+		vlog.Errorf("Stat(%v) failed: %v", checksumFile, err)
 		return errOperationFailed
 	default:
 		return nil
@@ -139,30 +148,38 @@
 
 // generatePartPath generates a path for the given binary part.
 func (i *invoker) generatePartPath(part int) string {
-	return filepath.Join(i.path, fmt.Sprintf("%d", part))
+	return generatePartPath(i.path, part)
+}
+
+func generatePartPath(dir string, part int) string {
+	return filepath.Join(dir, fmt.Sprintf("%d", part))
 }
 
 // getParts returns a collection of paths to the parts of the binary.
-func (i *invoker) getParts() ([]string, error) {
-	infos, err := ioutil.ReadDir(i.path)
+func getParts(path string) ([]string, error) {
+	infos, err := ioutil.ReadDir(path)
 	if err != nil {
-		vlog.Errorf("ReadDir(%v) failed: %v", i.path, err)
+		vlog.Errorf("ReadDir(%v) failed: %v", path, err)
 		return []string{}, errOperationFailed
 	}
-	n := 0
 	result := make([]string, len(infos))
 	for _, info := range infos {
 		if info.IsDir() {
-			idx, err := strconv.Atoi(info.Name())
+			partName := info.Name()
+			idx, err := strconv.Atoi(partName)
 			if err != nil {
-				vlog.Errorf("Atoi(%v) failed: %v", info.Name(), err)
+				vlog.Errorf("Atoi(%v) failed: %v", partName, err)
 				return []string{}, errOperationFailed
 			}
-			result[idx] = filepath.Join(i.path, info.Name())
-			n++
+			if idx < 0 || idx >= len(infos) || result[idx] != "" {
+				return []string{}, errOperationFailed
+			}
+			result[idx] = filepath.Join(path, partName)
+		} else {
+			// The only entries should correspond to the part dirs.
+			return []string{}, errOperationFailed
 		}
 	}
-	result = result[:n]
 	return result, nil
 }
 
@@ -183,7 +200,7 @@
 		return errOperationFailed
 	}
 	for j := 0; j < int(nparts); j++ {
-		partPath, partPerm := filepath.Join(tmpDir, fmt.Sprintf("%d", j)), os.FileMode(0700)
+		partPath, partPerm := generatePartPath(tmpDir, j), os.FileMode(0700)
 		if err := os.MkdirAll(partPath, partPerm); err != nil {
 			vlog.Errorf("MkdirAll(%v, %v) failed: %v", partPath, partPerm, err)
 			if err := os.RemoveAll(tmpDir); err != nil {
@@ -250,13 +267,13 @@
 func (i *invoker) Download(context ipc.ServerContext, part int32, stream repository.BinaryServiceDownloadStream) error {
 	vlog.Infof("%v.Download(%v)", i.suffix, part)
 	path := i.generatePartPath(int(part))
-	if err := i.checksumExists(path); err != nil {
+	if err := checksumExists(path); err != nil {
 		return err
 	}
 	dataPath := filepath.Join(path, data)
 	file, err := os.Open(dataPath)
 	if err != nil {
-		vlog.Errorf("Open(%v) failed: %v", path, err)
+		vlog.Errorf("Open(%v) failed: %v", dataPath, err)
 		return errOperationFailed
 	}
 	defer file.Close()
@@ -288,7 +305,7 @@
 func (i *invoker) Stat(ipc.ServerContext) ([]binary.PartInfo, error) {
 	vlog.Infof("%v.Stat()", i.suffix)
 	result := make([]binary.PartInfo, 0)
-	parts, err := i.getParts()
+	parts, err := getParts(i.path)
 	if err != nil {
 		return []binary.PartInfo{}, err
 	}
@@ -321,7 +338,7 @@
 func (i *invoker) Upload(context ipc.ServerContext, part int32, stream repository.BinaryServiceUploadStream) error {
 	vlog.Infof("%v.Upload(%v)", i.suffix, part)
 	path, suffix := i.generatePartPath(int(part)), ""
-	err := i.checksumExists(path)
+	err := checksumExists(path)
 	switch err {
 	case nil:
 		return errExists
diff --git a/services/mgmt/binary/impl/merge_file/merge_file.go b/services/mgmt/binary/impl/merge_file/merge_file.go
new file mode 100644
index 0000000..06dd1a2
--- /dev/null
+++ b/services/mgmt/binary/impl/merge_file/merge_file.go
@@ -0,0 +1,184 @@
+// merge_file provides an implementation for http.File that merges
+// several files into one logical file.
+package merge_file
+
+// TODO(caprita): rename this package to multipart, and the constructor to
+// NewFile. Usage: f, err := multipart.NewFile(...).
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"time"
+)
+
+var internalErr = fmt.Errorf("internal error")
+
+// NewMergeFile creates the "merge" file out of the provided parts.
+// The sizes of the parts are captured at the outset and not updated
+// for the lifetime of the merge file (any subsequent modifications
+// in the parts will cause Read and Seek to work incorrectly).
+func NewMergeFile(name string, parts []*os.File) (http.File, error) {
+	fileParts := make([]filePart, len(parts))
+	for i, p := range parts {
+		stat, err := p.Stat()
+		if err != nil {
+			return nil, err
+		}
+		size := stat.Size()
+		// TODO(caprita): we can relax this restriction later.
+		if size == 0 {
+			return nil, fmt.Errorf("Part is empty")
+		}
+		fileParts[i] = filePart{file: p, size: size}
+	}
+	return &mergeFile{name: name, parts: fileParts}, nil
+}
+
+type filePart struct {
+	file *os.File
+	size int64
+}
+
+type mergeFile struct {
+	name       string
+	parts      []filePart
+	activePart int
+	partOffset int64
+}
+
+func (m *mergeFile) currPos() (res int64) {
+	for i := 0; i < m.activePart; i++ {
+		res += m.parts[i].size
+	}
+	res += m.partOffset
+	return
+}
+
+func (m *mergeFile) totalSize() (res int64) {
+	for _, p := range m.parts {
+		res += p.size
+	}
+	return
+}
+
+// Readdir is not implemented.
+func (*mergeFile) Readdir(int) ([]os.FileInfo, error) {
+	return nil, fmt.Errorf("Not implemented")
+}
+
+type fileInfo struct {
+	name    string
+	size    int64
+	mode    os.FileMode
+	modTime time.Time
+}
+
+// Name returns the name of the merge file.
+func (f *fileInfo) Name() string {
+	return f.name
+}
+
+// Size returns the size of the merge file (the sum of all parts).
+func (f *fileInfo) Size() int64 {
+	return f.size
+}
+
+// Mode is currently hardcoded to 0700.
+func (f *fileInfo) Mode() os.FileMode {
+	return f.mode
+}
+
+// ModTime is set to the current time.
+func (f *fileInfo) ModTime() time.Time {
+	return f.modTime
+}
+
+// IsDir always returns false.
+func (f *fileInfo) IsDir() bool {
+	return false
+}
+
+// Sys always returns nil.
+func (f *fileInfo) Sys() interface{} {
+	return nil
+}
+
+// Stat describes the merge file.
+func (m *mergeFile) Stat() (os.FileInfo, error) {
+	return &fileInfo{
+		name:    m.name,
+		size:    m.totalSize(),
+		mode:    0700,
+		modTime: time.Now(),
+	}, nil
+}
+
+// Close closes all the parts.
+func (m *mergeFile) Close() error {
+	var lastErr error
+	for _, p := range m.parts {
+		if err := p.file.Close(); err != nil {
+			lastErr = err
+		}
+	}
+	return lastErr
+}
+
+// Read reads from the parts in sequence.
+func (m *mergeFile) Read(buf []byte) (int, error) {
+	if m.activePart >= len(m.parts) {
+		return 0, io.EOF
+	}
+	p := m.parts[m.activePart]
+	n, err := p.file.Read(buf)
+	m.partOffset += int64(n)
+	if m.partOffset > p.size {
+		// Likely, the file has changed.
+		return 0, internalErr
+	}
+	if m.partOffset == p.size {
+		m.activePart++
+		if m.activePart < len(m.parts) {
+			if _, err := m.parts[m.activePart].file.Seek(0, 0); err != nil {
+				return 0, err
+			}
+			m.partOffset = 0
+		}
+	}
+	return n, err
+}
+
+// Seek seeks into the part corresponding to the global offset.
+func (m *mergeFile) Seek(offset int64, whence int) (int64, error) {
+	var target int64
+	switch whence {
+	case 0:
+		target = offset
+	case 1:
+		target = m.currPos() + offset
+	case 2:
+		target = m.totalSize() - offset
+	default:
+		return 0, fmt.Errorf("invalid whence: %d", whence)
+	}
+	if target < 0 || target > m.totalSize() {
+		return 0, fmt.Errorf("invalid offset")
+	}
+	var c int64
+	for i, p := range m.parts {
+		if pSize := p.size; c+pSize <= target {
+			c += pSize
+			continue
+		}
+		m.activePart = i
+		if _, err := p.file.Seek(target-c, 0); err != nil {
+			return 0, err
+		}
+		m.partOffset = target - c
+		return target, nil
+	}
+	// target <= m.totalSize() should ensure this is never reached.
+	return 0, internalErr // Should not be reached.
+}
diff --git a/services/mgmt/binary/impl/merge_file/merge_file_test.go b/services/mgmt/binary/impl/merge_file/merge_file_test.go
new file mode 100644
index 0000000..1d903a1
--- /dev/null
+++ b/services/mgmt/binary/impl/merge_file/merge_file_test.go
@@ -0,0 +1,147 @@
+package merge_file_test
+
+import (
+	"io"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"testing"
+
+	"veyron.io/veyron/veyron/services/mgmt/binary/impl/merge_file"
+)
+
+func read(t *testing.T, m http.File, thisMuch int) string {
+	buf := make([]byte, thisMuch)
+	bytesRead := 0
+	for {
+		n, err := m.Read(buf[bytesRead:])
+		bytesRead += n
+		if bytesRead == thisMuch {
+			return string(buf)
+		}
+		switch err {
+		case nil:
+		case io.EOF:
+			return string(buf[:bytesRead])
+		default:
+			t.Fatalf("Read failed: %v", err)
+		}
+	}
+}
+
+// TestMergeFile verifies the http.File operations on the merge file.
+func TestMergeFile(t *testing.T) {
+	contents := []string{"v", "is", "for", "vanadium"}
+	files := make([]*os.File, len(contents))
+	d, err := ioutil.TempDir("", "merge_files")
+	if err != nil {
+		t.Fatalf("TempDir() failed: %v", err)
+	}
+	defer os.RemoveAll(d)
+	contentsSize := 0
+	for i, c := range contents {
+		contentsSize += len(c)
+		fPath := filepath.Join(d, strconv.Itoa(i))
+		if err := ioutil.WriteFile(fPath, []byte(c), 0600); err != nil {
+			t.Fatalf("WriteFile(%v) failed: %v", fPath, err)
+		}
+		var err error
+		if files[i], err = os.Open(fPath); err != nil {
+			t.Fatalf("Open(%v) failed: %v", fPath, err)
+		}
+	}
+	m, err := merge_file.NewMergeFile("bunnies", files)
+	if err != nil {
+		t.Fatalf("newMergeFile failed: %v", err)
+	}
+	defer func() {
+		if err := m.Close(); err != nil {
+			t.Fatalf("Close failed: %v", err)
+		}
+	}()
+	stat, err := m.Stat()
+	if err != nil {
+		t.Fatalf("Stat failed: %v", err)
+	}
+	if want, got := "bunnies", stat.Name(); want != got {
+		t.Fatalf("Name returned %s, expected %s", got, want)
+	}
+	if want, got := int64(contentsSize), stat.Size(); want != got {
+		t.Fatalf("Size returned %d, expected %d", got, want)
+	}
+	if want, got := strings.Join(contents, ""), read(t, m, 1024); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if want, got := "", read(t, m, 1024); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if pos, err := m.Seek(0, 0); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(0), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	if want, got := strings.Join(contents, ""), read(t, m, 1024); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if pos, err := m.Seek(0, 0); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(0), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	for _, c := range contents {
+		if want, got := c, read(t, m, len(c)); want != got {
+			t.Fatalf("Read %v, wanted %v instead", got, want)
+		}
+	}
+	if want, got := "", read(t, m, 1024); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if pos, err := m.Seek(1, 0); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(1), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	if want, got := "isfo", read(t, m, 4); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if pos, err := m.Seek(2, 1); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(7), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	if want, got := "anadi", read(t, m, 5); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if _, err := m.Seek(100, 1); err == nil {
+		t.Fatalf("Seek expected to fail")
+	}
+	if want, got := "u", read(t, m, 1); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if pos, err := m.Seek(8, 2); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(6), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	if want, got := "vanad", read(t, m, 5); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+	if _, err := m.Seek(100, 2); err == nil {
+		t.Fatalf("Seek expected to fail")
+	}
+	if pos, err := m.Seek(9, 2); err != nil {
+		t.Fatalf("Seek failed: %v", err)
+	} else if want, got := int64(5), pos; want != got {
+		t.Fatalf("Pos is %d, wanted %d", got, want)
+	}
+	if want, got := "rvana", read(t, m, 5); want != got {
+		t.Fatalf("Read %v, wanted %v instead", got, want)
+	}
+
+	// TODO(caprita): Add some auto-generated test cases where we seek/read
+	// using various combinations of indices.  These can be exhaustive or
+	// randomized, the idea is to get better coverage.
+}
diff --git a/services/mgmt/lib/binary/impl_test.go b/services/mgmt/lib/binary/impl_test.go
index ae29582..8ec796f 100644
--- a/services/mgmt/lib/binary/impl_test.go
+++ b/services/mgmt/lib/binary/impl_test.go
@@ -41,10 +41,11 @@
 		t.Fatalf("NewServer() failed: %v", err)
 	}
 	depth := 2
-	dispatcher, err := impl.NewDispatcher(root, depth, nil)
+	state, err := impl.NewState(root, depth)
 	if err != nil {
-		t.Fatalf("NewDispatcher(%v, %v, %v) failed: %v", root, depth, nil, err)
+		t.Fatalf("NewState(%v, %v) failed: %v", root, depth, err)
 	}
+	dispatcher := impl.NewDispatcher(state, nil)
 	endpoint, err := server.Listen(profiles.LocalListenSpec)
 	if err != nil {
 		t.Fatalf("Listen(%s) failed: %v", profiles.LocalListenSpec, err)