rpc/client: Ensure that concurrent VC creations to the same endpoint use
the cache.

VC equivalent of https://vanadium-review.googlesource.com/#/c/10828/.

Change-Id: Ic822699aef953d10911ff9a197a218ce4881151c
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index 2efc1b3..f00c4ba 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -82,33 +82,19 @@
 	// TODO(jhahn): Add monitoring the network interface changes.
 	ipNets []*net.IPNet
 
-	// We support concurrent calls to StartCall and Close, so we must protect the
-	// vcMap.  Everything else is initialized upon client construction, and safe
-	// to use concurrently.
-	vcMapMu sync.Mutex
-	vcMap   map[vcMapKey]*vcInfo
+	vcCache *vc.VCCache
 
 	dc vc.DischargeClient
 }
 
 var _ rpc.Client = (*client)(nil)
 
-type vcInfo struct {
-	vc       stream.VC
-	remoteEP naming.Endpoint
-}
-
-type vcMapKey struct {
-	endpoint        string
-	clientPublicKey string // clientPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
-}
-
 func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
 	c := &client{
 		streamMgr: streamMgr,
 		ns:        ns,
 		ipNets:    ipNetworks(),
-		vcMap:     make(map[vcMapKey]*vcInfo),
+		vcCache:   vc.NewVCCache(),
 	}
 	c.dc = InternalNewDischargeClient(nil, c, 0)
 	for _, opt := range opts {
@@ -125,54 +111,54 @@
 }
 
 func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
-	c.vcMapMu.Lock()
-	defer c.vcMapMu.Unlock()
-
 	suberr := func(err error) *verror.SubErr {
 		return &verror.SubErr{Err: err, Options: verror.Print}
 	}
 
