Merge "store/ptrie: add tests/benchmark"
diff --git a/runtime/factories/roaming/.api b/runtime/factories/roaming/.api
index c15af37..7443e1d 100644
--- a/runtime/factories/roaming/.api
+++ b/runtime/factories/roaming/.api
@@ -1,4 +1,4 @@
 pkg roaming, const SettingsStreamDesc ideal-string
 pkg roaming, const SettingsStreamName ideal-string
 pkg roaming, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
-pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
+pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, security.Authorizer, ...string) (func(), naming.Endpoint, error)
diff --git a/runtime/factories/static/.api b/runtime/factories/static/.api
index 2d3ebb3..c6a2583 100644
--- a/runtime/factories/static/.api
+++ b/runtime/factories/static/.api
@@ -1,2 +1,2 @@
 pkg static, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
-pkg static, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
+pkg static, func NewProxy(*context.T, rpc.ListenSpec, security.Authorizer, ...string) (func(), naming.Endpoint, error)
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index f6432bc..0254e9c 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,6 +26,8 @@
 	"v.io/x/ref/runtime/internal/rpc/version"
 )
 
+const reconnectDelay = 50 * time.Millisecond
+
 type manager struct {
 	rid    naming.RoutingID
 	closed chan struct{}
@@ -34,18 +36,20 @@
 
 	mu              *sync.Mutex
 	listenEndpoints []naming.Endpoint
+	proxyEndpoints  map[string][]naming.Endpoint // keyed by proxy address
 	listeners       []flow.Listener
 	wg              sync.WaitGroup
 }
 
 func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
 	m := &manager{
-		rid:       rid,
-		closed:    make(chan struct{}),
-		q:         upcqueue.New(),
-		cache:     NewConnCache(),
-		mu:        &sync.Mutex{},
-		listeners: []flow.Listener{},
+		rid:            rid,
+		closed:         make(chan struct{}),
+		q:              upcqueue.New(),
+		cache:          NewConnCache(),
+		mu:             &sync.Mutex{},
+		proxyEndpoints: make(map[string][]naming.Endpoint),
+		listeners:      []flow.Listener{},
 	}
 	go func() {
 		select {
@@ -72,28 +76,16 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Listen(ctx *context.T, protocol, address string) error {
-	var (
-		eps []naming.Endpoint
-		err error
-	)
 	if protocol == inaming.Network {
-		eps, err = m.proxyListen(ctx, address)
-	} else {
-		eps, err = m.listen(ctx, protocol, address)
+		return m.proxyListen(ctx, address)
 	}
-	if err != nil {
-		return err
-	}
-	m.mu.Lock()
-	m.listenEndpoints = append(m.listenEndpoints, eps...)
-	m.mu.Unlock()
-	return nil
+	return m.listen(ctx, protocol, address)
 }
 
-func (m *manager) listen(ctx *context.T, protocol, address string) ([]naming.Endpoint, error) {
+func (m *manager) listen(ctx *context.T, protocol, address string) error {
 	ln, err := listen(ctx, protocol, address)
 	if err != nil {
-		return nil, flow.NewErrNetwork(ctx, err)
+		return flow.NewErrNetwork(ctx, err)
 	}
 	local := &inaming.Endpoint{
 		Protocol: protocol,
@@ -102,33 +94,69 @@
 	}
 	m.mu.Lock()
 	if m.listeners == nil {
-		return nil, flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
+		return flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
 	}
 	m.listeners = append(m.listeners, ln)
 	m.mu.Unlock()
 	m.wg.Add(1)
 	go m.lnAcceptLoop(ctx, ln, local)
-	return []naming.Endpoint{local}, nil
+	m.mu.Lock()
+	m.listenEndpoints = append(m.listenEndpoints, local)
+	m.mu.Unlock()
+	return nil
 }
 
-func (m *manager) proxyListen(ctx *context.T, address string) ([]naming.Endpoint, error) {
+func (m *manager) proxyListen(ctx *context.T, address string) error {
 	ep, err := inaming.NewEndpoint(address)
 	if err != nil {
-		return nil, flow.NewErrBadArg(ctx, err)
+		return flow.NewErrBadArg(ctx, err)
 	}
-	f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
-	if err != nil {
-		return nil, flow.NewErrNetwork(ctx, err)
-	}
-	w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
-	if err != nil {
-		return nil, flow.NewErrBadArg(ctx, err)
-	}
-	if _, err := f.WriteMsg(w); err != nil {
-		return nil, flow.NewErrBadArg(ctx, err)
-	}
+	m.wg.Add(1)
+	go m.connectToProxy(ctx, address, ep)
+	return nil
+}
 
-	return m.readProxyResponse(ctx, f)
+func (m *manager) connectToProxy(ctx *context.T, address string, ep naming.Endpoint) {
+	defer m.wg.Done()
+	for delay := reconnectDelay; ; delay *= 2 {
+		time.Sleep(delay - reconnectDelay)
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
+		if err != nil {
+			ctx.Error(err)
+			continue
+		}
+		w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
+		if err != nil {
+			ctx.Error(err)
+			continue
+		}
+		if _, err = f.WriteMsg(w); err != nil {
+			ctx.Error(err)
+			continue
+		}
+		eps, err := m.readProxyResponse(ctx, f)
+		if err != nil {
+			ctx.Error(err)
+			continue
+		}
+		m.mu.Lock()
+		m.proxyEndpoints[address] = eps
+		m.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			return
+		case <-f.Closed():
+			m.mu.Lock()
+			delete(m.proxyEndpoints, address)
+			m.mu.Unlock()
+			delay = reconnectDelay
+		}
+	}
 }
 
 func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
@@ -176,29 +204,37 @@
 			ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
 			return
 		}
+		cached := make(chan struct{})
 		c, err := conn.NewAccepted(
 			ctx,
 			flowConn,
 			local,
 			version.Supported,
-			&flowHandler{q: m.q},
+			&flowHandler{q: m.q, cached: cached},
 		)
 		if err != nil {
+			close(cached)
 			flowConn.Close()
 			ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
 			continue
 		}
 		if err := m.cache.InsertWithRoutingID(c); err != nil {
-			ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
+			close(cached)
+			ctx.Errorf("failed to cache conn %v: %v", c, err)
 		}
+		close(cached)
 	}
 }
 
 type flowHandler struct {
-	q *upcqueue.T
+	q      *upcqueue.T
+	cached chan struct{}
 }
 
 func (h *flowHandler) HandleFlow(f flow.Flow) error {
+	if h.cached != nil {
+		<-h.cached
+	}
 	return h.q.Put(f)
 }
 
