Merge "syncbase: switch internal key sep to \xfe, allow ":" in keys"
diff --git a/lib/discovery/util/advertise_test.go b/lib/discovery/util/advertise_test.go
index 66be0f3..b3ff757 100644
--- a/lib/discovery/util/advertise_test.go
+++ b/lib/discovery/util/advertise_test.go
@@ -119,12 +119,16 @@
defer shutdown()
mock := newMockServer(newEndpoints("addr1:123"))
-
util.AdvertiseServer(ctx, mock, "", discovery.Service{InterfaceName: "v.io/v23/a"}, nil)
- service, err := scan(ctx)
+
+ // Scan the advertised service.
+ service, err := scan(ctx, 3*time.Second)
if err != nil {
t.Fatal(err)
}
+ if len(service.InstanceUuid) == 0 {
+ t.Fatal("couldn't scan")
+ }
// Make sure the instance uuid has not been changed.
eps := newEndpoints("addr2:123")
@@ -145,7 +149,7 @@
var found discovery.Service
for now := time.Now(); time.Since(now) < timeout; {
var err error
- found, err = scan(ctx)
+ found, err = scan(ctx, 5*time.Millisecond)
if err != nil {
return err
}
@@ -156,7 +160,7 @@
return fmt.Errorf("match failed; got %v, but wanted %v", found, want)
}
-func scan(ctx *context.T) (discovery.Service, error) {
+func scan(ctx *context.T, timeout time.Duration) (discovery.Service, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -169,7 +173,7 @@
select {
case update := <-scan:
return update.Interface().(discovery.Found).Service, nil
- case <-time.After(5 * time.Millisecond):
+ case <-time.After(timeout):
return discovery.Service{}, nil
}
}
diff --git a/lib/security/audit/principal_test.go b/lib/security/audit/principal_test.go
index 75148b6..48124f4 100644
--- a/lib/security/audit/principal_test.go
+++ b/lib/security/audit/principal_test.go
@@ -25,7 +25,7 @@
)
func TestAuditingPrincipal(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
var (
thirdPartyCaveat, discharge = newThirdPartyCaveatAndDischarge(t)
@@ -129,7 +129,7 @@
}
func TestUnauditedMethodsOnPrincipal(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
var (
auditor = new(mockAuditor)
@@ -199,9 +199,9 @@
return d, p.NextError
}
-func (p *mockPrincipal) PublicKey() security.PublicKey { return p.NextResult.(security.PublicKey) }
-func (p *mockPrincipal) Roots() security.BlessingRoots { return nil }
-func (p *mockPrincipal) BlessingStore() security.BlessingStore { return nil }
+func (p *mockPrincipal) PublicKey() security.PublicKey { return p.NextResult.(security.PublicKey) }
+func (p *mockPrincipal) Roots() security.BlessingRoots { return nil }
+func (p *mockPrincipal) BlessingStore() security.BlessingStore { return nil }
type mockAuditor struct {
LastEntry audit.Entry
diff --git a/runtime/factories/android/android.go b/runtime/factories/android/android.go
new file mode 100644
index 0000000..c254cc2
--- /dev/null
+++ b/runtime/factories/android/android.go
@@ -0,0 +1,187 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build linux darwin
+
+// Package android implements a RuntimeFactory suitable for android. It is
+// based on the roaming package.
+//
+// The pubsub.Publisher mechanism is used for communicating networking
+// settings to the rpc.Server implementation of the runtime and publishes
+// the Settings it expects.
+package android
+
+import (
+ "flag"
+
+ "v.io/x/lib/netconfig"
+ "v.io/x/lib/netstate"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/rpc"
+
+ "v.io/x/ref/internal/logger"
+ dfactory "v.io/x/ref/lib/discovery/factory"
+ "v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/pubsub"
+ "v.io/x/ref/lib/security/securityflag"
+ "v.io/x/ref/runtime/internal"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/ws"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
+ "v.io/x/ref/runtime/internal/lib/appcycle"
+ "v.io/x/ref/runtime/internal/lib/websocket"
+ "v.io/x/ref/runtime/internal/lib/xwebsocket"
+ irpc "v.io/x/ref/runtime/internal/rpc"
+ "v.io/x/ref/runtime/internal/rt"
+ "v.io/x/ref/services/debug/debuglib"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
+ _ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
+ _ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
+ _ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
+)
+
+const (
+ SettingsStreamName = "roaming"
+ SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
+)
+
+var commonFlags *flags.Flags
+
+func init() {
+ v23.RegisterRuntimeFactory(Init)
+ rpc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridResolve, websocket.HybridListener)
+ flow.RegisterUnknownProtocol("wsh", xwebsocket.WSH{})
+ commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
+}
+
+func Init(ctx *context.T) (v23.Runtime, *context.T, v23.Shutdown, error) {
+ if err := internal.ParseFlagsAndConfigureGlobalLogger(commonFlags); err != nil {
+ return nil, nil, nil, err
+ }
+
+ ac := appcycle.New()
+ discovery, err := dfactory.New()
+ if err != nil {
+ ac.Shutdown()
+ return nil, nil, nil, err
+ }
+
+ lf := commonFlags.ListenFlags()
+ listenSpec := rpc.ListenSpec{
+ Addrs: rpc.ListenAddrs(lf.Addrs),
+ Proxy: lf.Proxy,
+ AddressChooser: internal.NewAddressChooser(logger.Global()),
+ }
+ reservedDispatcher := debuglib.NewDispatcher(securityflag.NewAuthorizerOrDie())
+
+ ishutdown := func() {
+ ac.Shutdown()
+ discovery.Close()
+ }
+
+ publisher := pubsub.NewPublisher()
+
+ // Create stream in Init function to avoid a race between any
+ // goroutines started here and consumers started after Init returns.
+ ch := make(chan pubsub.Setting)
+ // TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
+ stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+ if err != nil {
+ ishutdown()
+ return nil, nil, nil, err
+ }
+
+ prev, err := netstate.GetAccessibleIPs()
+ if err != nil {
+ ishutdown()
+ return nil, nil, nil, err
+ }
+
+ // Start the dhcp watcher.
+ watcher, err := netconfig.NewNetConfigWatcher()
+ if err != nil {
+ ishutdown()
+ return nil, nil, nil, err
+ }
+
+ cleanupCh := make(chan struct{})
+ watcherCh := make(chan struct{})
+
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
+ if err != nil {
+ ishutdown()
+ return nil, nil, nil, err
+ }
+
+ go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
+ runtimeFactoryShutdown := func() {
+ close(cleanupCh)
+ ishutdown()
+ shutdown()
+ <-watcherCh
+ }
+ return runtime, ctx, runtimeFactoryShutdown, nil
+}
+
+// monitorNetworkSettings will monitor network configuration changes and
+// publish subsequent Settings to reflect any changes detected.
+func monitorNetworkSettingsX(
+ runtime *rt.Runtime,
+ ctx *context.T,
+ watcher netconfig.NetConfigWatcher,
+ prev netstate.AddrList,
+ pubStop, cleanup <-chan struct{},
+ watcherLoop chan<- struct{},
+ ch chan<- pubsub.Setting) {
+ defer close(ch)
+
+ listenSpec := runtime.GetListenSpec(ctx)
+
+ // TODO(cnicolaou): add support for listening on multiple network addresses.
+
+done:
+ for {
+ select {
+ case <-watcher.Channel():
+ netstate.InvalidateCache()
+ cur, err := netstate.GetAccessibleIPs()
+ if err != nil {
+ ctx.Errorf("failed to read network state: %s", err)
+ continue
+ }
+ removed := netstate.FindRemoved(prev, cur)
+ added := netstate.FindAdded(prev, cur)
+ ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
+ ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
+ ctx.VI(2).Infof("Added : %d: %s", len(added), added)
+ ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
+ if len(removed) == 0 && len(added) == 0 {
+ ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
+ continue
+ }
+ if len(removed) > 0 {
+ ctx.VI(2).Infof("Sending removed: %s", removed)
+ ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
+ }
+ // We will always send the best currently available address
+ if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
+ ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
+ ch <- irpc.NewNewAddrsSetting(chosen)
+ } else {
+ ctx.VI(2).Infof("Ignoring added %s", added)
+ }
+ prev = cur
+ case <-cleanup:
+ break done
+ case <-pubStop:
+ goto done
+ }
+ }
+ watcher.Stop()
+ close(watcherLoop)
+}
diff --git a/runtime/factories/android/proxy.go b/runtime/factories/android/proxy.go
new file mode 100644
index 0000000..3b4f880
--- /dev/null
+++ b/runtime/factories/android/proxy.go
@@ -0,0 +1,23 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package android
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/rpc/stream/proxy"
+)
+
+// NewProxy creates a new Proxy that listens for network connections on the provided
+// (network, address) pair and routes VC traffic between accepted connections.
+//
+// auth encapsulates the authorization policy of the proxy - which
+// servers it is willing to proxy for.
+func NewProxy(ctx *context.T, spec rpc.ListenSpec, auth security.Authorizer, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, spec, auth, names...)
+}
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16ac858..030ce8e 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -85,7 +85,10 @@
// Resolve of the discharge server name. The two resolve calls may be to
// the same mounttable.
c.loopWG.Add(1)
- go c.refreshDischarges(ctx)
+ go func() {
+ c.refreshDischarges(ctx)
+ c.loopWG.Done()
+ }()
return nil
}
@@ -105,7 +108,6 @@
lAuth := &message.Auth{
ChannelBinding: signedBinding,
}
- c.loopWG.Add(1)
if lAuth.BlessingsKey, lAuth.DischargeKey, err = c.refreshDischarges(ctx); err != nil {
return err
}
@@ -211,7 +213,6 @@
}
func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
- defer c.loopWG.Done()
dis := slib.PrepareDischarges(ctx, c.lBlessings,
security.DischargeImpetus{}, time.Minute)
// Schedule the next update.
@@ -221,6 +222,7 @@
c.loopWG.Add(1)
c.dischargeTimer = time.AfterFunc(dur, func() {
c.refreshDischarges(ctx)
+ c.loopWG.Done()
})
}
c.mu.Unlock()
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 80d0034..4806bef 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -340,7 +340,7 @@
}
c.status = Closing
- go func() {
+ go func(c *Conn) {
if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
msg := ""
if err != nil {
@@ -373,7 +373,7 @@
c.status = Closed
close(c.closed)
c.mu.Unlock()
- }()
+ }(c)
}
func (c *Conn) release(ctx *context.T, fid, count uint64) {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index e4127a3..1cad267 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
import (
"io"
+ "sync"
"time"
"v.io/v23/context"
@@ -47,6 +48,9 @@
// borrowed indicates the number of tokens we have borrowed from the shared pool for
// sending on newly dialed flows.
borrowed uint64
+ // borrowCond is a condition variable that we can use to wait for shared
+ // counters to be released.
+ borrowCond *sync.Cond
// borrowing indicates whether this flow is using borrowed counters for a newly
// dialed flow. This will be set to false after we first receive a
// release from the remote end. This is always false for accepted flows.
@@ -60,14 +64,15 @@
func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, remote naming.Endpoint, dialed, preopen bool) *flw {
f := &flw{
- id: id,
- dialed: dialed,
- conn: c,
- q: newReadQ(c, id),
- bkey: bkey,
- dkey: dkey,
- opened: preopen,
- borrowing: dialed,
+ id: id,
+ dialed: dialed,
+ conn: c,
+ q: newReadQ(c, id),
+ bkey: bkey,
+ dkey: dkey,
+ opened: preopen,
+ borrowing: dialed,
+ borrowCond: sync.NewCond(&c.mu),
// It's important that this channel has a non-zero buffer. Sometimes this
// flow will be notifying itself, so if there's no buffer a deadlock will
// occur.
@@ -144,12 +149,16 @@
return int(max), func(used int) {
f.conn.lshared -= uint64(used)
f.borrowed += uint64(used)
+ f.ctx.VI(2).Infof("deducting %d borrowed tokens on flow %d(%p), total: %d", used, f.id, f, f.borrowed)
}
}
if f.released < max {
max = f.released
}
- return int(max), func(used int) { f.released -= uint64(used) }
+ return int(max), func(used int) {
+ f.released -= uint64(used)
+ f.ctx.VI(2).Infof("flow %d(%p) deducting %d tokens, %d left", f.id, f, used, f.released)
+ }
}
// releaseLocked releases some counters from a remote reader to the local
@@ -161,12 +170,16 @@
if f.borrowed < tokens {
n = f.borrowed
}
+ f.ctx.VI(2).Infof("Returning %d tokens borrowed by %d(%p)", f.borrowed, f.id, f)
tokens -= n
f.borrowed -= n
f.conn.lshared += n
+ f.borrowCond.Broadcast()
}
f.released += tokens
+ f.ctx.VI(2).Infof("Tokens release to %d(%p): %d => %d", f.id, f, tokens, f.released)
if f.writing {
+ f.ctx.VI(2).Infof("Activating writing flow %d(%p) now that we have tokens.", f.id, f)
f.conn.activateWriterLocked(f)
f.conn.notifyNextWriterLocked(nil)
}
@@ -176,6 +189,7 @@
if err = f.checkBlessings(); err != nil {
return 0, err
}
+ f.ctx.VI(2).Infof("starting write on flow %d(%p)", f.id, f)
select {
// Catch cancellations early. If we caught a cancel when waiting
// our turn below its possible that we were notified simultaneously.
@@ -212,6 +226,7 @@
if tokens == 0 {
// Oops, we really don't have data to send, probably because we've exhausted
// the remote buffer. deactivate ourselves but keep trying.
+ f.ctx.VI(2).Infof("Deactivating write on flow %d(%p) due to lack of tokens", f.id, f)
f.conn.deactivateWriterLocked(f)
continue
}
@@ -250,6 +265,7 @@
f.opened = true
}
f.writing = false
+ f.ctx.VI(2).Infof("finishing write on %d(%p): %v", f.id, f, err)
f.conn.deactivateWriterLocked(f)
f.conn.notifyNextWriterLocked(f)
f.conn.mu.Unlock()
@@ -394,8 +410,19 @@
}
func (f *flw) close(ctx *context.T, err error) {
+ closedRemotely := verror.ErrorID(err) == ErrFlowClosedRemotely.ID
+ f.conn.mu.Lock()
+ if closedRemotely {
+ // When the other side closes a flow, it implicitly releases all the
+ // counters used by that flow. That means we should release the shared
+ // counter to be used on other new flows.
+ f.conn.lshared += f.borrowed
+ f.borrowed = 0
+ }
+ f.borrowCond.Broadcast()
+ f.conn.mu.Unlock()
if f.q.close(ctx) {
- eid := verror.ErrorID(err)
+ f.ctx.VI(2).Infof("closing %d(%p): %v", f.id, f, err)
f.cancel()
// After cancel has been called no new writes will begin for this
// flow. There may be a write in progress, but it must finish
@@ -403,13 +430,12 @@
// can simply use sendMessageLocked to send the close flow
// message.
f.conn.mu.Lock()
- delete(f.conn.flows, f.id)
connClosing := f.conn.status == Closing
var serr error
if !f.opened {
// Closing a flow that was never opened.
f.conn.unopenedFlows.Done()
- } else if eid != ErrFlowClosedRemotely.ID && !connClosing {
+ } else if !closedRemotely && !connClosing {
// Note: If the conn is closing there is no point in trying to
// send the flow close message as it will fail. This is racy
// with the connection closing, but there are no ill-effects
@@ -419,6 +445,18 @@
Flags: message.CloseFlag,
})
}
+ if f.borrowed > 0 && f.conn.status < Closing {
+ f.conn.loopWG.Add(1)
+ go func() {
+ defer f.conn.loopWG.Done()
+ f.conn.mu.Lock()
+ for f.borrowed > 0 && f.conn.status < Closing {
+ f.borrowCond.Wait()
+ }
+ delete(f.conn.flows, f.id)
+ f.conn.mu.Unlock()
+ }()
+ }
f.conn.mu.Unlock()
if serr != nil {
ctx.Errorf("Could not send close flow message: %v", err)
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index ff1f83c..186a40e 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -18,9 +18,11 @@
"v.io/x/ref/runtime/internal/flow/flowtest"
_ "v.io/x/ref/runtime/internal/flow/protocols/local"
inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/test/goroutines"
)
func TestCache(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
@@ -31,7 +33,9 @@
RID: naming.FixedRoutingID(0x5555),
Blessings: []string{"A", "B", "C"},
}
- conn := makeConnAndFlow(t, ctx, remote).c
+ caf := makeConnAndFlow(t, ctx, remote)
+ defer caf.stop(ctx)
+ conn := caf.c
if err := c.Insert(conn, remote.Protocol, remote.Address); err != nil {
t.Fatal(err)
}
@@ -72,7 +76,9 @@
RID: naming.FixedRoutingID(0x1111),
Blessings: []string{"ridonly"},
}
- ridConn := makeConnAndFlow(t, ctx, ridEP).c
+ caf = makeConnAndFlow(t, ctx, ridEP)
+ defer caf.stop(ctx)
+ ridConn := caf.c
if err := c.InsertWithRoutingID(ridConn); err != nil {
t.Fatal(err)
}
@@ -90,7 +96,9 @@
RID: naming.FixedRoutingID(0x2222),
Blessings: []string{"other"},
}
- otherConn := makeConnAndFlow(t, ctx, otherEP).c
+ caf = makeConnAndFlow(t, ctx, otherEP)
+ defer caf.stop(ctx)
+ otherConn := caf.c
// Looking up a not yet inserted endpoint should fail.
if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -117,7 +125,9 @@
}
// Insert a duplicate conn to ensure that replaced conns still get closed.
- dupConn := makeConnAndFlow(t, ctx, remote).c
+ caf = makeConnAndFlow(t, ctx, remote)
+ defer caf.stop(ctx)
+ dupConn := caf.c
if err := c.Insert(dupConn, remote.Protocol, remote.Address); err != nil {
t.Fatal(err)
}
@@ -136,17 +146,20 @@
c.Close(ctx)
// Now the connections should be closed.
<-conn.Closed()
+ <-ridConn.Closed()
<-dupConn.Closed()
<-otherConn.Closed()
}
func TestLRU(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
// Ensure that the least recently created conns are killed by KillConnections.
c := NewConnCache()
- conns := nConnAndFlows(t, ctx, 10)
+ conns, stop := nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -178,7 +191,8 @@
// Ensure that writing to conns marks conns as more recently used.
c = NewConnCache()
- conns = nConnAndFlows(t, ctx, 10)
+ conns, stop = nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -213,7 +227,8 @@
// Ensure that reading from conns marks conns as more recently used.
c = NewConnCache()
- conns = nConnAndFlows(t, ctx, 10)
+ conns, stop = nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -267,6 +282,7 @@
type connAndFlow struct {
c *connpackage.Conn
+ a *connpackage.Conn
f flow.Flow
}
@@ -284,7 +300,12 @@
}
}
-func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+func (c connAndFlow) stop(ctx *context.T) {
+ c.c.Close(ctx, nil)
+ c.a.Close(ctx, nil)
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) ([]connAndFlow, func()) {
cfs := make([]connAndFlow, n)
for i := 0; i < n; i++ {
cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
@@ -292,7 +313,11 @@
RID: naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
})
}
- return cfs
+ return cfs, func() {
+ for _, conn := range cfs {
+ conn.stop(ctx)
+ }
+ }
}
func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
@@ -318,7 +343,7 @@
ach <- a
}()
conn := <-dch
- <-ach
+ aconn := <-ach
f, err := conn.Dial(ctx, flowtest.AllowAllPeersAuthorizer{}, nil)
if err != nil {
t.Fatal(err)
@@ -328,7 +353,7 @@
t.Fatal(err)
}
<-fh.ch
- return connAndFlow{conn, f}
+ return connAndFlow{conn, aconn, f}
}
type fh struct {
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index a010f18..5e3a32a 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -607,9 +607,10 @@
dst.Process.queue.Put(m)
}
case *message.HealthCheckResponse:
- // Note that the proxy never sends health check requests, so responses
- // should always be forwarded.
- if dst := p.Route(m.VCI); dst != nil {
+ if svc := p.ServerVC(m.VCI); svc != nil {
+ // If the request is for the proxy, pass it to the VC.
+ svc.HandleHealthCheckResponse()
+ } else if dst := p.Route(m.VCI); dst != nil {
m.VCI = dst.VCI
dst.Process.queue.Put(m)
}
diff --git a/runtime/internal/rpc/stream/vc/vc_cache.go b/runtime/internal/rpc/stream/vc/vc_cache.go
index d962cfa..d78098c 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache.go
@@ -5,6 +5,7 @@
package vc
import (
+ "strings"
"sync"
"v.io/v23/naming"
@@ -17,17 +18,19 @@
// VCCache implements a set of VIFs keyed by the endpoint of the remote end and the
// local principal. Multiple goroutines can invoke methods on the VCCache simultaneously.
type VCCache struct {
- mu sync.Mutex
- cache map[vcKey]*VC // GUARDED_BY(mu)
- started map[vcKey]bool // GUARDED_BY(mu)
- cond *sync.Cond
+ mu sync.Mutex
+ cache map[vcKey]*VC // GUARDED_BY(mu)
+ ridCache map[ridKey]*VC // GUARDED_BY(mu)
+ started map[vcKey]bool // GUARDED_BY(mu)
+ cond *sync.Cond
}
// NewVCCache returns a new cache for VCs.
func NewVCCache() *VCCache {
c := &VCCache{
- cache: make(map[vcKey]*VC),
- started: make(map[vcKey]bool),
+ cache: make(map[vcKey]*VC),
+ ridCache: make(map[ridKey]*VC),
+ started: make(map[vcKey]bool),
}
c.cond = sync.NewCond(&c.mu)
return c
@@ -53,6 +56,9 @@
return nil, verror.New(errVCCacheClosed, nil)
}
c.started[k] = true
+ if vc, ok := c.ridCache[c.ridkey(ep, p)]; ok {
+ return vc, nil
+ }
return c.cache[k], nil
}
@@ -72,7 +78,11 @@
if c.cache == nil {
return verror.New(errVCCacheClosed, nil)
}
- c.cache[c.key(vc.RemoteEndpoint(), vc.LocalPrincipal())] = vc
+ ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+ c.cache[c.key(ep, principal)] = vc
+ if ep.RoutingID() != naming.NullRoutingID {
+ c.ridCache[c.ridkey(ep, principal)] = vc
+ }
return nil
}
@@ -85,6 +95,7 @@
}
c.cache = nil
c.started = nil
+ c.ridCache = nil
c.mu.Unlock()
return vcs
}
@@ -96,10 +107,18 @@
if c.cache == nil {
return verror.New(errVCCacheClosed, nil)
}
- delete(c.cache, c.key(vc.RemoteEndpoint(), vc.LocalPrincipal()))
+ ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+ delete(c.cache, c.key(ep, principal))
+ delete(c.ridCache, c.ridkey(ep, principal))
return nil
}
+type ridKey struct {
+ rid naming.RoutingID
+ localPublicKey string
+ blessingNames string
+}
+
type vcKey struct {
remoteEP string
localPublicKey string // localPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
@@ -112,3 +131,12 @@
}
return k
}
+
+func (c *VCCache) ridkey(ep naming.Endpoint, p security.Principal) ridKey {
+ k := ridKey{rid: ep.RoutingID()}
+ if p != nil {
+ k.localPublicKey = p.PublicKey().String()
+ k.blessingNames = strings.Join(ep.BlessingNames(), ",")
+ }
+ return k
+}
diff --git a/runtime/internal/rpc/stream/vc/vc_cache_test.go b/runtime/internal/rpc/stream/vc/vc_cache_test.go
index b096e21..1d85ff9 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache_test.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache_test.go
@@ -7,6 +7,7 @@
import (
"testing"
+ "v.io/v23/naming"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/test/testutil"
)
@@ -101,6 +102,24 @@
if cachedVC := <-ch; cachedVC != otherVC {
t.Errorf("got %v, want %v", cachedVC, otherVC)
}
+
+ // If we add an endpoint with a non-zero routingId and search for another
+ // endpoint with the same routingID, we should get the first routingID.
+ ridep, err := inaming.NewEndpoint("oink:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ ridep.RID = naming.FixedRoutingID(0x1111)
+ vc = &VC{remoteEP: ridep, localPrincipal: p}
+ cache.Insert(vc)
+ otherEP, err = inaming.NewEndpoint("moo:7777")
+ if err != nil {
+ t.Fatal(err)
+ }
+ otherEP.RID = ridep.RID
+ if got, err := cache.ReservedFind(otherEP, p); err != nil || got != vc {
+ t.Errorf("got %v, want %v, err: %v", got, vc, err)
+ }
}
func vcsEqual(a, b []*VC) bool {
diff --git a/runtime/internal/rpc/stream/vif/set_test.go b/runtime/internal/rpc/stream/vif/set_test.go
index b79d8d5..f634c64 100644
--- a/runtime/internal/rpc/stream/vif/set_test.go
+++ b/runtime/internal/rpc/stream/vif/set_test.go
@@ -111,7 +111,7 @@
}
func TestSetBasic(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
sockdir, err := ioutil.TempDir("", "TestSetBasic")
if err != nil {
@@ -184,7 +184,7 @@
}
func TestSetWithPipes(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
c1, s1 := net.Pipe()
c2, s2 := net.Pipe()
@@ -233,7 +233,7 @@
}
func TestSetWithUnixSocket(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
dir, err := ioutil.TempDir("", "TestSetWithUnixSocket")
if err != nil {
@@ -296,7 +296,7 @@
}
func TestSetInsertDelete(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
c1, s1 := net.Pipe()
vf1, _, err := newVIF(ctx, c1, s1)
@@ -319,7 +319,7 @@
}
func TestBlockingFind(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
network, address := "tcp", "127.0.0.1:1234"
set := vif.NewSet()
diff --git a/runtime/internal/rpc/stream/vif/vif_test.go b/runtime/internal/rpc/stream/vif/vif_test.go
index a15b081..40114c0 100644
--- a/runtime/internal/rpc/stream/vif/vif_test.go
+++ b/runtime/internal/rpc/stream/vif/vif_test.go
@@ -37,7 +37,7 @@
//go:generate jiri test generate
func TestSingleFlowCreatedAtClient(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -65,7 +65,7 @@
}
func TestSingleFlowCreatedAtServer(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -96,7 +96,7 @@
func testMultipleVCsAndMultipleFlows(t *testing.T, gomaxprocs int) {
testutil.InitRandGenerator(t.Logf)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
// This test dials multiple VCs from the client to the server.
// On each VC, it creates multiple flows, writes to them and verifies
@@ -255,7 +255,7 @@
}
func TestClose(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -294,7 +294,7 @@
}
func TestOnClose(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -335,7 +335,7 @@
const (
waitTime = 5 * time.Millisecond
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -424,7 +424,7 @@
// connection of the other side to be closed especially in race testing.
waitTime = 150 * time.Millisecond
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -489,7 +489,7 @@
idleTime = 10 * time.Millisecond
waitTime = idleTime * 2
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -607,7 +607,7 @@
func TestIdleTimeoutServer(t *testing.T) { testIdleTimeout(t, true) }
func TestShutdownVCs(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -674,7 +674,7 @@
}
func (tc *versionTestCase) Run(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -734,7 +734,7 @@
}
func TestNetworkFailure(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -768,7 +768,7 @@
}
func TestPreAuthentication(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
diff --git a/runtime/internal/rpc/testutil_test.go b/runtime/internal/rpc/testutil_test.go
index 105638d..1221ff5 100644
--- a/runtime/internal/rpc/testutil_test.go
+++ b/runtime/internal/rpc/testutil_test.go
@@ -79,7 +79,7 @@
}
func initForTest() (*context.T, v23.Shutdown) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
if err != nil {
panic(err)
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
index 97240ab..9173521 100644
--- a/services/wspr/internal/lib/signature_manager_test.go
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -23,7 +23,7 @@
)
func initRuntime(t *testing.T) (*context.T, clientWithTimesCalled, v23.Shutdown) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
initialSig := []signature.Interface{
{
Methods: []signature.Method{
diff --git a/test/init.go b/test/init.go
index ef625eb..e1812e9 100644
--- a/test/init.go
+++ b/test/init.go
@@ -117,10 +117,11 @@
return context.WithLogger(ctx, logger.Global()), cancel
}
-// V23InitEmpty initializes a runtime but with no principal.
-func V23InitAnon() (*context.T, v23.Shutdown) {
+// V23InitSimple is like V23Init, except that it does not setup a
+// mounttable.
+func V23InitSimple() (*context.T, v23.Shutdown) {
return initWithParams(initParams{
- CreatePrincipal: false,
+ CreatePrincipal: true,
CreateMounttable: false,
})
}