-	if c.vcMap == nil {
+	found, err := c.vcCache.ReservedFind(ep, principal)
+	if err != nil {
 		return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
 	}
-
-	vcKey := vcMapKey{endpoint: ep.String()}
-	if principal != nil {
-		vcKey.clientPublicKey = principal.PublicKey().String()
-	}
-	if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
-		if flow, err := vcinfo.vc.Connect(); err == nil {
+	defer c.vcCache.Unreserve(ep, principal)
+	if found != nil {
+		// We are serializing the creation of all flows per VC. This is okay
+		// because if one flow creation is to block, it is likely that all others
+		// for that VC would block as well.
+		if flow, err := found.Connect(); err == nil {
 			return flow, nil
 		}
 		// If the vc fails to establish a new flow, we assume it's
-		// broken, remove it from the map, and proceed to establishing
+		// broken, remove it from the cache, and proceed to establishing
 		// a new vc.
+		//
+		// TODO(suharshs): The decision to redial 1 time when the dialing the vc
+		// in the cache fails is a bit inconsistent with the behavior when a newly
+		// dialed vc.Connect fails. We should revisit this.
+		//
 		// TODO(caprita): Should we distinguish errors due to vc being
 		// closed from other errors?  If not, should we call vc.Close()
-		// before removing the vc from the map?
-		delete(c.vcMap, vcKey)
+		// before removing the vc from the cache?
+		if err := c.vcCache.Delete(found); err != nil {
+			return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+		}
 	}
+
 	sm := c.streamMgr
-	c.vcMapMu.Unlock()
-	vc, err := sm.Dial(ep, principal, vcOpts...)
-	c.vcMapMu.Lock()
+	v, err := sm.Dial(ep, principal, vcOpts...)
 	if err != nil {
 		return nil, suberr(err)
 	}
-	if c.vcMap == nil {
+
+	flow, err := v.Connect()
+	if err != nil {
+		return nil, suberr(err)
+	}
+
+	if err := c.vcCache.Insert(v.(*vc.VC)); err != nil {
 		sm.ShutdownEndpoint(ep)
 		return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
 	}
-	if othervc, exists := c.vcMap[vcKey]; exists {
-		go vc.Close(nil)
-		vc = othervc.vc
-	} else {
-		c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
-	}
-	flow, err := vc.Connect()
-	if err != nil {
-		return nil, suberr(err)
-	}
+
 	return flow, nil
 }
 
@@ -744,12 +730,9 @@
 
 func (c *client) Close() {
 	defer vlog.LogCall()()
-	c.vcMapMu.Lock()
-	for _, v := range c.vcMap {
-		c.streamMgr.ShutdownEndpoint(v.remoteEP)
+	for _, v := range c.vcCache.Close() {
+		c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
 	}
-	c.vcMap = nil
-	c.vcMapMu.Unlock()
 }
 
 // flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/profiles/internal/rpc/stream/vc/vc_cache.go b/profiles/internal/rpc/stream/vc/vc_cache.go
new file mode 100644
index 0000000..d962cfa
--- /dev/null
+++ b/profiles/internal/rpc/stream/vc/vc_cache.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+	"sync"
+
+	"v.io/v23/naming"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+)
+
+var errVCCacheClosed = reg(".errVCCacheClosed", "vc cache has been closed")
+
+// VCCache implements a set of VIFs keyed by the endpoint of the remote end and the
+// local principal. Multiple goroutines can invoke methods on the VCCache simultaneously.
+type VCCache struct {
+	mu      sync.Mutex
+	cache   map[vcKey]*VC  // GUARDED_BY(mu)
+	started map[vcKey]bool // GUARDED_BY(mu)
+	cond    *sync.Cond
+}
+
+// NewVCCache returns a new cache for VCs.
+func NewVCCache() *VCCache {
+	c := &VCCache{
+		cache:   make(map[vcKey]*VC),
+		started: make(map[vcKey]bool),
+	}
+	c.cond = sync.NewCond(&c.mu)
+	return c
+}
+
+// ReservedFind returns a VC where the remote end of the underlying connection
+// is identified by the provided (ep, p.PublicKey). Returns nil if there is no
+// such VC.
+//
+// Iff the cache is closed, ReservedFind will return an error.
+// If ReservedFind has no error, the caller is required to call Unreserve, to avoid deadlock.
+// The ep, and p.PublicKey in Unreserve must be the same as used in the ReservedFind call.
+// During this time, all new ReservedFind calls for this ep and p will Block until
+// the corresponding Unreserve call is made.
+func (c *VCCache) ReservedFind(ep naming.Endpoint, p security.Principal) (*VC, error) {
+	k := c.key(ep, p)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for c.started[k] {
+		c.cond.Wait()
+	}
+	if c.cache == nil {
+		return nil, verror.New(errVCCacheClosed, nil)
+	}
+	c.started[k] = true
+	return c.cache[k], nil
+}
+
+// Unreserve marks the status of the ep, p as no longer started, and
+// broadcasts waiting threads.
+func (c *VCCache) Unreserve(ep naming.Endpoint, p security.Principal) {
+	c.mu.Lock()
+	delete(c.started, c.key(ep, p))
+	c.cond.Broadcast()
+	c.mu.Unlock()
+}
+
+// Insert adds vc to the cache and returns an error iff the cache has been closed.
+func (c *VCCache) Insert(vc *VC) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.cache == nil {
+		return verror.New(errVCCacheClosed, nil)
+	}
+	c.cache[c.key(vc.RemoteEndpoint(), vc.LocalPrincipal())] = vc
+	return nil
+}
+
+// Close marks the VCCache as closed and returns the VCs remaining in the cache.
+func (c *VCCache) Close() []*VC {
+	c.mu.Lock()
+	vcs := make([]*VC, 0, len(c.cache))
+	for _, vc := range c.cache {
+		vcs = append(vcs, vc)
+	}
+	c.cache = nil
+	c.started = nil
+	c.mu.Unlock()
+	return vcs
+}
+
+// Delete removes vc from the cache, returning an error iff the cache has been closed.
+func (c *VCCache) Delete(vc *VC) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.cache == nil {
+		return verror.New(errVCCacheClosed, nil)
+	}
+	delete(c.cache, c.key(vc.RemoteEndpoint(), vc.LocalPrincipal()))
+	return nil
+}
+
+type vcKey struct {
+	remoteEP       string
+	localPublicKey string // localPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
+}
+
+func (c *VCCache) key(ep naming.Endpoint, p security.Principal) vcKey {
+	k := vcKey{remoteEP: ep.String()}
+	if p != nil {
+		k.localPublicKey = p.PublicKey().String()
+	}
+	return k
+}
diff --git a/profiles/internal/rpc/stream/vc/vc_cache_test.go b/profiles/internal/rpc/stream/vc/vc_cache_test.go
new file mode 100644
index 0000000..8bfa144
--- /dev/null
+++ b/profiles/internal/rpc/stream/vc/vc_cache_test.go
@@ -0,0 +1,123 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+	"testing"
+
+	inaming "v.io/x/ref/profiles/internal/naming"
+	"v.io/x/ref/test/testutil"
+)
+
+func TestInsertDelete(t *testing.T) {
+	cache := NewVCCache()
+	ep, err := inaming.NewEndpoint("foo:8888")
+	if err != nil {
+		t.Fatal(err)
+	}
+	p := testutil.NewPrincipal("test")
+	vc := &VC{remoteEP: ep, localPrincipal: p}
+	otherEP, err := inaming.NewEndpoint("foo:8888")
+	if err != nil {
+		t.Fatal(err)
+	}
+	otherP := testutil.NewPrincipal("test")
+	otherVC := &VC{remoteEP: otherEP, localPrincipal: otherP}
+
+	cache.Insert(vc)
+	cache.Insert(otherVC)
+	cache.Delete(vc)
+	if got, want := cache.Close(), []*VC{otherVC}; !vcsEqual(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func TestInsertClose(t *testing.T) {
+	cache := NewVCCache()
+	ep, err := inaming.NewEndpoint("foo:8888")
+	if err != nil {
+		t.Fatal(err)
+	}
+	p := testutil.NewPrincipal("test")
+	vc := &VC{remoteEP: ep, localPrincipal: p}
+
+	if err := cache.Insert(vc); err != nil {
+		t.Errorf("the cache is not closed yet")
+	}
+	if got, want := cache.Close(), []*VC{vc}; !vcsEqual(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	if err := cache.Insert(vc); err == nil {
+		t.Errorf("the cache has been closed")
+	}
+}
+
+func TestReservedFind(t *testing.T) {
+	cache := NewVCCache()
+	ep, err := inaming.NewEndpoint("foo:8888")
+	if err != nil {
+		t.Fatal(err)
+	}
+	p := testutil.NewPrincipal("test")
+	vc := &VC{remoteEP: ep, localPrincipal: p}
+	cache.Insert(vc)
+
+	// We should be able to find the vc in the cache.
+	if got, err := cache.ReservedFind(ep, p); err != nil || got != vc {
+		t.Errorf("got %v, want %v, err: %v", got, vc, err)
+	}
+
+	// If we change the endpoint or the principal, we should get nothing.
+	otherEP, err := inaming.NewEndpoint("bar: 7777")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got, err := cache.ReservedFind(otherEP, p); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+	if got, err := cache.ReservedFind(ep, testutil.NewPrincipal("wrong")); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+
+	// A subsequent ReservedFind call that matches a previous failed ReservedFind
+	// should block until a matching Unreserve call is made.
+	ch := make(chan *VC, 1)
+	go func(ch chan *VC) {
+		vc, err := cache.ReservedFind(otherEP, p)
+		if err != nil {
+			t.Fatal(err)
+		}
+		ch <- vc
+	}(ch)
+
+	// We insert the otherEP into the cache.
+	otherVC := &VC{remoteEP: otherEP, localPrincipal: p}
+	cache.Insert(otherVC)
+	cache.Unreserve(otherEP, p)
+
+	// Now the cache.BlcokingFind should have returned the correct otherVC.
+	if cachedVC := <-ch; cachedVC != otherVC {
+		t.Errorf("got %v, want %v", cachedVC, otherVC)
+	}
+}
+
+func vcsEqual(a, b []*VC) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	m := make(map[*VC]int)
+	for _, v := range a {
+		m[v]++
+	}
+	for _, v := range b {
+		m[v]--
+	}
+	for _, i := range m {
+		if i != 0 {
+			return false
+		}
+	}
+	return true
+}
diff --git a/profiles/internal/rpc/stream/vif/set.go b/profiles/internal/rpc/stream/vif/set.go
index b966735..497e43b 100644
--- a/profiles/internal/rpc/stream/vif/set.go
+++ b/profiles/internal/rpc/stream/vif/set.go
@@ -45,7 +45,8 @@
 	return s.find(network, address, true)
 }
 
-// Unblock broadcasts all threads
+// Unblock marks the status of the network, address as no longer started, and
+// broadcasts waiting threads.
 func (s *Set) Unblock(network, address string) {
 	s.mu.Lock()
 	delete(s.started, key(network, address))
@@ -60,7 +61,7 @@
 	return s.find(network, address, false)
 }
 
-// Insert adds a VIF to the set
+// Insert adds a VIF to the set.
 func (s *Set) Insert(vif *VIF) {
 	addr := vif.conn.RemoteAddr()
 	k := key(addr.Network(), addr.String())
@@ -76,7 +77,7 @@
 	vif.addSet(s)
 }
 
-// Delete removes a VIF from the set
+// Delete removes a VIF from the set.
 func (s *Set) Delete(vif *VIF) {
 	vif.removeSet(s)
 	addr := vif.conn.RemoteAddr()