rpc/client: Ensure that concurrent VC creations to the same endpoint use
the cache.
VC equivalent of https://vanadium-review.googlesource.com/#/c/10828/.
Change-Id: Ic822699aef953d10911ff9a197a218ce4881151c
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index 2efc1b3..f00c4ba 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -82,33 +82,19 @@
// TODO(jhahn): Add monitoring the network interface changes.
ipNets []*net.IPNet
- // We support concurrent calls to StartCall and Close, so we must protect the
- // vcMap. Everything else is initialized upon client construction, and safe
- // to use concurrently.
- vcMapMu sync.Mutex
- vcMap map[vcMapKey]*vcInfo
+ vcCache *vc.VCCache
dc vc.DischargeClient
}
var _ rpc.Client = (*client)(nil)
-type vcInfo struct {
- vc stream.VC
- remoteEP naming.Endpoint
-}
-
-type vcMapKey struct {
- endpoint string
- clientPublicKey string // clientPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
-}
-
func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &client{
streamMgr: streamMgr,
ns: ns,
ipNets: ipNetworks(),
- vcMap: make(map[vcMapKey]*vcInfo),
+ vcCache: vc.NewVCCache(),
}
c.dc = InternalNewDischargeClient(nil, c, 0)
for _, opt := range opts {
@@ -125,54 +111,54 @@
}
func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
- c.vcMapMu.Lock()
- defer c.vcMapMu.Unlock()
-
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{Err: err, Options: verror.Print}
}
- if c.vcMap == nil {
+ found, err := c.vcCache.ReservedFind(ep, principal)
+ if err != nil {
return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
}
-
- vcKey := vcMapKey{endpoint: ep.String()}
- if principal != nil {
- vcKey.clientPublicKey = principal.PublicKey().String()
- }
- if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
- if flow, err := vcinfo.vc.Connect(); err == nil {
+ defer c.vcCache.Unreserve(ep, principal)
+ if found != nil {
+ // We are serializing the creation of all flows per VC. This is okay
+ // because if one flow creation is to block, it is likely that all others
+ // for that VC would block as well.
+ if flow, err := found.Connect(); err == nil {
return flow, nil
}
// If the vc fails to establish a new flow, we assume it's
- // broken, remove it from the map, and proceed to establishing
+ // broken, remove it from the cache, and proceed to establishing
// a new vc.
+ //
+ // TODO(suharshs): The decision to redial 1 time when the dialing the vc
+ // in the cache fails is a bit inconsistent with the behavior when a newly
+ // dialed vc.Connect fails. We should revisit this.
+ //
// TODO(caprita): Should we distinguish errors due to vc being
// closed from other errors? If not, should we call vc.Close()
- // before removing the vc from the map?
- delete(c.vcMap, vcKey)
+ // before removing the vc from the cache?
+ if err := c.vcCache.Delete(found); err != nil {
+ return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+ }
}
+
sm := c.streamMgr
- c.vcMapMu.Unlock()
- vc, err := sm.Dial(ep, principal, vcOpts...)
- c.vcMapMu.Lock()
+ v, err := sm.Dial(ep, principal, vcOpts...)
if err != nil {
return nil, suberr(err)
}
- if c.vcMap == nil {
+
+ flow, err := v.Connect()
+ if err != nil {
+ return nil, suberr(err)
+ }
+
+ if err := c.vcCache.Insert(v.(*vc.VC)); err != nil {
sm.ShutdownEndpoint(ep)
return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
}
- if othervc, exists := c.vcMap[vcKey]; exists {
- go vc.Close(nil)
- vc = othervc.vc
- } else {
- c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
- }
- flow, err := vc.Connect()
- if err != nil {
- return nil, suberr(err)
- }
+
return flow, nil
}
@@ -744,12 +730,9 @@
func (c *client) Close() {
defer vlog.LogCall()()
- c.vcMapMu.Lock()
- for _, v := range c.vcMap {
- c.streamMgr.ShutdownEndpoint(v.remoteEP)
+ for _, v := range c.vcCache.Close() {
+ c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
}
- c.vcMap = nil
- c.vcMapMu.Unlock()
}
// flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/profiles/internal/rpc/stream/vc/vc_cache.go b/profiles/internal/rpc/stream/vc/vc_cache.go
new file mode 100644
index 0000000..d962cfa
--- /dev/null
+++ b/profiles/internal/rpc/stream/vc/vc_cache.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+ "sync"
+
+ "v.io/v23/naming"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+)
+
+var errVCCacheClosed = reg(".errVCCacheClosed", "vc cache has been closed")
+
+// VCCache implements a set of VIFs keyed by the endpoint of the remote end and the
+// local principal. Multiple goroutines can invoke methods on the VCCache simultaneously.
+type VCCache struct {
+ mu sync.Mutex
+ cache map[vcKey]*VC // GUARDED_BY(mu)
+ started map[vcKey]bool // GUARDED_BY(mu)
+ cond *sync.Cond
+}
+
+// NewVCCache returns a new cache for VCs.
+func NewVCCache() *VCCache {
+ c := &VCCache{
+ cache: make(map[vcKey]*VC),
+ started: make(map[vcKey]bool),
+ }
+ c.cond = sync.NewCond(&c.mu)
+ return c
+}
+
+// ReservedFind returns a VC where the remote end of the underlying connection
+// is identified by the provided (ep, p.PublicKey). Returns nil if there is no
+// such VC.
+//
+// Iff the cache is closed, ReservedFind will return an error.
+// If ReservedFind has no error, the caller is required to call Unreserve, to avoid deadlock.
+// The ep, and p.PublicKey in Unreserve must be the same as used in the ReservedFind call.
+// During this time, all new ReservedFind calls for this ep and p will Block until
+// the corresponding Unreserve call is made.
+func (c *VCCache) ReservedFind(ep naming.Endpoint, p security.Principal) (*VC, error) {
+ k := c.key(ep, p)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ for c.started[k] {
+ c.cond.Wait()
+ }
+ if c.cache == nil {
+ return nil, verror.New(errVCCacheClosed, nil)
+ }
+ c.started[k] = true
+ return c.cache[k], nil
+}
+
+// Unreserve marks the status of the ep, p as no longer started, and
+// broadcasts waiting threads.
+func (c *VCCache) Unreserve(ep naming.Endpoint, p security.Principal) {
+ c.mu.Lock()
+ delete(c.started, c.key(ep, p))
+ c.cond.Broadcast()
+ c.mu.Unlock()
+}
+
+// Insert adds vc to the cache and returns an error iff the cache has been closed.
+func (c *VCCache) Insert(vc *VC) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.cache == nil {
+ return verror.New(errVCCacheClosed, nil)
+ }
+ c.cache[c.key(vc.RemoteEndpoint(), vc.LocalPrincipal())] = vc
+ return nil
+}
+
+// Close marks the VCCache as closed and returns the VCs remaining in the cache.
+func (c *VCCache) Close() []*VC {
+ c.mu.Lock()
+ vcs := make([]*VC, 0, len(c.cache))
+ for _, vc := range c.cache {
+ vcs = append(vcs, vc)
+ }
+ c.cache = nil
+ c.started = nil
+ c.mu.Unlock()
+ return vcs
+}
+
+// Delete removes vc from the cache, returning an error iff the cache has been closed.
+func (c *VCCache) Delete(vc *VC) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.cache == nil {
+ return verror.New(errVCCacheClosed, nil)
+ }
+ delete(c.cache, c.key(vc.RemoteEndpoint(), vc.LocalPrincipal()))
+ return nil
+}
+
+type vcKey struct {
+ remoteEP string
+ localPublicKey string // localPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
+}
+
+func (c *VCCache) key(ep naming.Endpoint, p security.Principal) vcKey {
+ k := vcKey{remoteEP: ep.String()}
+ if p != nil {
+ k.localPublicKey = p.PublicKey().String()
+ }
+ return k
+}
diff --git a/profiles/internal/rpc/stream/vc/vc_cache_test.go b/profiles/internal/rpc/stream/vc/vc_cache_test.go
new file mode 100644
index 0000000..8bfa144
--- /dev/null
+++ b/profiles/internal/rpc/stream/vc/vc_cache_test.go
@@ -0,0 +1,123 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+ "testing"
+
+ inaming "v.io/x/ref/profiles/internal/naming"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestInsertDelete(t *testing.T) {
+ cache := NewVCCache()
+ ep, err := inaming.NewEndpoint("foo:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ p := testutil.NewPrincipal("test")
+ vc := &VC{remoteEP: ep, localPrincipal: p}
+ otherEP, err := inaming.NewEndpoint("foo:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ otherP := testutil.NewPrincipal("test")
+ otherVC := &VC{remoteEP: otherEP, localPrincipal: otherP}
+
+ cache.Insert(vc)
+ cache.Insert(otherVC)
+ cache.Delete(vc)
+ if got, want := cache.Close(), []*VC{otherVC}; !vcsEqual(got, want) {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+func TestInsertClose(t *testing.T) {
+ cache := NewVCCache()
+ ep, err := inaming.NewEndpoint("foo:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ p := testutil.NewPrincipal("test")
+ vc := &VC{remoteEP: ep, localPrincipal: p}
+
+ if err := cache.Insert(vc); err != nil {
+ t.Errorf("the cache is not closed yet")
+ }
+ if got, want := cache.Close(), []*VC{vc}; !vcsEqual(got, want) {
+ t.Errorf("got %v, want %v", got, want)
+ }
+ if err := cache.Insert(vc); err == nil {
+ t.Errorf("the cache has been closed")
+ }
+}
+
+func TestReservedFind(t *testing.T) {
+ cache := NewVCCache()
+ ep, err := inaming.NewEndpoint("foo:8888")
+ if err != nil {
+ t.Fatal(err)
+ }
+ p := testutil.NewPrincipal("test")
+ vc := &VC{remoteEP: ep, localPrincipal: p}
+ cache.Insert(vc)
+
+ // We should be able to find the vc in the cache.
+ if got, err := cache.ReservedFind(ep, p); err != nil || got != vc {
+ t.Errorf("got %v, want %v, err: %v", got, vc, err)
+ }
+
+ // If we change the endpoint or the principal, we should get nothing.
+ otherEP, err := inaming.NewEndpoint("bar: 7777")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got, err := cache.ReservedFind(otherEP, p); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ if got, err := cache.ReservedFind(ep, testutil.NewPrincipal("wrong")); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+
+ // A subsequent ReservedFind call that matches a previous failed ReservedFind
+ // should block until a matching Unreserve call is made.
+ ch := make(chan *VC, 1)
+ go func(ch chan *VC) {
+ vc, err := cache.ReservedFind(otherEP, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ch <- vc
+ }(ch)
+
+ // We insert the otherEP into the cache.
+ otherVC := &VC{remoteEP: otherEP, localPrincipal: p}
+ cache.Insert(otherVC)
+ cache.Unreserve(otherEP, p)
+
+ // Now the cache.BlcokingFind should have returned the correct otherVC.
+ if cachedVC := <-ch; cachedVC != otherVC {
+ t.Errorf("got %v, want %v", cachedVC, otherVC)
+ }
+}
+
+func vcsEqual(a, b []*VC) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ m := make(map[*VC]int)
+ for _, v := range a {
+ m[v]++
+ }
+ for _, v := range b {
+ m[v]--
+ }
+ for _, i := range m {
+ if i != 0 {
+ return false
+ }
+ }
+ return true
+}
diff --git a/profiles/internal/rpc/stream/vif/set.go b/profiles/internal/rpc/stream/vif/set.go
index b966735..497e43b 100644
--- a/profiles/internal/rpc/stream/vif/set.go
+++ b/profiles/internal/rpc/stream/vif/set.go
@@ -45,7 +45,8 @@
return s.find(network, address, true)
}
-// Unblock broadcasts all threads
+// Unblock marks the status of the network, address as no longer started, and
+// broadcasts waiting threads.
func (s *Set) Unblock(network, address string) {
s.mu.Lock()
delete(s.started, key(network, address))
@@ -60,7 +61,7 @@
return s.find(network, address, false)
}
-// Insert adds a VIF to the set
+// Insert adds a VIF to the set.
func (s *Set) Insert(vif *VIF) {
addr := vif.conn.RemoteAddr()
k := key(addr.Network(), addr.String())
@@ -76,7 +77,7 @@
vif.addSet(s)
}
-// Delete removes a VIF from the set
+// Delete removes a VIF from the set.
func (s *Set) Delete(vif *VIF) {
vif.removeSet(s)
addr := vif.conn.RemoteAddr()