@@ -236,6 +272,9 @@
 	m.mu.Lock()
 	ret := make([]naming.Endpoint, len(m.listenEndpoints))
 	copy(ret, m.listenEndpoints)
+	for _, peps := range m.proxyEndpoints {
+		ret = append(ret, peps...)
+	}
 	m.mu.Unlock()
 	if len(ret) == 0 {
 		ret = append(ret, &inaming.Endpoint{RID: m.rid})
@@ -334,6 +373,7 @@
 			fh,
 		)
 		if err != nil {
+			flowConn.Close()
 			if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
 				return nil, err
 			}
@@ -350,16 +390,17 @@
 
 	// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
 	// return a flow on that corresponding conn.
-	if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+	if proxyConn := c; remote.RoutingID() != proxyConn.RemoteEndpoint().RoutingID() {
 		c, err = conn.NewDialed(
 			ctx,
 			f,
-			c.LocalEndpoint(),
+			proxyConn.LocalEndpoint(),
 			remote,
 			version.Supported,
 			fh,
 		)
 		if err != nil {
+			proxyConn.Close(ctx, err)
 			if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
 				return nil, err
 			}
@@ -370,6 +411,7 @@
 		}
 		f, err = c.Dial(ctx, fn)
 		if err != nil {
+			proxyConn.Close(ctx, err)
 			return nil, flow.NewErrDialFailed(ctx, err)
 		}
 	}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index d0a9a41..9752a56 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -31,22 +31,23 @@
 func TestDirectConnection(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
-	defer shutdown()
 
-	rid := naming.FixedRoutingID(0x5555)
-	m := New(ctx, rid)
-
-	if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+	am := New(ctx, naming.FixedRoutingID(0x5555))
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
+	dm := New(ctx, naming.FixedRoutingID(0x1111))
 
-	testFlows(t, ctx, m, m, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+
+	shutdown()
+	<-am.Closed()
+	<-dm.Closed()
 }
 
 func TestDialCachedConn(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
-	defer shutdown()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
@@ -63,54 +64,67 @@
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
+	old := dm.(*manager).cache.ridCache[am.RoutingID()]
 	// After dialing another connection the cache should still hold one connection
 	// because the connections should be reused.
 	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
-		t.Fatalf("got cache size %v, want %v", got, want)
+		t.Errorf("got cache size %v, want %v", got, want)
 	}
+	if c := dm.(*manager).cache.ridCache[am.RoutingID()]; c != old {
+		t.Errorf("got %v, want %v", c, old)
+	}
+
+	shutdown()
+	<-am.Closed()
+	<-dm.Closed()
 }
 
 func TestBidirectionalListeningEndpoint(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
-	defer shutdown()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-	eps := am.ListeningEndpoints()
-	if len(eps) == 0 {
-		t.Fatalf("no endpoints listened on")
-	}
+
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
 	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	// Now am should be able to make a flow to dm even though dm is not listening.
 	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
+
+	shutdown()
+	<-am.Closed()
+	<-dm.Closed()
 }
 
 func TestNullClientBlessings(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
-	defer shutdown()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-	dm := New(ctx, naming.NullRoutingID)
-	_, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	nulldm := New(ctx, naming.NullRoutingID)
+	_, af := testFlows(t, ctx, nulldm, am, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
 		t.Errorf("got %v, want zero-value blessings", rBlessings)
 	}
-	dm = New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111))
 	_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are
 	// non-zero if we did specify a RoutingID.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
 		t.Errorf("got %v, want non-zero blessings", rBlessings)
 	}
+
+	shutdown()
+	<-am.Closed()
+	<-dm.Closed()
+	<-nulldm.Closed()
 }
 
 func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
diff --git a/runtime/internal/naming/namespace/namespace.go b/runtime/internal/naming/namespace/namespace.go
index 1a9f109..eaa20c4 100644
--- a/runtime/internal/naming/namespace/namespace.go
+++ b/runtime/internal/naming/namespace/namespace.go
@@ -159,9 +159,9 @@
 	case verror.ErrBadArg.ID:
 		// This should cover "rpc: wrong number of in-args".
 		return true
-	case verror.ErrNoExist.ID, verror.ErrUnknownMethod.ID, verror.ErrUnknownSuffix.ID:
+	case verror.ErrNoExist.ID, verror.ErrUnknownMethod.ID, verror.ErrUnknownSuffix.ID, errNoServers.ID:
 		// This should cover "rpc: unknown method", "rpc: dispatcher not
-		// found", and dispatcher Lookup not found errors.
+		// found", dispatcher Lookup not found, and "No servers found to resolve query" errors.
 		return true
 	case verror.ErrBadProtocol.ID:
 		// This covers "rpc: response decoding failed: EOF".
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 8b3b892..10ed8a0 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -55,9 +55,6 @@
 	chosenEndpoints   []*inaming.Endpoint
 	typeCache         *typeCache
 
-	// state of proxies keyed by the name of the proxy
-	proxies map[string]proxyState
-
 	disp               rpc.Dispatcher // dispatcher to serve RPCs
 	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
 	active             sync.WaitGroup // active goroutines we've spawned.
@@ -103,7 +100,6 @@
 		principal:         principal,
 		blessings:         principal.BlessingStore().Default(),
 		publisher:         publisher.New(ctx, ns, publishPeriod),
-		proxies:           make(map[string]proxyState),
 		stoppedChan:       make(chan struct{}),
 		ns:                ns,
 		stats:             newRPCStats(statsPrefix),
