Merge "proxy: Properly handle health check responses to VCs that terminate at the proxy."
diff --git a/lib/discovery/util/advertise_test.go b/lib/discovery/util/advertise_test.go
index 66be0f3..b3ff757 100644
--- a/lib/discovery/util/advertise_test.go
+++ b/lib/discovery/util/advertise_test.go
@@ -119,12 +119,16 @@
defer shutdown()
mock := newMockServer(newEndpoints("addr1:123"))
-
util.AdvertiseServer(ctx, mock, "", discovery.Service{InterfaceName: "v.io/v23/a"}, nil)
- service, err := scan(ctx)
+
+ // Scan the advertised service.
+ service, err := scan(ctx, 3*time.Second)
if err != nil {
t.Fatal(err)
}
+ if len(service.InstanceUuid) == 0 {
+ t.Fatal("couldn't scan")
+ }
// Make sure the instance uuid has not been changed.
eps := newEndpoints("addr2:123")
@@ -145,7 +149,7 @@
var found discovery.Service
for now := time.Now(); time.Since(now) < timeout; {
var err error
- found, err = scan(ctx)
+ found, err = scan(ctx, 5*time.Millisecond)
if err != nil {
return err
}
@@ -156,7 +160,7 @@
return fmt.Errorf("match failed; got %v, but wanted %v", found, want)
}
-func scan(ctx *context.T) (discovery.Service, error) {
+func scan(ctx *context.T, timeout time.Duration) (discovery.Service, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -169,7 +173,7 @@
select {
case update := <-scan:
return update.Interface().(discovery.Found).Service, nil
- case <-time.After(5 * time.Millisecond):
+ case <-time.After(timeout):
return discovery.Service{}, nil
}
}
diff --git a/lib/security/audit/principal_test.go b/lib/security/audit/principal_test.go
index 75148b6..48124f4 100644
--- a/lib/security/audit/principal_test.go
+++ b/lib/security/audit/principal_test.go
@@ -25,7 +25,7 @@
)
func TestAuditingPrincipal(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
var (
thirdPartyCaveat, discharge = newThirdPartyCaveatAndDischarge(t)
@@ -129,7 +129,7 @@
}
func TestUnauditedMethodsOnPrincipal(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
var (
auditor = new(mockAuditor)
@@ -199,9 +199,9 @@
return d, p.NextError
}
-func (p *mockPrincipal) PublicKey() security.PublicKey { return p.NextResult.(security.PublicKey) }
-func (p *mockPrincipal) Roots() security.BlessingRoots { return nil }
-func (p *mockPrincipal) BlessingStore() security.BlessingStore { return nil }
+func (p *mockPrincipal) PublicKey() security.PublicKey { return p.NextResult.(security.PublicKey) }
+func (p *mockPrincipal) Roots() security.BlessingRoots { return nil }
+func (p *mockPrincipal) BlessingStore() security.BlessingStore { return nil }
type mockAuditor struct {
LastEntry audit.Entry
diff --git a/runtime/internal/rpc/stream/vc/vc_cache.go b/runtime/internal/rpc/stream/vc/vc_cache.go
index d962cfa..d78098c 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache.go
@@ -5,6 +5,7 @@
package vc
import (
+ "strings"
"sync"
"v.io/v23/naming"
@@ -17,17 +18,19 @@
// VCCache implements a set of VIFs keyed by the endpoint of the remote end and the
// local principal. Multiple goroutines can invoke methods on the VCCache simultaneously.
type VCCache struct {
- mu sync.Mutex
- cache map[vcKey]*VC // GUARDED_BY(mu)
- started map[vcKey]bool // GUARDED_BY(mu)
- cond *sync.Cond
+ mu sync.Mutex
+ cache map[vcKey]*VC // GUARDED_BY(mu)
+ ridCache map[ridKey]*VC // GUARDED_BY(mu)
+ started map[vcKey]bool // GUARDED_BY(mu)
+ cond *sync.Cond
}
// NewVCCache returns a new cache for VCs.
func NewVCCache() *VCCache {
c := &VCCache{
- cache: make(map[vcKey]*VC),
- started: make(map[vcKey]bool),
+ cache: make(map[vcKey]*VC),
+ ridCache: make(map[ridKey]*VC),
+ started: make(map[vcKey]bool),
}
c.cond = sync.NewCond(&c.mu)
return c
@@ -53,6 +56,9 @@
return nil, verror.New(errVCCacheClosed, nil)
}
c.started[k] = true
+ if vc, ok := c.ridCache[c.ridkey(ep, p)]; ok {
+ return vc, nil
+ }
return c.cache[k], nil
}
@@ -72,7 +78,11 @@
if c.cache == nil {
return verror.New(errVCCacheClosed, nil)
}
- c.cache[c.key(vc.RemoteEndpoint(), vc.LocalPrincipal())] = vc
+ ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+ c.cache[c.key(ep, principal)] = vc
+ if ep.RoutingID() != naming.NullRoutingID {
+ c.ridCache[c.ridkey(ep, principal)] = vc
+ }
return nil
}
@@ -85,6 +95,7 @@
}
c.cache = nil
c.started = nil
+ c.ridCache = nil
c.mu.Unlock()
return vcs
}
@@ -96,10 +107,18 @@
if c.cache == nil {
return verror.New(errVCCacheClosed, nil)
}
- delete(c.cache, c.key(vc.RemoteEndpoint(), vc.LocalPrincipal()))
+ ep, principal := vc.RemoteEndpoint(), vc.LocalPrincipal()
+ delete(c.cache, c.key(ep, principal))
+ delete(c.ridCache, c.ridkey(ep, principal))
return nil
}
+type ridKey struct {
+ rid naming.RoutingID
+ localPublicKey string
+ blessingNames string
+}
+
type vcKey struct {
remoteEP string
localPublicKey string // localPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
@@ -112,3 +131,12 @@
}
return k
}
+
+func (c *VCCache) ridkey(ep naming.Endpoint, p security.Principal) ridKey {
+ k := ridKey{rid: ep.RoutingID()}
+ if p != nil {
+ k.localPublicKey = p.PublicKey().String()
+ k.blessingNames = strings.Join(ep.BlessingNames(), ",")
+ }
+ return k
+}
diff --git a/runtime/internal/rpc/stream/vc/vc_cache_test.go b/runtime/internal/rpc/stream/vc/vc_cache_test.go
index b096e21..1d85ff9 100644
--- a/runtime/internal/rpc/stream/vc/vc_cache_test.go
+++ b/runtime/internal/rpc/stream/vc/vc_cache_test.go
@@ -7,6 +7,7 @@
import (
"testing"
+ "v.io/v23/naming"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/test/testutil"
)
@@ -101,6 +102,24 @@
if cachedVC := <-ch; cachedVC != otherVC {
t.Errorf("got %v, want %v", cachedVC, otherVC)
}
+
+ // If we insert a VC whose endpoint has a non-zero routing ID and then search
+ // for another endpoint with the same routing ID, we should get that first VC.
+ ridep, err := inaming.NewEndpoint("oink:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ ridep.RID = naming.FixedRoutingID(0x1111)
+ vc = &VC{remoteEP: ridep, localPrincipal: p}
+ cache.Insert(vc)
+ otherEP, err = inaming.NewEndpoint("moo:7777")
+ if err != nil {
+ t.Fatal(err)
+ }
+ otherEP.RID = ridep.RID
+ if got, err := cache.ReservedFind(otherEP, p); err != nil || got != vc {
+ t.Errorf("got %v, want %v, err: %v", got, vc, err)
+ }
}
func vcsEqual(a, b []*VC) bool {
diff --git a/runtime/internal/rpc/stream/vif/set_test.go b/runtime/internal/rpc/stream/vif/set_test.go
index b79d8d5..f634c64 100644
--- a/runtime/internal/rpc/stream/vif/set_test.go
+++ b/runtime/internal/rpc/stream/vif/set_test.go
@@ -111,7 +111,7 @@
}
func TestSetBasic(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
sockdir, err := ioutil.TempDir("", "TestSetBasic")
if err != nil {
@@ -184,7 +184,7 @@
}
func TestSetWithPipes(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
c1, s1 := net.Pipe()
c2, s2 := net.Pipe()
@@ -233,7 +233,7 @@
}
func TestSetWithUnixSocket(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
dir, err := ioutil.TempDir("", "TestSetWithUnixSocket")
if err != nil {
@@ -296,7 +296,7 @@
}
func TestSetInsertDelete(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
c1, s1 := net.Pipe()
vf1, _, err := newVIF(ctx, c1, s1)
@@ -319,7 +319,7 @@
}
func TestBlockingFind(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
network, address := "tcp", "127.0.0.1:1234"
set := vif.NewSet()
diff --git a/runtime/internal/rpc/stream/vif/vif_test.go b/runtime/internal/rpc/stream/vif/vif_test.go
index a15b081..40114c0 100644
--- a/runtime/internal/rpc/stream/vif/vif_test.go
+++ b/runtime/internal/rpc/stream/vif/vif_test.go
@@ -37,7 +37,7 @@
//go:generate jiri test generate
func TestSingleFlowCreatedAtClient(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -65,7 +65,7 @@
}
func TestSingleFlowCreatedAtServer(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -96,7 +96,7 @@
func testMultipleVCsAndMultipleFlows(t *testing.T, gomaxprocs int) {
testutil.InitRandGenerator(t.Logf)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
// This test dials multiple VCs from the client to the server.
// On each VC, it creates multiple flows, writes to them and verifies
@@ -255,7 +255,7 @@
}
func TestClose(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -294,7 +294,7 @@
}
func TestOnClose(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -335,7 +335,7 @@
const (
waitTime = 5 * time.Millisecond
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -424,7 +424,7 @@
// connection of the other side to be closed especially in race testing.
waitTime = 150 * time.Millisecond
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -489,7 +489,7 @@
idleTime = 10 * time.Millisecond
waitTime = idleTime * 2
)
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -607,7 +607,7 @@
func TestIdleTimeoutServer(t *testing.T) { testIdleTimeout(t, true) }
func TestShutdownVCs(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -674,7 +674,7 @@
}
func (tc *versionTestCase) Run(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -734,7 +734,7 @@
}
func TestNetworkFailure(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
@@ -768,7 +768,7 @@
}
func TestPreAuthentication(t *testing.T) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
defer shutdown()
pclient := testutil.NewPrincipal("client")
pserver := testutil.NewPrincipal("server")
diff --git a/runtime/internal/rpc/testutil_test.go b/runtime/internal/rpc/testutil_test.go
index 105638d..1221ff5 100644
--- a/runtime/internal/rpc/testutil_test.go
+++ b/runtime/internal/rpc/testutil_test.go
@@ -79,7 +79,7 @@
}
func initForTest() (*context.T, v23.Shutdown) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
if err != nil {
panic(err)
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
index 97240ab..9173521 100644
--- a/services/wspr/internal/lib/signature_manager_test.go
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -23,7 +23,7 @@
)
func initRuntime(t *testing.T) (*context.T, clientWithTimesCalled, v23.Shutdown) {
- ctx, shutdown := test.V23InitAnon()
+ ctx, shutdown := test.V23InitSimple()
initialSig := []signature.Interface{
{
Methods: []signature.Method{
diff --git a/test/init.go b/test/init.go
index ef625eb..e1812e9 100644
--- a/test/init.go
+++ b/test/init.go
@@ -117,10 +117,11 @@
return context.WithLogger(ctx, logger.Global()), cancel
}
-// V23InitEmpty initializes a runtime but with no principal.
-func V23InitAnon() (*context.T, v23.Shutdown) {
+// V23InitSimple is like V23Init, except that it does not set up a
+// mounttable.
+func V23InitSimple() (*context.T, v23.Shutdown) {
return initWithParams(initParams{
- CreatePrincipal: false,
+ CreatePrincipal: true,
CreateMounttable: false,
})
}