Merge "syncbase: switch internal key sep to \xfe, allow ":" in keys"
diff --git a/lib/discovery/util/advertise_test.go b/lib/discovery/util/advertise_test.go
index 66be0f3..b3ff757 100644
--- a/lib/discovery/util/advertise_test.go
+++ b/lib/discovery/util/advertise_test.go
@@ -119,12 +119,16 @@
 	defer shutdown()
 
 	mock := newMockServer(newEndpoints("addr1:123"))
-
 	util.AdvertiseServer(ctx, mock, "", discovery.Service{InterfaceName: "v.io/v23/a"}, nil)
-	service, err := scan(ctx)
+
+	// Scan the advertised service.
+	service, err := scan(ctx, 3*time.Second)
 	if err != nil {
 		t.Fatal(err)
 	}
+	if len(service.InstanceUuid) == 0 {
+		t.Fatal("couldn't scan")
+	}
 
 	// Make sure the instance uuid has not been changed.
 	eps := newEndpoints("addr2:123")
@@ -145,7 +149,7 @@
 	var found discovery.Service
 	for now := time.Now(); time.Since(now) < timeout; {
 		var err error
-		found, err = scan(ctx)
+		found, err = scan(ctx, 5*time.Millisecond)
 		if err != nil {
 			return err
 		}
@@ -156,7 +160,7 @@
 	return fmt.Errorf("match failed; got %v, but wanted %v", found, want)
 }
 
-func scan(ctx *context.T) (discovery.Service, error) {
+func scan(ctx *context.T, timeout time.Duration) (discovery.Service, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -169,7 +173,7 @@
 	select {
 	case update := <-scan:
 		return update.Interface().(discovery.Found).Service, nil
-	case <-time.After(5 * time.Millisecond):
+	case <-time.After(timeout):
 		return discovery.Service{}, nil
 	}
 }
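The change above turns the scan helper's hard-coded 5ms wait into a caller-supplied
timeout, so the first lookup after advertising can wait several seconds while the
polling loop in the match helper keeps using a short per-attempt timeout.  For
reference, a minimal self-contained sketch of the same select-with-timeout idiom
follows; the Update type and channel are hypothetical stand-ins, not the real
discovery scan API.

package main

import (
	"fmt"
	"time"
)

// Update is a placeholder for a discovery scan update.
type Update struct{ Name string }

// waitForUpdate returns the first update received, or a zero Update and false
// if nothing arrives before the timeout, mirroring how scan() returns an empty
// discovery.Service on timeout.
func waitForUpdate(updates <-chan Update, timeout time.Duration) (Update, bool) {
	select {
	case u := <-updates:
		return u, true
	case <-time.After(timeout):
		return Update{}, false
	}
}

func main() {
	ch := make(chan Update, 1)
	ch <- Update{Name: "v.io/v23/a"}
	if u, ok := waitForUpdate(ch, 3*time.Second); ok {
		fmt.Println("found", u.Name)
	}
}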
diff --git a/lib/security/audit/principal_test.go b/lib/security/audit/principal_test.go
index 75148b6..48124f4 100644
--- a/lib/security/audit/principal_test.go
+++ b/lib/security/audit/principal_test.go
@@ -25,7 +25,7 @@
 )
 
 func TestAuditingPrincipal(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	var (
 		thirdPartyCaveat, discharge = newThirdPartyCaveatAndDischarge(t)
@@ -129,7 +129,7 @@
 }
 
 func TestUnauditedMethodsOnPrincipal(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	var (
 		auditor = new(mockAuditor)
@@ -199,9 +199,9 @@
 	return d, p.NextError
 }
 
-func (p *mockPrincipal) PublicKey() security.PublicKey               { return p.NextResult.(security.PublicKey) }
-func (p *mockPrincipal) Roots() security.BlessingRoots               { return nil }
-func (p *mockPrincipal) BlessingStore() security.BlessingStore       { return nil }
+func (p *mockPrincipal) PublicKey() security.PublicKey         { return p.NextResult.(security.PublicKey) }
+func (p *mockPrincipal) Roots() security.BlessingRoots         { return nil }
+func (p *mockPrincipal) BlessingStore() security.BlessingStore { return nil }
 
 type mockAuditor struct {
 	LastEntry audit.Entry
diff --git a/runtime/factories/android/android.go b/runtime/factories/android/android.go
new file mode 100644
index 0000000..c254cc2
--- /dev/null
+++ b/runtime/factories/android/android.go
@@ -0,0 +1,187 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build linux darwin
+
+// Package android implements a RuntimeFactory suitable for Android.  It is
+// based on the roaming package.
+//
+// The pubsub.Publisher mechanism is used to communicate networking settings
+// to the rpc.Server implementation of the runtime; the stream publishes the
+// Settings the server expects.
+package android
+
+import (
+	"flag"
+
+	"v.io/x/lib/netconfig"
+	"v.io/x/lib/netstate"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/rpc"
+
+	"v.io/x/ref/internal/logger"
+	dfactory "v.io/x/ref/lib/discovery/factory"
+	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/lib/pubsub"
+	"v.io/x/ref/lib/security/securityflag"
+	"v.io/x/ref/runtime/internal"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
+	"v.io/x/ref/runtime/internal/lib/appcycle"
+	"v.io/x/ref/runtime/internal/lib/websocket"
+	"v.io/x/ref/runtime/internal/lib/xwebsocket"
+	irpc "v.io/x/ref/runtime/internal/rpc"
+	"v.io/x/ref/runtime/internal/rt"
+	"v.io/x/ref/services/debug/debuglib"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
+	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
+	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
+	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
+)
+
+const (
+	SettingsStreamName = "roaming"
+	SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
+)
+
+var commonFlags *flags.Flags
+
+func init() {
+	v23.RegisterRuntimeFactory(Init)
+	rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
+	flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
+	commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
+}
+
+func Init(ctx *context.T) (v23.Runtime, *context.T, v23.Shutdown, error) {
+	if err := internal.ParseFlagsAndConfigureGlobalLogger(commonFlags); err != nil {
+		return nil, nil, nil, err
+	}
+
+	ac := appcycle.New()
+	discovery, err := dfactory.New()
+	if err != nil {
+		ac.Shutdown()
+		return nil, nil, nil, err
+	}
+
+	lf := commonFlags.ListenFlags()
+	listenSpec := rpc.ListenSpec{
+		Addrs:          rpc.ListenAddrs(lf.Addrs),
+		Proxy:          lf.Proxy,
+		AddressChooser: internal.NewAddressChooser(logger.Global()),
+	}
+	reservedDispatcher := debuglib.NewDispatcher(securityflag.NewAuthorizerOrDie())
+
+	ishutdown := func() {
+		ac.Shutdown()
+		discovery.Close()
+	}
+
+	publisher := pubsub.NewPublisher()
+
+	// Create stream in Init function to avoid a race between any
+	// goroutines started here and consumers started after Init returns.
+	ch := make(chan pubsub.Setting)
+	// TODO(cnicolaou): use stop to shut down this stream when the RuntimeFactory shuts down.
+	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+	if err != nil {
+		ishutdown()
+		return nil, nil, nil, err
+	}
+
+	prev, err := netstate.GetAccessibleIPs()
+	if err != nil {
+		ishutdown()
+		return nil, nil, nil, err
+	}
+
+	// Start the dhcp watcher.
+	watcher, err := netconfig.NewNetConfigWatcher()
+	if err != nil {
+		ishutdown()
+		return nil, nil, nil, err
+	}
+
+	cleanupCh := make(chan struct{})
+	watcherCh := make(chan struct{})
+
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
+	if err != nil {
+		ishutdown()
+		return nil, nil, nil, err
+	}
+
+	go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
+	runtimeFactoryShutdown := func() {
+		close(cleanupCh)
+		ishutdown()
+		shutdown()
+		<-watcherCh
+	}
+	return runtime, ctx, runtimeFactoryShutdown, nil
+}
+
+// monitorNetworkSettingsX monitors network configuration changes and
+// publishes subsequent Settings to reflect any changes detected.
+func monitorNetworkSettingsX(
+	runtime *rt.Runtime,
+	ctx *context.T,
+	watcher netconfig.NetConfigWatcher,
+	prev netstate.AddrList,
+	pubStop, cleanup <-chan struct{},
+	watcherLoop chan<- struct{},
+	ch chan<- pubsub.Setting) {
+	defer close(ch)
+
+	listenSpec := runtime.GetListenSpec(ctx)
+
+	// TODO(cnicolaou): add support for listening on multiple network addresses.
+
+done:
+	for {
+		select {
+		case <-watcher.Channel():
+			netstate.InvalidateCache()
+			cur, err := netstate.GetAccessibleIPs()
+			if err != nil {
+				ctx.Errorf("failed to read network state: %s", err)
+				continue
+			}
+			removed := netstate.FindRemoved(prev, cur)
+			added := netstate.FindAdded(prev, cur)
+			ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
+			ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
+			ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
+			ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
+			if len(removed) == 0 && len(added) == 0 {
+				ctx.VI(2).Infof("Network event that led to no address changes since our last 'baseline'")
+				continue
+			}
+			if len(removed) > 0 {
+				ctx.VI(2).Infof("Sending removed: %s", removed)
+				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
+			}
+			// We will always send the best currently available address
+			if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
+				ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
+				ch <- irpc.NewNewAddrsSetting(chosen)
+			} else {
+				ctx.VI(2).Infof("Ignoring added %s", added)
+			}
+			prev = cur
+		case <-cleanup:
+			break done
+		case <-pubStop:
+			goto done
+		}
+	}
+	watcher.Stop()
+	close(watcherLoop)
+}
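The factory's Init creates the settings channel and stream before the watcher
goroutine starts (to avoid racing with consumers created after Init returns), and
its shutdown closes cleanupCh and then blocks on watcherCh until the watcher has
exited.  A simplified, self-contained model of that lifecycle is sketched below;
the setting type and event channel are stand-ins, not the real pubsub or
netconfig APIs.

package main

import "fmt"

// setting is a placeholder for pubsub.Setting.
type setting struct{ name, value string }

// watch publishes a setting for every event until cleanup is closed, then
// closes both the settings channel and the done channel, mirroring
// monitorNetworkSettingsX closing ch and watcherCh.
func watch(events <-chan string, cleanup <-chan struct{}, done chan<- struct{}, ch chan<- setting) {
	defer close(done)
	defer close(ch)
	for {
		select {
		case addr := <-events:
			ch <- setting{name: "NewAddrs", value: addr}
		case <-cleanup:
			return
		}
	}
}

func main() {
	events := make(chan string, 1)
	cleanup := make(chan struct{})
	done := make(chan struct{})
	ch := make(chan setting)

	go watch(events, cleanup, done, ch)

	events <- "192.168.1.5"
	fmt.Println(<-ch) // a consumer of the settings stream

	// Mirrors runtimeFactoryShutdown: signal cleanup, then wait for the watcher.
	close(cleanup)
	<-done
}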
diff --git a/runtime/factories/android/proxy.go b/runtime/factories/android/proxy.go
new file mode 100644
index 0000000..3b4f880
--- /dev/null
+++ b/runtime/factories/android/proxy.go
@@ -0,0 +1,23 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package android
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/rpc/stream/proxy"
+)
+
+// NewProxy creates a new Proxy that listens for network connections as specified
+// by the provided ListenSpec and routes VC traffic between accepted connections.
+//
+// auth encapsulates the authorization policy of the proxy - which
+// servers it is willing to proxy for.
+func NewProxy(ctx *context.T, spec rpc.ListenSpec, auth security.Authorizer, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+	return proxy.New(ctx, spec, auth, names...)
+}
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16ac858..030ce8e 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -85,7 +85,10 @@
 	// Resolve of the discharge server name.  The two resolve calls may be to
 	// the same mounttable.
 	c.loopWG.Add(1)
-	go c.refreshDischarges(ctx)
+	go func() {
+		c.refreshDischarges(ctx)
+		c.loopWG.Done()
+	}()
 	return nil
 }
 
@@ -105,7 +108,6 @@
 	lAuth := &message.Auth{
 		ChannelBinding: signedBinding,
 	}
-	c.loopWG.Add(1)
 	if lAuth.BlessingsKey, lAuth.DischargeKey, err = c.refreshDischarges(ctx); err != nil {
 		return err
 	}
@@ -211,7 +213,6 @@
 }
 
 func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
-	defer c.loopWG.Done()
 	dis := slib.PrepareDischarges(ctx, c.lBlessings,
 		security.DischargeImpetus{}, time.Minute)
 	// Schedule the next update.
@@ -221,6 +222,7 @@
 		c.loopWG.Add(1)
 		c.dischargeTimer = time.AfterFunc(dur, func() {
 			c.refreshDischarges(ctx)
+			c.loopWG.Done()
 		})
 	}
 	c.mu.Unlock()
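The net effect of the auth.go change is that loopWG.Add(1) is always paired with a
Done() performed by the goroutine it counted: Done moves out of refreshDischarges
and into the goroutine launched during the handshake and into the AfterFunc
callback.  For readers unfamiliar with the idiom, a minimal illustration of that
Add-before-start / Done-inside-the-goroutine pairing follows (plain
sync.WaitGroup, not the Conn type itself).

package main

import (
	"fmt"
	"sync"
)

// refresh stands in for a background task such as refreshDischarges.
func refresh(id int) {
	fmt.Println("refreshed", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1) // count the goroutine before it starts
		go func(id int) {
			defer wg.Done() // the counted goroutine releases its own slot
			refresh(id)
		}(i)
	}
	wg.Wait() // safe: every Add happened before the corresponding goroutine ran
}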
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 80d0034..4806bef 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -340,7 +340,7 @@
 	}
 	c.status = Closing
 
-	go func() {
+	go func(c *Conn) {
 		if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
 			msg := ""
 			if err != nil {
@@ -373,7 +373,7 @@
 		c.status = Closed
 		close(c.closed)
 		c.mu.Unlock()
-	}()
+	}(c)
 }
 
 func (c *Conn) release(ctx *context.T, fid, count uint64) {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index e4127a3..1cad267 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
 
 import (
 	"io"
+	"sync"
 	"time"
 
 	"v.io/v23/context"
@@ -47,6 +48,9 @@
 	// borrowed indicates the number of tokens we have borrowed from the shared pool for
 	// sending on newly dialed flows.
 	borrowed uint64
+	// borrowCond is a condition variable that we can use to wait for shared
+	// counters to be released.
+	borrowCond *sync.Cond
 	// borrowing indicates whether this flow is using borrowed counters for a newly
 	// dialed flow.  This will be set to false after we first receive a
 	// release from the remote end.  This is always false for accepted flows.
@@ -60,14 +64,15 @@
 
 func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, remote naming.Endpoint, dialed, preopen bool) *flw {
 	f := &flw{
-		id:        id,
-		dialed:    dialed,
-		conn:      c,
-		q:         newReadQ(c, id),
-		bkey:      bkey,
-		dkey:      dkey,
-		opened:    preopen,
-		borrowing: dialed,
+		id:         id,
+		dialed:     dialed,
+		conn:       c,
+		q:          newReadQ(c, id),
+		bkey:       bkey,
+		dkey:       dkey,
+		opened:     preopen,
+		borrowing:  dialed,
+		borrowCond: sync.NewCond(&c.mu),
 		// It's important that this channel has a non-zero buffer.  Sometimes this
 		// flow will be notifying itself, so if there's no buffer a deadlock will
 		// occur.
@@ -144,12 +149,16 @@
 		return int(max), func(used int) {
 			f.conn.lshared -= uint64(used)
 			f.borrowed += uint64(used)
+			f.ctx.VI(2).Infof("deducting %d borrowed tokens on flow %d(%p), total: %d", used, f.id, f, f.borrowed)
 		}
 	}
 	if f.released < max {
 		max = f.released
 	}
-	return int(max), func(used int) { f.released -= uint64(used) }
+	return int(max), func(used int) {
+		f.released -= uint64(used)
+		f.ctx.VI(2).Infof("flow %d(%p) deducting %d tokens, %d left", f.id, f, used, f.released)
+	}
 }
 
 // releaseLocked releases some counters from a remote reader to the local
@@ -161,12 +170,16 @@
 		if f.borrowed < tokens {
 			n = f.borrowed
 		}
+		f.ctx.VI(2).Infof("Returning %d tokens borrowed by %d(%p)", n, f.id, f)
 		tokens -= n
 		f.borrowed -= n
 		f.conn.lshared += n
+		f.borrowCond.Broadcast()
 	}
 	f.released += tokens
+	f.ctx.VI(2).Infof("Tokens released to %d(%p): %d => %d", f.id, f, tokens, f.released)
 	if f.writing {
+		f.ctx.VI(2).Infof("Activating writing flow %d(%p) now that we have tokens.", f.id, f)
 		f.conn.activateWriterLocked(f)
 		f.conn.notifyNextWriterLocked(nil)
 	}
@@ -176,6 +189,7 @@
 	if err = f.checkBlessings(); err != nil {
 		return 0, err
 	}
+	f.ctx.VI(2).Infof("starting write on flow %d(%p)", f.id, f)
 	select {
 	// Catch cancellations early.  If we caught a cancel when waiting
 	// our turn below its possible that we were notified simultaneously.
@@ -212,6 +226,7 @@
 		if tokens == 0 {
 			// Oops, we really don't have data to send, probably because we've exhausted
 			// the remote buffer.  deactivate ourselves but keep trying.
+			f.ctx.VI(2).Infof("Deactivating write on flow %d(%p) due to lack of tokens", f.id, f)
 			f.conn.deactivateWriterLocked(f)
 			continue
 		}
@@ -250,6 +265,7 @@
 		f.opened = true
 	}
 	f.writing = false
+	f.ctx.VI(2).Infof("finishing write on %d(%p): %v", f.id, f, err)
 	f.conn.deactivateWriterLocked(f)
 	f.conn.notifyNextWriterLocked(f)
 	f.conn.mu.Unlock()
@@ -394,8 +410,19 @@
 }
 
 func (f *flw) close(ctx *context.T, err error) {
+	closedRemotely := verror.ErrorID(err) == ErrFlowClosedRemotely.ID
+	f.conn.mu.Lock()
+	if closedRemotely {
+		// When the other side closes a flow, it implicitly releases all the
+		// counters used by that flow.  That means we should return any counters
+		// we borrowed to the shared pool so they can be used by new flows.
+		f.conn.lshared += f.borrowed
+		f.borrowed = 0
+	}
+	f.borrowCond.Broadcast()
+	f.conn.mu.Unlock()
 	if f.q.close(ctx) {
-		eid := verror.ErrorID(err)
+		f.ctx.VI(2).Infof("closing %d(%p): %v", f.id, f, err)
 		f.cancel()
 		// After cancel has been called no new writes will begin for this
 		// flow.  There may be a write in progress, but it must finish
@@ -403,13 +430,12 @@
 		// can simply use sendMessageLocked to send the close flow
 		// message.
 		f.conn.mu.Lock()
-		delete(f.conn.flows, f.id)
 		connClosing := f.conn.status == Closing
 		var serr error
 		if !f.opened {
 			// Closing a flow that was never opened.
 			f.conn.unopenedFlows.Done()
-		} else if eid != ErrFlowClosedRemotely.ID && !connClosing {
+		} else if !closedRemotely && !connClosing {
 			// Note: If the conn is closing there is no point in trying to
 			// send the flow close message as it will fail.  This is racy
 			// with the connection closing, but there are no ill-effects
@@ -419,6 +445,18 @@
 				Flags: message.CloseFlag,
 			})
 		}
+		if f.borrowed > 0 && f.conn.status < Closing {
+			f.conn.loopWG.Add(1)
+			go func() {
+				defer f.conn.loopWG.Done()
+				f.conn.mu.Lock()
+				for f.borrowed > 0 && f.conn.status < Closing {
+					f.borrowCond.Wait()
+				}
+				delete(f.conn.flows, f.id)
+				f.conn.mu.Unlock()
+			}()
+		}
 		f.conn.mu.Unlock()
 		if serr != nil {
 			ctx.Errorf("Could not send close flow message: %v", err)
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index ff1f83c..186a40e 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -18,9 +18,11 @@
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/local"
 	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/test/goroutines"
 )
 
 func TestCache(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
@@ -31,7 +33,9 @@
 		RID:       naming.FixedRoutingID(0x5555),
 		Blessings: []string{"A", "B", "C"},
 	}
-	conn := makeConnAndFlow(t, ctx, remote).c
+	caf := makeConnAndFlow(t, ctx, remote)
+	defer caf.stop(ctx)
+	conn := caf.c
 	if err := c.Insert(conn, remote.Protocol, remote.Address); err != nil {
 		t.Fatal(err)
 	}
@@ -72,7 +76,9 @@
 		RID:       naming.FixedRoutingID(0x1111),
 		Blessings: []string{"ridonly"},
 	}
-	ridConn := makeConnAndFlow(t, ctx, ridEP).c
+	caf = makeConnAndFlow(t, ctx, ridEP)
+	defer caf.stop(ctx)
+	ridConn := caf.c
 	if err := c.InsertWithRoutingID(ridConn); err != nil {
 		t.Fatal(err)
 	}
@@ -90,7 +96,9 @@
 		RID:       naming.FixedRoutingID(0x2222),
 		Blessings: []string{"other"},
 	}
-	otherConn := makeConnAndFlow(t, ctx, otherEP).c
+	caf = makeConnAndFlow(t, ctx, otherEP)
+	defer caf.stop(ctx)
+	otherConn := caf.c
 
 	// Looking up a not yet inserted endpoint should fail.
 	if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -117,7 +125,9 @@
 	}
 
 	// Insert a duplicate conn to ensure that replaced conns still get closed.
-	dupConn := makeConnAndFlow(t, ctx, remote).c
+	caf = makeConnAndFlow(t, ctx, remote)
+	defer caf.stop(ctx)
+	dupConn := caf.c
 	if err := c.Insert(dupConn, remote.Protocol, remote.Address); err != nil {
 		t.Fatal(err)
 	}
@@ -136,17 +146,20 @@
 	c.Close(ctx)
 	// Now the connections should be closed.
 	<-conn.Closed()
+	<-ridConn.Closed()
 	<-dupConn.Closed()
 	<-otherConn.Closed()
 }
 
 func TestLRU(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
 	// Ensure that the least recently created conns are killed by KillConnections.
 	c := NewConnCache()
-	conns := nConnAndFlows(t, ctx, 10)
+	conns, stop := nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -178,7 +191,8 @@
 
 	// Ensure that writing to conns marks conns as more recently used.
 	c = NewConnCache()
-	conns = nConnAndFlows(t, ctx, 10)
+	conns, stop = nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -213,7 +227,8 @@
 
 	// Ensure that reading from conns marks conns as more recently used.
 	c = NewConnCache()
-	conns = nConnAndFlows(t, ctx, 10)
+	conns, stop = nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -267,6 +282,7 @@
 
 type connAndFlow struct {
 	c *connpackage.Conn
+	a *connpackage.Conn
 	f flow.Flow
 }
 
@@ -284,7 +300,12 @@
 	}
 }
 
-func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+func (c connAndFlow) stop(ctx *context.T) {
+	c.c.Close(ctx, nil)
+	c.a.Close(ctx, nil)
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) ([]connAndFlow, func()) {
 	cfs := make([]connAndFlow, n)
 	for i := 0; i < n; i++ {
 		cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
@@ -292,7 +313,11 @@
 			RID:      naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
 		})
 	}
-	return cfs
+	return cfs, func() {
+		for _, conn := range cfs {
+			conn.stop(ctx)
+		}
+	}
 }
 
 func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
@@ -318,7 +343,7 @@
 		ach <- a
 	}()
 	conn := <-dch
-	<-ach
+	aconn := <-ach
 	f, err := conn.Dial(ctx, flowtest.AllowAllPeersAuthorizer{}, nil)
 	if err != nil {
 		t.Fatal(err)
@@ -328,7 +353,7 @@
 		t.Fatal(err)
 	}
 	<-fh.ch
-	return connAndFlow{conn, f}
+	return connAndFlow{conn, aconn, f}
 }
 
 type fh struct {
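The conncache tests now pair every conn they create with a deferred stop and wrap
each test in goroutines.NoLeaks, so nothing they start outlives the test.  A tiny
sketch of the "constructor returns the resources plus a stop function" idiom the
tests adopt follows; the resource type is a stand-in, not the real
connpackage.Conn.

package main

import "fmt"

type resource struct{ id int }

func (r *resource) close() { fmt.Println("closed", r.id) }

// nResources mirrors nConnAndFlows: it hands back the resources together with a
// single function that shuts them all down.
func nResources(n int) ([]*resource, func()) {
	rs := make([]*resource, n)
	for i := range rs {
		rs[i] = &resource{id: i}
	}
	return rs, func() {
		for _, r := range rs {
			r.close()
		}
	}
}

func main() {
	rs, stop := nResources(3)
	defer stop() // in a test this defer runs before any leak check fires
	fmt.Println("using", len(rs), "resources")
}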
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index a010f18..5e3a32a 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -607,9 +607,10 @@
 				dst.Process.queue.Put(m)
 			}
 		case *message.HealthCheckResponse:
-			// Note that the proxy never sends health check requests, so responses
-			// should always be forwarded.
-			if dst := p.Route(m.VCI); dst != nil {
+			if svc := p.ServerVC(m.VCI); svc != nil {
+				// If the response is for the proxy, pass it to the VC.
+				svc.HandleHealthCheckResponse()
+			} else if dst := p.Route(m.VCI); dst != nil {
 				m.VCI = dst.VCI
 				dst.Process.queue.Put(m)
 			}
diff --git a/runtime/internal/rpc/stream/vc/vc_cache.go b/runtime/internal/rpc/stream/vc/vc_cache.go
index d962cfa..d78098c 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache.go
@@ -5,6 +5,7 @@
 package vc
 
 import (
+	"strings"
 	"sync"
 
 	"v.io/v23/naming"
@@ -17,17 +18,19 @@
 // VCCache implements a set of VIFs keyed by the endpoint of the remote end and the
 // local principal. Multiple goroutines can invoke methods on the VCCache simultaneously.
 type VCCache struct {
-	mu      sync.Mutex
-	cache   map[vcKey]*VC  // GUARDED_BY(mu)
-	started map[vcKey]bool // GUARDED_BY(mu)
-	cond    *sync.Cond
+	mu       sync.Mutex
+	cache    map[vcKey]*VC  // GUARDED_BY(mu)
+	ridCache map[ridKey]*VC // GUARDED_BY(mu)
+	started  map[vcKey]bool // GUARDED_BY(mu)
+	cond     *sync.Cond
 }
 
 // NewVCCache returns a new cache for VCs.
 func NewVCCache() *VCCache {
 	c := &VCCache{
-		cache:   make(map[vcKey]*VC),
-		started: make(map[vcKey]bool),
+		cache:    make(map[vcKey]*VC),
+		ridCache: make(map[ridKey]*VC),
+		started:  make(map[vcKey]bool),
 	}
 	c.cond = sync.NewCond(&c.mu)
 	return c
@@ -53,6 +56,9 @@
 		return nil, verror.New(errVCCacheClosed, nil)
 	}
 	c.started[k] = true
+	if vc, ok := c.ridCache[c.ridkey(ep, p)]; ok {
+		return vc, nil
+	}
 	return c.cache[k], nil
 }
 
@@ -72,7 +78,11 @@
 	if c.cache == nil {
 		return verror.New(errVCCacheClosed, nil)
 	}
-	c.cache[c.key(vc.RemoteEndpoint(), vc.LocalPrincipal())] = vc
+	ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+	c.cache[c.key(ep, principal)] = vc
+	if ep.RoutingID() != naming.NullRoutingID {
+		c.ridCache[c.ridkey(ep, principal)] = vc
+	}
 	return nil
 }
 
@@ -85,6 +95,7 @@
 	}
 	c.cache = nil
 	c.started = nil
+	c.ridCache = nil
 	c.mu.Unlock()
 	return vcs
 }
@@ -96,10 +107,18 @@
 	if c.cache == nil {
 		return verror.New(errVCCacheClosed, nil)
 	}
-	delete(c.cache, c.key(vc.RemoteEndpoint(), vc.LocalPrincipal()))
+	ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+	delete(c.cache, c.key(ep, principal))
+	delete(c.ridCache, c.ridkey(ep, principal))
 	return nil
 }
 
+type ridKey struct {
+	rid            naming.RoutingID
+	localPublicKey string
+	blessingNames  string
+}
+
 type vcKey struct {
 	remoteEP       string
 	localPublicKey string // localPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
@@ -112,3 +131,12 @@
 	}
 	return k
 }
+
+func (c *VCCache) ridkey(ep naming.Endpoint, p security.Principal) ridKey {
+	k := ridKey{rid: ep.RoutingID()}
+	if p != nil {
+		k.localPublicKey = p.PublicKey().String()
+		k.blessingNames = strings.Join(ep.BlessingNames(), ",")
+	}
+	return k
+}
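With this change the VC cache keeps two indexes: the existing endpoint-keyed map
and a new map keyed by routing ID, public key, and blessing names, populated only
when the endpoint carries a non-null RoutingID; ReservedFind consults the
routing-ID index before falling back to the endpoint key.  A stripped-down,
self-contained sketch of the same two-index lookup follows, using plain maps and
a zero value in place of naming.NullRoutingID.

package main

import "fmt"

type entry struct{ addr string }

type cache struct {
	byAddr map[string]*entry
	byRID  map[uint64]*entry
}

func newCache() *cache {
	return &cache{byAddr: map[string]*entry{}, byRID: map[uint64]*entry{}}
}

// insert stores the entry under its address and, when the routing ID is
// non-zero, under the routing ID as well.
func (c *cache) insert(addr string, rid uint64, e *entry) {
	c.byAddr[addr] = e
	if rid != 0 { // 0 plays the role of naming.NullRoutingID
		c.byRID[rid] = e
	}
}

// find prefers the routing-ID index and falls back to the address key.
func (c *cache) find(addr string, rid uint64) *entry {
	if e, ok := c.byRID[rid]; ok {
		return e
	}
	return c.byAddr[addr]
}

func main() {
	c := newCache()
	c.insert("oink:8888", 0x1111, &entry{addr: "oink:8888"})
	// A different address with the same routing ID still hits the cached entry.
	fmt.Println(c.find("moo:7777", 0x1111).addr)
}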
diff --git a/runtime/internal/rpc/stream/vc/vc_cache_test.go b/runtime/internal/rpc/stream/vc/vc_cache_test.go
index b096e21..1d85ff9 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache_test.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache_test.go
@@ -7,6 +7,7 @@
 import (
 	"testing"
 
+	"v.io/v23/naming"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	"v.io/x/ref/test/testutil"
 )
@@ -101,6 +102,24 @@
 	if cachedVC := <-ch; cachedVC != otherVC {
 		t.Errorf("got %v, want %v", cachedVC, otherVC)
 	}
+
+	// If we add an endpoint with a non-zero RoutingID and search for another
+	// endpoint with the same RoutingID, we should get the first endpoint's VC.
+	ridep, err := inaming.NewEndpoint("oink:8888")
+	if err != nil {
+		t.Fatal(err)
+	}
+	ridep.RID = naming.FixedRoutingID(0x1111)
+	vc = &VC{remoteEP: ridep, localPrincipal: p}
+	cache.Insert(vc)
+	otherEP, err = inaming.NewEndpoint("moo:7777")
+	if err != nil {
+		t.Fatal(err)
+	}
+	otherEP.RID = ridep.RID
+	if got, err := cache.ReservedFind(otherEP, p); err != nil || got != vc {
+		t.Errorf("got %v, want %v, err: %v", got, vc, err)
+	}
 }
 
 func vcsEqual(a, b []*VC) bool {
diff --git a/runtime/internal/rpc/stream/vif/set_test.go b/runtime/internal/rpc/stream/vif/set_test.go
index b79d8d5..f634c64 100644
--- a/runtime/internal/rpc/stream/vif/set_test.go
+++ b/runtime/internal/rpc/stream/vif/set_test.go
@@ -111,7 +111,7 @@
 }
 
 func TestSetBasic(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	sockdir, err := ioutil.TempDir("", "TestSetBasic")
 	if err != nil {
@@ -184,7 +184,7 @@
 }
 
 func TestSetWithPipes(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	c1, s1 := net.Pipe()
 	c2, s2 := net.Pipe()
@@ -233,7 +233,7 @@
 }
 
 func TestSetWithUnixSocket(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	dir, err := ioutil.TempDir("", "TestSetWithUnixSocket")
 	if err != nil {
@@ -296,7 +296,7 @@
 }
 
 func TestSetInsertDelete(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	c1, s1 := net.Pipe()
 	vf1, _, err := newVIF(ctx, c1, s1)
@@ -319,7 +319,7 @@
 }
 
 func TestBlockingFind(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	network, address := "tcp", "127.0.0.1:1234"
 	set := vif.NewSet()
diff --git a/runtime/internal/rpc/stream/vif/vif_test.go b/runtime/internal/rpc/stream/vif/vif_test.go
index a15b081..40114c0 100644
--- a/runtime/internal/rpc/stream/vif/vif_test.go
+++ b/runtime/internal/rpc/stream/vif/vif_test.go
@@ -37,7 +37,7 @@
 //go:generate jiri test generate
 
 func TestSingleFlowCreatedAtClient(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -65,7 +65,7 @@
 }
 
 func TestSingleFlowCreatedAtServer(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -96,7 +96,7 @@
 
 func testMultipleVCsAndMultipleFlows(t *testing.T, gomaxprocs int) {
 	testutil.InitRandGenerator(t.Logf)
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	// This test dials multiple VCs from the client to the server.
 	// On each VC, it creates multiple flows, writes to them and verifies
@@ -255,7 +255,7 @@
 }
 
 func TestClose(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -294,7 +294,7 @@
 }
 
 func TestOnClose(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -335,7 +335,7 @@
 	const (
 		waitTime = 5 * time.Millisecond
 	)
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -424,7 +424,7 @@
 		// connection of the other side to be closed especially in race testing.
 		waitTime = 150 * time.Millisecond
 	)
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -489,7 +489,7 @@
 		idleTime = 10 * time.Millisecond
 		waitTime = idleTime * 2
 	)
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -607,7 +607,7 @@
 func TestIdleTimeoutServer(t *testing.T) { testIdleTimeout(t, true) }
 
 func TestShutdownVCs(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -674,7 +674,7 @@
 }
 
 func (tc *versionTestCase) Run(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -734,7 +734,7 @@
 }
 
 func TestNetworkFailure(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
@@ -768,7 +768,7 @@
 }
 
 func TestPreAuthentication(t *testing.T) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	defer shutdown()
 	pclient := testutil.NewPrincipal("client")
 	pserver := testutil.NewPrincipal("server")
diff --git a/runtime/internal/rpc/testutil_test.go b/runtime/internal/rpc/testutil_test.go
index 105638d..1221ff5 100644
--- a/runtime/internal/rpc/testutil_test.go
+++ b/runtime/internal/rpc/testutil_test.go
@@ -79,7 +79,7 @@
 }
 
 func initForTest() (*context.T, v23.Shutdown) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
 	if err != nil {
 		panic(err)
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
index 97240ab..9173521 100644
--- a/services/wspr/internal/lib/signature_manager_test.go
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -23,7 +23,7 @@
 )
 
 func initRuntime(t *testing.T) (*context.T, clientWithTimesCalled, v23.Shutdown) {
-	ctx, shutdown := test.V23InitAnon()
+	ctx, shutdown := test.V23InitSimple()
 	initialSig := []signature.Interface{
 		{
 			Methods: []signature.Method{
diff --git a/test/init.go b/test/init.go
index ef625eb..e1812e9 100644
--- a/test/init.go
+++ b/test/init.go
@@ -117,10 +117,11 @@
 	return context.WithLogger(ctx, logger.Global()), cancel
 }
 
-// V23InitEmpty initializes a runtime but with no principal.
-func V23InitAnon() (*context.T, v23.Shutdown) {
+// V23InitSimple is like V23Init, except that it does not set up a
+// mounttable.
+func V23InitSimple() (*context.T, v23.Shutdown) {
 	return initWithParams(initParams{
-		CreatePrincipal:  false,
+		CreatePrincipal:  true,
 		CreateMounttable: false,
 	})
 }