@@ -247,10 +243,18 @@
 	s.Lock()
 	defer s.Unlock()
 	var lastErr error
+	if len(listenSpec.Proxy) > 0 {
+		lastErr = s.flowMgr.Listen(ctx, inaming.Network, listenSpec.Proxy)
+		if lastErr != nil {
+			s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, listenSpec.Proxy, lastErr)
+		}
+	}
 	for _, addr := range listenSpec.Addrs {
 		if len(addr.Address) > 0 {
 			lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
-			s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
+			if lastErr != nil {
+				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
+			}
 		}
 	}
 
diff --git a/services/device/deviced/internal/starter/starter.go b/services/device/deviced/internal/starter/starter.go
index 3ad0a74..0849d6a 100644
--- a/services/device/deviced/internal/starter/starter.go
+++ b/services/device/deviced/internal/starter/starter.go
@@ -173,16 +173,30 @@
 	}
 	var epName string
 	if args.Device.ListenSpec.Proxy != "" {
-		for {
-			p := server.Status().Proxies
-			if len(p) == 0 {
+		if os.Getenv("V23_RPC_TRANSITION_STATE") == "xservers" {
+			for {
+				eps := server.Status().Endpoints
+				if len(eps) > 0 && len(eps[0].Addr().Network()) > 0 {
+					epName = eps[0].Name()
+					ctx.Infof("Proxied address: %s", epName)
+					break
+				}
 				ctx.Infof("Waiting for proxy address to appear...")
 				time.Sleep(time.Second)
-				continue
 			}
-			epName = p[0].Endpoint.Name()
-			ctx.Infof("Proxied address: %s", epName)
-			break
+		} else {
+			// TODO(suharshs): Remove this else block once the transition is complete.
+			for {
+				p := server.Status().Proxies
+				if len(p) == 0 {
+					ctx.Infof("Waiting for proxy address to appear...")
+					time.Sleep(time.Second)
+					continue
+				}
+				epName = p[0].Endpoint.Name()
+				ctx.Infof("Proxied address: %s", epName)
+				break
+			}
 		}
 	} else {
 		if len(endpoints) == 0 {
diff --git a/services/device/dmrun/backend/backend.go b/services/device/dmrun/backend/backend.go
new file mode 100644
index 0000000..da8ca3e
--- /dev/null
+++ b/services/device/dmrun/backend/backend.go
@@ -0,0 +1,37 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package backend
+
+import "fmt"
+
+type CloudVM interface {
+	// Name of the VM instance that the object talks to
+	Name() string
+
+	// IP address (as a string) of the VM instance
+	IP() string
+
+	// Execute a command on the VM instance
+	RunCommand(...string) (output []byte, err error)
+
+	// Copy a file to the VM instance
+	CopyFile(infile, destination string) error
+
+	// Delete the VM instance
+	Delete() error
+
+	// Provide the command that the user can use to delete a VM instance for which Delete()
+	// was not called
+	DeleteCommandForUser() string
+}
+
+func CreateCloudVM(instanceName string, options interface{}) (CloudVM, error) {
+	switch options.(type) {
+	default:
+		return nil, fmt.Errorf("Unknown options type")
+	case VcloudVMOptions:
+		return newVcloudVM(instanceName, options.(VcloudVMOptions))
+	}
+}
diff --git a/services/device/dmrun/backend/backend_vcloud.go b/services/device/dmrun/backend/backend_vcloud.go
new file mode 100644
index 0000000..38a68b4
--- /dev/null
+++ b/services/device/dmrun/backend/backend_vcloud.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package backend
+
+import (
+	"fmt"
+	"net"
+	"os/exec"
+	"strings"
+)
+
+type VcloudVM struct {
+	vcloud              string // path to vcloud command
+	sshUser             string // ssh into the VM as this user
+	projectArg, zoneArg string // common flags used with the vcloud command
+	name, ip            string
+	isDeleted           bool
+}
+
+type VcloudVMOptions struct {
+	VcloudBinary string // path to the "vcloud" command
+}
+
+func newVcloudVM(instanceName string, opt VcloudVMOptions) (vm *VcloudVM, err error) {
+	// TODO: Make sshUser, zone, and project configurable
+	g := &VcloudVM{
+		vcloud:     opt.VcloudBinary,
+		sshUser:    "veyron",
+		projectArg: "--project=google.com:veyron",
+		zoneArg:    "--zone=us-central1-c",
+		isDeleted:  false,
+	}
+
+	cmd := exec.Command(g.vcloud, "node", "create", g.projectArg, g.zoneArg, instanceName)
+	if output, err := cmd.CombinedOutput(); err != nil {
+		return nil, fmt.Errorf("setting up new GCE instance (%v) failed. Error: (%v) Output:\n%v", strings.Join(cmd.Args, " "), err, string(output))
+	}
+
+	cmd = exec.Command(g.vcloud, "list", g.projectArg, "--noheader", "--fields=EXTERNAL_IP", instanceName)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return nil, fmt.Errorf("listing instances (%v) failed. Error: (%v) Output:\n%v", strings.Join(cmd.Args, " "), err, string(output))
+	}
+	tmpIP := strings.TrimSpace(string(output))
+	if net.ParseIP(tmpIP) == nil {
+		return nil, fmt.Errorf("IP of new instance is not a valid IP address: %v", tmpIP)
+	}
+	g.ip = tmpIP
+	g.name = instanceName
+	return g, nil
+}
+
+func (g *VcloudVM) Delete() error {
+	if g.isDeleted {
+		return fmt.Errorf("trying to delete a deleted VcloudVM")
+	}
+
+	cmd := exec.Command(g.vcloud, "node", "delete", g.projectArg, g.zoneArg, g.name)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		err = fmt.Errorf("failed deleting GCE instance (%s): %v\nOutput:%v\n", strings.Join(cmd.Args, " "), err, string(output))
+	} else {
+		g.isDeleted = true
+		g.name = ""
+		g.ip = ""
+	}
+	return err
+}
+
+func (g *VcloudVM) Name() string {
+	return g.name
+}
+
+func (g *VcloudVM) IP() string {
+	return g.ip
+}
+
+func (g *VcloudVM) RunCommand(args ...string) ([]byte, error) {
+	if g.isDeleted {
+		return nil, fmt.Errorf("RunCommand called on deleted VcloudVM")
+	}
+
+	cmd := exec.Command(g.vcloud, append([]string{"sh", g.projectArg, g.name}, args...)...)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		err = fmt.Errorf("failed running [%s] on VM %s", strings.Join(args, " "), g.name)
+	}
+	return output, err
+}
+
+func (g *VcloudVM) CopyFile(infile, destination string) error {
+	if g.isDeleted {
+		return fmt.Errorf("CopyFile called on deleted VcloudVM")
+	}
+
+	cmd := exec.Command("gcloud", "compute", g.projectArg, "copy-files", infile, fmt.Sprintf("%s@%s:/%s", g.sshUser, g.Name(), destination), g.zoneArg)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		err = fmt.Errorf("failed copying %s to %s:%s - %v\nOutput:\n%v", infile, g.name, destination, err, string(output))
+	}
+	return err
+}
+
+func (g *VcloudVM) DeleteCommandForUser() string {
+	if g.isDeleted {
+		return ""
+	}
+
+	// We can't return the vcloud binary that we ran for the steps above, as that one is deleted
+	// after use. For now, we assume the user has a vcloud binary on their path.
+	return strings.Join([]string{"vcloud", "node", "delete", g.projectArg, g.zoneArg, g.name}, " ")
+}
diff --git a/services/device/dmrun/dmrun.go b/services/device/dmrun/dmrun.go
index d816542..d722b58 100644
--- a/services/device/dmrun/dmrun.go
+++ b/services/device/dmrun/dmrun.go
@@ -2,9 +2,9 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Command dmrun runs a binary on a remote GCE instance using device manager.
+// Command dmrun runs a binary on a remote VM instance using device manager.
 //
-// dmrun creates the GCE instance, installs and starts device manager on it, and
+// dmrun creates the VM instance, installs and starts device manager on it, and
 // then installs and starts an app from the specified binary.
 //
 // dmrun uses the credentials it is running with in order to claim the device
@@ -35,12 +35,14 @@
 	"time"
 
 	"v.io/x/ref"
+	"v.io/x/ref/services/device/dmrun/backend"
 )
 
 var (
 	workDir        string
 	vcloud         string
 	device         string
+	vm             backend.CloudVM
 	cleanupOnDeath func()
 )
 
@@ -148,39 +150,28 @@
 	return zipFile
 }
 
-// setupInstance creates a new GCE instance and returns its name and IP address.
-func setupInstance() (string, string) {
+// setupInstance creates a new VM instance and returns its name and IP address.
+func setupInstance(vmOptions interface{}) (backend.CloudVM, string, string) {
 	currUser, err := user.Current()
 	dieIfErr(err, "Couldn't obtain current user")
 	instanceName := fmt.Sprintf("%s-%s", currUser.Username, time.Now().UTC().Format("20060102-150405"))
-	// TODO(caprita): Allow project and zone to be customized.
-	cmd := exec.Command(vcloud, "node", "create", "--project=google.com:veyron", "--zone=us-central1-c", instanceName)
-	output, err := cmd.CombinedOutput()
-	dieIfErr(err, "Setting up new GCE instance (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	cmd = exec.Command(vcloud, "list", "--project=google.com:veyron", "--noheader", "--fields=EXTERNAL_IP", instanceName)
-	output, err = cmd.CombinedOutput()
-	dieIfErr(err, "Listing instances (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	instanceIP := strings.TrimSpace(string(output))
-	if net.ParseIP(instanceIP) == nil {
-		die("Not a valid IP address: %v", instanceIP)
-	}
+	vm, err = backend.CreateCloudVM(instanceName, vmOptions)
+	dieIfErr(err, "VM instance creation failed: %v", err)
+	instanceIP := vm.IP()
 	// Install unzip so we can unpack the archive.
 	// TODO(caprita): Use tar instead.
-	cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instanceName, "sudo", "apt-get", "install", "unzip")
-	output, err = cmd.CombinedOutput()
-	dieIfErr(err, "Installing unzip (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	fmt.Println("Created GCE instance", instanceName, "with IP", instanceIP)
-	return instanceName, instanceIP
+	output, err := vm.RunCommand("sudo", "apt-get", "install", "unzip")
+	dieIfErr(err, "Installing unzip failed. Output:\n%v", string(output))
+	fmt.Println("Created VM instance", instanceName, "with IP", instanceIP)
+	return vm, instanceName, instanceIP
 }
 
-// installArchive ships the archive to the GCE instance and unpacks it.
+// installArchive ships the archive to the VM instance and unpacks it.
 func installArchive(archive, instance string) {
-	cmd := exec.Command("gcloud", "compute", "--project=google.com:veyron", "copy-files", archive, fmt.Sprintf("veyron@%s:/tmp/", instance), "--zone=us-central1-c")
-	output, err := cmd.CombinedOutput()
-	dieIfErr(err, "Copying archive (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "unzip", path.Join("/tmp", filepath.Base(archive)), "-d", "/tmp/unpacked")
-	output, err = cmd.CombinedOutput()
-	dieIfErr(err, "Extracting archive (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+	err := vm.CopyFile(archive, "/tmp/")
+	dieIfErr(err, "Copying archive failed: %v", err)
+	output, err := vm.RunCommand("unzip", path.Join("/tmp", filepath.Base(archive)), "-d", "/tmp/unpacked")
+	dieIfErr(err, "Extracting archive failed. Output:\n%v", string(output))
 }
 
 // installDevice installs and starts device manager, and returns the public key
@@ -188,12 +179,10 @@
 func installDevice(instance string) (string, string) {
 	fmt.Println("Installing device manager...")
 	defer fmt.Println("Done installing device manager...")
-	cmd := exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "install", "/tmp/unpacked", "--single_user", "--", "--v23.tcp.address=:8151", "--deviced-port=8150", "--proxy-port=8160", "--use-pairing-token")
-	output, err := cmd.CombinedOutput()
-	dieIfErr(err, "Installing device manager (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
-	cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "start")
-	output, err = cmd.CombinedOutput()
-	dieIfErr(err, "Starting device manager (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+	output, err := vm.RunCommand("V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "install", "/tmp/unpacked", "--single_user", "--", "--v23.tcp.address=:8151", "--deviced-port=8150", "--proxy-port=8160", "--use-pairing-token")
+	dieIfErr(err, "Installing device manager failed. Output:\n%v", string(output))
+	output, err = vm.RunCommand("V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "start")
+	dieIfErr(err, "Starting device manager failed. Output:\n%v", string(output))
 	// Grab the token and public key from the device manager log.
 	dieAfter := time.After(5 * time.Second)
 	firstIteration := true
@@ -207,9 +196,8 @@
 		} else {
 			firstIteration = false
 		}
-		cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "cat", "/tmp/dm/dmroot/device-manager/logs/deviced.INFO")
-		output, err = cmd.CombinedOutput()
-		dieIfErr(err, "Reading device manager log (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+		output, err = vm.RunCommand("cat", "/tmp/dm/dmroot/device-manager/logs/deviced.INFO")
+		dieIfErr(err, "Reading device manager log failed. Output:\n%v", string(output))
 		pairingTokenRE := regexp.MustCompile("Device manager pairing token: (.*)")
 		matches := pairingTokenRE.FindSubmatch(output)
 		if matches == nil {
@@ -301,21 +289,21 @@
 	device = buildV23Binary(deviceBin)
 	dmBins := buildDMBinaries()
 	archive := createArchive(append(dmBins, getPath(devicexRepo, devicex)))
-	gceInstanceName, gceInstanceIP := setupInstance()
+	vmOpts := backend.VcloudVMOptions{VcloudBinary: vcloud}
+	vm, vmInstanceName, vmInstanceIP := setupInstance(vmOpts)
 	cleanupOnDeath = func() {
-		fmt.Fprintf(os.Stderr, "Deleting GCE instance ...\n")
-		cmd := exec.Command(vcloud, "node", "delete", "--project=google.com:veyron", "--zone=us-central1-c", gceInstanceName)
-		output, err := cmd.CombinedOutput()
+		fmt.Fprintf(os.Stderr, "Deleting VM instance ...\n")
+		err := vm.Delete()
 		fmt.Fprintf(os.Stderr, "Removing tmp files ...\n")
 		os.RemoveAll(workDir)
-		dieIfErr(err, "Deleting GCE instance (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+		dieIfErr(err, "Deleting VM instance failed")
 	}
-	installArchive(archive, gceInstanceName)
-	publicKey, pairingToken := installDevice(gceInstanceName)
-	deviceAddr := net.JoinHostPort(gceInstanceIP, "8150")
+	installArchive(archive, vmInstanceName)
+	publicKey, pairingToken := installDevice(vmInstanceName)
+	deviceAddr := net.JoinHostPort(vmInstanceIP, "8150")
 	deviceName := "/" + deviceAddr
-	claimDevice(deviceName, gceInstanceIP, publicKey, pairingToken, gceInstanceName)
-	installationName := installApp(deviceName, gceInstanceIP)
+	claimDevice(deviceName, vmInstanceIP, publicKey, pairingToken, vmInstanceName)
+	installationName := installApp(deviceName, vmInstanceIP)
 	instanceName := startApp(installationName, "app")
 	fmt.Println("Launched app.")
 	fmt.Println("-------------")
@@ -325,6 +313,6 @@
 	fmt.Printf("\t${V23_ROOT}/release/go/bin/debug glob %s/logs/*\n", instanceName)
 	fmt.Println("Dump e.g. the INFO log:")
 	fmt.Printf("\t${V23_ROOT}/release/go/bin/debug logs read %s/logs/app.INFO\n", instanceName)
-	fmt.Println("Clean up by deleting the GCE instance:")
-	fmt.Printf("\t${V23_ROOT}/release/go/bin/vcloud node delete --project=google.com:veyron --zone=us-central1-c %s\n", gceInstanceName)
+	fmt.Println("Clean up by deleting the VM instance:")
+	fmt.Printf("\t%s\n", vm.DeleteCommandForUser())
 }
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 65a7f32..849e8ec 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -58,6 +58,9 @@
         CurVers     string      // current version number of the object.
         Parents     []string    // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime     time.Time   // timestamp when the update is generated.
+	PermId      string      // id of the permissions object controlling this version.
+	PermVers    string      // current version of the permissions object.
+	Shell       bool        // true when the mutation data is hidden due to permissions.
 	Delete      bool        // indicates whether the update resulted in object being deleted from the store.
 	BatchId     uint64      // unique id of the Batch this update belongs to.
 	BatchCount  uint64      // number of objects in the Batch.
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index ca29abc..a00591c 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -56,6 +56,9 @@
 	CurVers    string    // current version number of the object.
 	Parents    []string  // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime    time.Time // timestamp when the update is generated.
+	PermId     string    // id of the permissions object controlling this version.
+	PermVers   string    // current version of the permissions object.
+	Shell      bool      // true when the mutation data is hidden due to permissions.
 	Delete     bool      // indicates whether the update resulted in object being deleted from the store.
 	BatchId    uint64    // unique id of the Batch this update belongs to.
 	BatchCount uint64    // number of objects in the Batch.
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 3f5181b..984a8df 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -17,15 +17,23 @@
 
 // PutOp represents a store put operation.  The new version is written instead
 // of the value to avoid duplicating the user data in the store.  The version
-// is used to access the user data of that specific mutation.
+// is used to access the user data of that specific mutation.  The key and the
+// version of the permissions entry that was checked to allow this put operation
+// are also tracked to secure the access to this history.
 type PutOp struct {
-	Key     []byte
-	Version []byte
+	Key         []byte
+	Version     []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
-// DeleteOp represents a store delete operation.
+// DeleteOp represents a store delete operation.  The key and the version of the
+// permissions entry that was checked to allow this delete operation are also
+// tracked to secure the access to this history.
 type DeleteOp struct {
-	Key []byte
+	Key         []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
 // SyncGroupOp represents a change in SyncGroup tracking, adding or removing
@@ -43,9 +51,13 @@
 // allows sync to initialize its metadata at the correct versions of the objects
 // when they become syncable.  These log entries should be filtered by the
 // client-facing Watch interface because the user data did not actually change.
+// The key and the version of the permissions entry that was checked when the
+// key was accessed are also tracked to secure the access to this history.
 type SyncSnapshotOp struct {
-	Key     []byte
-	Version []byte
+	Key         []byte
+	Version     []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
 // Op represents a store operation.
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index 71c4b90..f4114fa 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -35,10 +35,14 @@
 
 // PutOp represents a store put operation.  The new version is written instead
 // of the value to avoid duplicating the user data in the store.  The version
-// is used to access the user data of that specific mutation.
+// is used to access the user data of that specific mutation.  The key and the
+// version of the permissions entry that was checked to allow this put operation
+// are also tracked to secure the access to this history.
 type PutOp struct {
-	Key     []byte
-	Version []byte
+	Key         []byte
+	Version     []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
 func (PutOp) __VDLReflect(struct {
@@ -46,9 +50,13 @@
 }) {
 }
 
-// DeleteOp represents a store delete operation.
+// DeleteOp represents a store delete operation.  The key and the version of the
+// permissions entry that was checked to allow this delete operation are also
+// tracked to secure the access to this history.
 type DeleteOp struct {
-	Key []byte
+	Key         []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
 func (DeleteOp) __VDLReflect(struct {
@@ -76,9 +84,13 @@
 // allows sync to initialize its metadata at the correct versions of the objects
 // when they become syncable.  These log entries should be filtered by the
 // client-facing Watch interface because the user data did not actually change.
+// The key and the version of the permissions entry that was checked when the
+// key was accessed are also tracked to secure the access to this history.
 type SyncSnapshotOp struct {
-	Key     []byte
-	Version []byte
+	Key         []byte
+	Version     []byte
+	PermKey     []byte
+	PermVersion []byte
 }
 
 func (SyncSnapshotOp) __VDLReflect(struct {
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 086296c..2caa50d 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -110,31 +110,9 @@
 	NoBatchId = uint64(0)
 )
 
-// dagNode holds the information on a object mutation in the DAG.
-// Note: the batch ID and deleted flag are copies of information in the log
-// record.  They are also stored in the DAG node to improve DAG traversal for
-// conflict resolution and pruning without having to fetch the full log record
-// every time.
-type dagNode struct {
-	Level   uint64   // node distance from root
-	Parents []string // references to parent versions
-	Logrec  string   // reference to log record
-	BatchId uint64   // ID of a write batch
-	Deleted bool     // true if the change was a delete
-}
-
 // batchSet holds information on a set of write batches.
 type batchSet map[uint64]*batchInfo
 
-// batchInfo holds the information on a write batch:
-// - The map of syncable (versioned) objects: {oid: version}
-// - The total count of batch objects, including non-syncable ones.
-// TODO(rdaoud): add support to track the read and scan sets.
-type batchInfo struct {
-	Objects map[string]string
-	Count   uint64
-}
-
 // graftMap holds the state of DAG node grafting (attaching) per object.
 type graftMap map[string]*graftInfo
 
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index a0c8731..820dec6 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -84,3 +84,27 @@
 	SyncPending   bool
 	PendingGenVec interfaces.PrefixGenVector
 }
+
+// dagNode holds the information on an object mutation in the DAG.  The node
+// information is extracted from the log records exchanged between Syncbases.
+// It is also stored in the DAG node to improve DAG traversal for conflict
+// resolution and pruning without having to fetch the full log record.
+type dagNode struct {
+	Level    uint64   // node distance from root
+	Parents  []string // references to parent versions
+	Logrec   string   // reference to log record
+	BatchId  uint64   // ID of a write batch
+	Shell    bool     // true when the data is hidden due to permissions
+	Deleted  bool     // true if the change was a delete
+	PermId   string   // ID of the permissions controlling this version
+	PermVers string   // current version of the permissions object
+}
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+	Objects map[string]string
+	Count   uint64
+}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index 34f425f..e352de4 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -104,12 +104,48 @@
 }) {
 }
 
+// dagNode holds the information on an object mutation in the DAG.  The node
+// information is extracted from the log records exchanged between Syncbases.
+// It is also stored in the DAG node to improve DAG traversal for conflict
+// resolution and pruning without having to fetch the full log record.
+type dagNode struct {
+	Level    uint64   // node distance from root
+	Parents  []string // references to parent versions
+	Logrec   string   // reference to log record
+	BatchId  uint64   // ID of a write batch
+	Shell    bool     // true when the data is hidden due to permissions
+	Deleted  bool     // true if the change was a delete
+	PermId   string   // ID of the permissions controlling this version
+	PermVers string   // current version of the permissions object
+}
+
+func (dagNode) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/vsync.dagNode"`
+}) {
+}
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+	Objects map[string]string
+	Count   uint64
+}
+
+func (batchInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/vsync.batchInfo"`
+}) {
+}
+
 func init() {
 	vdl.Register((*syncData)(nil))
 	vdl.Register((*localGenInfo)(nil))
 	vdl.Register((*dbSyncState)(nil))
 	vdl.Register((*localLogRec)(nil))
 	vdl.Register((*sgLocalState)(nil))
+	vdl.Register((*dagNode)(nil))
+	vdl.Register((*batchInfo)(nil))
 }
 
 const logPrefix = "log" // log state.
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index ebeb262..bb85a2b 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -6,7 +6,9 @@
 
 import (
 	"bufio"
+	"fmt"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -22,10 +24,15 @@
 	"v.io/v23/security"
 )
 
-const leakWaitTime = 100 * time.Millisecond
+const (
+	leakWaitTime = 100 * time.Millisecond
+	pollTime     = 50 * time.Millisecond
+)
 
-func TestProxiedConnection(t *testing.T) {
+func TestSingleProxy(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
+	kp := newKillProtocol()
+	flow.RegisterProtocol("kill", kp)
 	pctx, shutdown := v23.Init()
 	defer shutdown()
 	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
@@ -37,16 +44,23 @@
 		t.Fatal(err)
 	}
 
-	pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+	pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
 
 	if err := am.Listen(actx, "v23", pep.String()); err != nil {
 		t.Fatal(err)
 	}
-	testEndToEndConnections(t, dctx, actx, dm, am)
+
+	for am.ListeningEndpoints()[0].Addr().Network() == "" {
+		time.Sleep(pollTime)
+	}
+
+	testEndToEndConnections(t, dctx, actx, dm, am, kp)
 }
 
 func TestMultipleProxies(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
+	kp := newKillProtocol()
+	flow.RegisterProtocol("kill", kp)
 	pctx, shutdown := v23.Init()
 	defer shutdown()
 	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
@@ -58,59 +72,79 @@
 		t.Fatal(err)
 	}
 
-	pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+	pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
 
-	p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+	p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
 
-	p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+	p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
 
 	if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
 		t.Fatal(err)
 	}
-	testEndToEndConnections(t, dctx, actx, dm, am)
+
+	// Wait for am.Listen to get 3 endpoints.
+	for len(am.ListeningEndpoints()) != 3 {
+		time.Sleep(pollTime)
+	}
+
+	testEndToEndConnections(t, dctx, actx, dm, am, kp)
 }
 
-func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager) {
+func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, kp *killProtocol) {
 	aeps := am.ListeningEndpoints()
 	if len(aeps) == 0 {
 		t.Fatal("acceptor not listening on any endpoints")
 	}
 	for _, aep := range aeps {
-		testEndToEndConnection(t, dctx, actx, dm, am, aep)
+		// Kill the connections; connections should still eventually succeed.
+		kp.KillConnections()
+		for {
+			if err := testEndToEndConnection(t, dctx, actx, dm, am, aep); err != nil {
+				t.Log(err)
+				time.Sleep(pollTime)
+				continue
+			}
+			break
+		}
 	}
 }
 
-func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) {
+func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
 	// The dialing flow.Manager dials a flow to the accepting flow.Manager.
 	want := "Do you read me?"
 	df, err := dm.Dial(dctx, aep, bfp)
 	if err != nil {
-		t.Fatal(err)
+		return err
 	}
 	// We write before accepting to ensure that the openFlow message is sent.
-	writeLine(df, want)
+	if err := writeLine(df, want); err != nil {
+		return err
+	}
 	af, err := am.Accept(actx)
 	if err != nil {
-		t.Fatal(err)
+		return err
 	}
 	got, err := readLine(af)
 	if err != nil {
-		t.Fatal(err)
+		return err
 	}
 	if got != want {
-		t.Errorf("got %v, want %v", got, want)
+		return fmt.Errorf("got %v, want %v", got, want)
 	}
 
 	// Writes in the opposite direction should work as well.
 	want = "I read you loud and clear."
-	writeLine(af, want)
+	if err := writeLine(af, want); err != nil {
+		return err
+	}
 	got, err = readLine(df)
 	if err != nil {
-		t.Fatal(err)
+		return err
 	}
 	if got != want {
-		t.Errorf("got %v, want %v", got, want)
+		return fmt.Errorf("got %v, want %v", got, want)
 	}
+	return nil
 }
 
 // TODO(suharshs): Add test for bidirectional RPC.
@@ -141,20 +175,69 @@
 
 func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
 	var ls rpc.ListenSpec
+	hasProxies := false
 	for _, addr := range addrs {
 		ls.Addrs = append(ls.Addrs, addr)
+		if addr.Protocol == "v23" {
+			hasProxies = true
+		}
 	}
 	ctx = v23.WithListenSpec(ctx, ls)
 	proxy, _, err := xproxyd.New(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
+	// Wait for the proxy to connect to its proxies.
+	if hasProxies {
+		for len(proxy.MultipleProxyEndpoints()) == 0 {
+			time.Sleep(pollTime)
+		}
+	}
 	peps := proxy.ListeningEndpoints()
 	for _, pep := range peps {
-		if pep.Addr().Network() == "tcp" {
+		if pep.Addr().Network() == "tcp" || pep.Addr().Network() == "kill" {
 			return pep
 		}
 	}
 	t.Fatal("Proxy not listening on network address.")
 	return nil
 }
+
+type killProtocol struct {
+	protocol flow.Protocol
+	mu       sync.Mutex
+	conns    []flow.Conn
+}
+
+func newKillProtocol() *killProtocol {
+	p, _ := flow.RegisteredProtocol("tcp")
+	return &killProtocol{protocol: p}
+}
+
+func (p *killProtocol) KillConnections() {
+	p.mu.Lock()
+	for _, c := range p.conns {
+		c.Close()
+	}
+	p.conns = nil
+	p.mu.Unlock()
+}
+
+func (p *killProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
+	c, err := p.protocol.Dial(ctx, "tcp", address, timeout)
+	if err != nil {
+		return nil, err
+	}
+	p.mu.Lock()
+	p.conns = append(p.conns, c)
+	p.mu.Unlock()
+	return c, nil
+}
+
+func (p *killProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
+	return p.protocol.Listen(ctx, "tcp", address)
+}
+
+func (p *killProtocol) Resolve(ctx *context.T, protocol, address string) (string, string, error) {
+	return p.protocol.Resolve(ctx, "tcp", address)
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index d05a561..85d66f7 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,9 +5,9 @@
 package xproxyd
 
 import (
-	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -16,12 +16,12 @@
 	"v.io/v23/naming"
 )
 
-// TODO(suharshs): Make sure that we don't leak any goroutines.
+const reconnectDelay = 50 * time.Millisecond
 
 type proxy struct {
 	m              flow.Manager
 	mu             sync.Mutex
-	proxyEndpoints []naming.Endpoint
+	proxyEndpoints map[string][]naming.Endpoint // keyed by proxy address
 }
 
 func New(ctx *context.T) (*proxy, *context.T, error) {
@@ -30,7 +30,8 @@
 		return nil, nil, err
 	}
 	p := &proxy{
-		m: mgr,
+		m:              mgr,
+		proxyEndpoints: make(map[string][]naming.Endpoint),
 	}
 	for _, addr := range v23.GetListenSpec(ctx).Addrs {
 		if addr.Protocol == "v23" {
@@ -38,25 +39,7 @@
 			if err != nil {
 				return nil, nil, err
 			}
-			f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
-			if err != nil {
-				return nil, nil, err
-			}
-			// Send a byte telling the acceptor that we are a proxy.
-			if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
-				return nil, nil, err
-			}
-			msg, err := readMessage(ctx, f)
-			if err != nil {
-				return nil, nil, err
-			}
-			m, ok := msg.(*message.ProxyResponse)
-			if !ok {
-				return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-			}
-			p.mu.Lock()
-			p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
-			p.mu.Unlock()
+			go p.connectToProxy(ctx, addr.Address, ep)
 		} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
 			return nil, nil, err
 		}
@@ -69,6 +52,16 @@
 	return p.m.ListeningEndpoints()
 }
 
+func (p *proxy) MultipleProxyEndpoints() []naming.Endpoint {
+	var eps []naming.Endpoint
+	p.mu.Lock()
+	for _, v := range p.proxyEndpoints {
+		eps = append(eps, v...)
+	}
+	p.mu.Unlock()
+	return eps
+}
+
 func (p *proxy) listenLoop(ctx *context.T) {
 	for {
 		f, err := p.m.Accept(ctx)
@@ -99,6 +92,7 @@
 func (p *proxy) startRouting(ctx *context.T, f flow.Flow, m *message.Setup) error {
 	fout, err := p.dialNextHop(ctx, f, m)
 	if err != nil {
+		f.Close()
 		return err
 	}
 	go p.forwardLoop(ctx, f, fout)
@@ -108,10 +102,9 @@
 
 func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
 	for {
-		_, err := io.Copy(fin, fout)
-		if err == io.EOF {
-			return
-		} else if err != nil {
+		if _, err := io.Copy(fin, fout); err != nil {
+			fin.Close()
+			fout.Close()
 			ctx.Errorf("f.Read failed: %v", err)
 			return
 		}
@@ -124,7 +117,10 @@
 		ep  naming.Endpoint
 		err error
 	)
-	if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
+	if ep, err = removeNetworkAddress(m.PeerRemoteEndpoint); err != nil {
+		return nil, err
+	}
+	if routes := ep.Routes(); len(routes) > 0 {
 		if err := rid.FromString(routes[0]); err != nil {
 			return nil, err
 		}
@@ -133,15 +129,13 @@
 		// TODO(suharshs): Make sure that the routingID from the route belongs to a
 		// connection that is stored in the manager's cache. (i.e. a Server has connected
 		// with the routingID before)
-		if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
+		if ep, err = setEndpointRoutingID(ep, rid); err != nil {
 			return nil, err
 		}
 		// Remove the read route from the setup message endpoint.
 		if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
 			return nil, err
 		}
-	} else {
-		ep = m.PeerRemoteEndpoint
 	}
 	fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
 	if err != nil {
@@ -175,7 +169,10 @@
 
 func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
 	p.mu.Lock()
-	eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
+	eps := p.m.ListeningEndpoints()
+	for _, peps := range p.proxyEndpoints {
+		eps = append(eps, peps...)
+	}
 	p.mu.Unlock()
 	if len(eps) == 0 {
 		return nil, NewErrNotListening(ctx)
@@ -201,3 +198,41 @@
 	}
 	return eps, nil
 }
+
+func (p *proxy) connectToProxy(ctx *context.T, address string, ep naming.Endpoint) {
+	for delay := reconnectDelay; ; delay *= 2 {
+		time.Sleep(delay - reconnectDelay)
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+		if err != nil {
+			ctx.Error(err)
+			continue
+		}
+		// Send a byte telling the acceptor that we are a proxy.
+		if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
+			ctx.Error(err)
+			continue
+		}
+		eps, err := readProxyResponse(ctx, f)
+		if err != nil {
+			ctx.Error(err)
+			continue
+		}
+		p.mu.Lock()
+		p.proxyEndpoints[address] = eps
+		p.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			return
+		case <-f.Closed():
+			p.mu.Lock()
+			delete(p.proxyEndpoints, address)
+			p.mu.Unlock()
+			delay = reconnectDelay
+		}
+	}
+}
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
index 4ccad31..e14e23e 100644
--- a/services/xproxyd/util.go
+++ b/services/xproxyd/util.go
@@ -5,6 +5,8 @@
 package xproxyd
 
 import (
+	"fmt"
+
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -13,6 +15,16 @@
 	"v.io/v23/security"
 )
 
+func removeNetworkAddress(ep naming.Endpoint) (naming.Endpoint, error) {
+	_, _, routes, rid, bnames, mountable := getEndpointParts(ep)
+	opts := routes
+	opts = append(opts, bnames...)
+	opts = append(opts, rid)
+	opts = append(opts, mountable)
+	epString := naming.FormatEndpoint("", "", opts...)
+	return v23.NewEndpoint(epString)
+}
+
 // setEndpointRoutingID returns a copy of ep with RoutingId changed to rid.
 func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
 	network, address, routes, _, bnames, mountable := getEndpointParts(ep)
@@ -87,3 +99,15 @@
 	}
 	return message.Read(ctx, b)
 }
+
+func readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
+	msg, err := readMessage(ctx, f)
+	if err != nil {
+		return nil, err
+	}
+	res, ok := msg.(*message.ProxyResponse)
+	if !ok {
+		return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%T", msg))
+	}
+	return res.Endpoints, nil
+}