Merge "services/device/device: address Robin's comment from v.io/c/14213"
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 14457eb..17a5a0a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -118,6 +118,20 @@
 // with an error and no more flows will be sent to the FlowHandler.
 func (c *Conn) Closed() <-chan struct{} { return c.closed }
 
+// Close marks the Conn as closed. All Dial calls will fail with an error and
+// no more flows will be sent to the FlowHandler.
+// Closing an already-closed Conn is a no-op instead of a panic.
+// NOTE(review): the check and the close below are not atomic, so concurrent
+// Close calls can still race; confirm whether Conn needs a sync.Once.
+func (c *Conn) Close() {
+	select {
+	case <-c.closed:
+		// Already closed: closing the channel again would panic.
+	default:
+		close(c.closed)
+	}
+}
+
 // LocalEndpoint returns the local vanadium Endpoint
 func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
 
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/conn/conncache.go
new file mode 100644
index 0000000..6ac8b93
--- /dev/null
+++ b/runtime/internal/flow/conn/conncache.go
@@ -0,0 +1,259 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"strings"
+	"sync"
+
+	"v.io/v23/naming"
+)
+
+// ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
+// and (routingID).
+// Multiple goroutines can invoke methods on the ConnCache simultaneously.
+//
+// Entries are also kept on a circular doubly-linked list anchored at head:
+// head.next is the most recently used entry and head.prev the least recently
+// used one, which is the first to be evicted by KillConnections.
+// TODO(suharshs): We should periodically look for closed connections and remove them.
+type ConnCache struct {
+	mu        *sync.Mutex
+	addrCache map[string]*connEntry           // keyed by (protocol, address, blessingNames)
+	ridCache  map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
+	started   map[string]bool                 // keyed by (protocol, address, blessingNames)
+	cond      *sync.Cond                      // broadcast by Unreserve and Close
+	head      *connEntry                      // sentinel of the LRU list; nil once closed
+}
+
+// connEntry is a node of the ConnCache LRU list. rid and addrKey record the
+// keys the entry was inserted under, so it can be removed from both maps.
+type connEntry struct {
+	conn       *Conn
+	rid        naming.RoutingID
+	addrKey    string
+	next, prev *connEntry
+}
+
+// NewConnCache returns an empty, open ConnCache.
+func NewConnCache() *ConnCache {
+	mu := &sync.Mutex{}
+	cond := sync.NewCond(mu)
+	// A sentinel that points at itself represents an empty LRU list.
+	head := &connEntry{}
+	head.next, head.prev = head, head
+	return &ConnCache{
+		mu:        mu,
+		addrCache: make(map[string]*connEntry),
+		ridCache:  make(map[naming.RoutingID]*connEntry),
+		started:   make(map[string]bool),
+		cond:      cond,
+		head:      head,
+	}
+}
+
+// ReservedFind returns a Conn where the remote end of the connection is
+// identified by the provided (protocol, address, blessingNames). nil is
+// returned if there is no such Conn, or if the cached Conn has been closed
+// (in which case it is also dropped from the cache).
+//
+// ReservedFind will return an error iff the cache is closed, including when
+// the cache is closed while this call is waiting for a reservation.
+// If no error is returned, the caller is required to call Unreserve, to avoid
+// deadlock. The (protocol, address, blessingNames) provided to Unreserve must
+// be the same as the arguments provided to ReservedFind.
+// All new ReservedFind calls for the (protocol, address, blessingNames) will
+// block until the corresponding Unreserve call is made.
+func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*Conn, error) {
+	k := key(protocol, address, blessingNames)
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	// Close sets started to nil, so this loop also ends once the cache closes.
+	for c.started[k] {
+		c.cond.Wait()
+	}
+	if c.addrCache == nil {
+		return nil, NewErrCacheClosed(nil)
+	}
+	c.started[k] = true
+	entry := c.addrCache[k]
+	if entry == nil {
+		return nil, nil
+	}
+	if isClosed(entry.conn) {
+		c.removeEntryLocked(entry)
+		return nil, nil
+	}
+	entry.moveAfter(c.head)
+	return entry.conn, nil
+}
+
+// Unreserve marks the status of the (protocol, address, blessingNames) as no
+// longer started, and wakes goroutines waiting in ReservedFind.
+// It is safe to call after Close: deleting from the nil started map is a no-op.
+func (c *ConnCache) Unreserve(protocol, address string, blessingNames []string) {
+	c.mu.Lock()
+	delete(c.started, key(protocol, address, blessingNames))
+	c.cond.Broadcast()
+	c.mu.Unlock()
+}
+
+// Insert adds conn to the cache, keyed by the (protocol, address,
+// blessingNames) and the RoutingID of its remote endpoint, as the most
+// recently used entry.
+// If a different Conn is already cached under either key, lookups by that
+// key now return conn, but the older entry stays in the LRU list so that it
+// is still closed by KillConnections or Close. Re-inserting a Conn that is
+// already cached under the same key just refreshes its LRU position.
+// An error will be returned iff the cache has been closed.
+func (c *ConnCache) Insert(conn *Conn) error {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	if c.addrCache == nil {
+		return NewErrCacheClosed(nil)
+	}
+	ep := conn.RemoteEndpoint()
+	k := key(ep.Addr().Network(), ep.Addr().String(), ep.BlessingNames())
+	if old := c.addrCache[k]; old != nil && old.conn == conn {
+		// Avoid tracking the same Conn twice in the LRU list.
+		c.removeEntryLocked(old)
+	}
+	entry := &connEntry{
+		conn:    conn,
+		rid:     ep.RoutingID(),
+		addrKey: k,
+	}
+	c.addrCache[k] = entry
+	c.ridCache[entry.rid] = entry
+	entry.moveAfter(c.head)
+	return nil
+}
+
+// Close marks the ConnCache as closed and closes all Conns in the cache.
+// Goroutines blocked in ReservedFind are woken and return an error.
+// Calling Close more than once is a no-op.
+func (c *ConnCache) Close() {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	if c.addrCache == nil {
+		// Already closed (and c.head is nil): nothing left to do.
+		return
+	}
+	c.addrCache, c.ridCache, c.started = nil, nil, nil
+	for d := c.head.next; d != c.head; d = d.next {
+		closeConn(d.conn)
+	}
+	c.head = nil
+	c.cond.Broadcast()
+}
+
+// KillConnections will close and remove num LRU Conns in the cache.
+// This is useful when the manager is approaching system FD limits.
+// If num is greater than the number of connections in the cache, all cached
+// connections will be closed and removed.
+// KillConnections returns an error iff the cache is closed.
+func (c *ConnCache) KillConnections(num int) error {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	if c.addrCache == nil {
+		return NewErrCacheClosed(nil)
+	}
+	// Walk from the least recently used entry (head.prev) towards head,
+	// stopping early if the list runs out.
+	d := c.head.prev
+	for i := 0; i < num && d != c.head; i++ {
+		prev := d.prev
+		closeConn(d.conn)
+		c.removeEntryLocked(d)
+		d = prev
+	}
+	return nil
+}
+
+// FindWithRoutingID returns a Conn where the remote end of the connection
+// is identified by the provided rid. nil is returned if there is no such Conn,
+// or if the cached Conn has been closed (it is then dropped from the cache).
+// FindWithRoutingID will return an error iff the cache is closed.
+func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*Conn, error) {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	if c.addrCache == nil {
+		return nil, NewErrCacheClosed(nil)
+	}
+	entry := c.ridCache[rid]
+	if entry == nil {
+		return nil, nil
+	}
+	if isClosed(entry.conn) {
+		c.removeEntryLocked(entry)
+		return nil, nil
+	}
+	entry.moveAfter(c.head)
+	return entry.conn, nil
+}
+
+// removeEntryLocked unlinks entry from the LRU list and deletes its map
+// entries, but only where the maps still point at entry: a later Insert may
+// have mapped the same key to a newer entry, and that mapping must survive.
+// c.mu must be held.
+func (c *ConnCache) removeEntryLocked(entry *connEntry) {
+	if c.addrCache[entry.addrKey] == entry {
+		delete(c.addrCache, entry.addrKey)
+	}
+	if c.ridCache[entry.rid] == entry {
+		delete(c.ridCache, entry.rid)
+	}
+	entry.removeFromList()
+}
+
+// closeConn closes conn unless it is already closed. Conn.Close may not be
+// idempotent (closing an already-closed channel panics), and a cached Conn
+// can be closed by others before the cache gets to it.
+func closeConn(conn *Conn) {
+	if !isClosed(conn) {
+		conn.Close()
+	}
+}
+
+// key builds the addrCache/started map key for (protocol, address,
+// blessingNames) by joining the parts with ",".
+// NOTE(review): parts that themselves contain "," can yield colliding keys,
+// and reordering blessingNames yields a different key.
+func key(protocol, address string, blessingNames []string) string {
+	// TODO(suharshs): We may be able to do something more inclusive with our
+	// blessingNames.
+	return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
+}
+
+// removeFromList unlinks c from the list it is on, if any, and clears its
+// links. It is a no-op for an entry that is not linked.
+func (c *connEntry) removeFromList() {
+	if c.prev != nil {
+		c.prev.next = c.next
+	}
+	if c.next != nil {
+		c.next.prev = c.prev
+	}
+	c.next, c.prev = nil, nil
+}
+
+// moveAfter unlinks c (if it is linked) and re-links it directly after prev.
+func (c *connEntry) moveAfter(prev *connEntry) {
+	c.removeFromList()
+	c.prev = prev
+	c.next = prev.next
+	prev.next.prev = c
+	prev.next = c
+}
+
+// isClosed reports, without blocking, whether conn's closed channel has been closed.
+func isClosed(conn *Conn) bool {
+	select {
+	case <-conn.closed:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/runtime/internal/flow/conn/conncache_test.go b/runtime/internal/flow/conn/conncache_test.go
new file mode 100644
index 0000000..0f9bcaf
--- /dev/null
+++ b/runtime/internal/flow/conn/conncache_test.go
@@ -0,0 +1,251 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"strconv"
+	"testing"
+
+	"v.io/v23/naming"
+
+	inaming "v.io/x/ref/runtime/internal/naming"
+)
+
+func TestCache(t *testing.T) {
+	c := NewConnCache()
+	remote := &inaming.Endpoint{
+		Protocol:  "tcp",
+		Address:   "127.0.0.1:1111",
+		RID:       naming.FixedRoutingID(0x5555),
+		Blessings: []string{"A", "B", "C"},
+	}
+	conn := &Conn{
+		remote: remote,
+		closed: make(chan struct{}),
+	}
+	if err := c.Insert(conn); err != nil {
+		t.Fatal(err)
+	}
+	// We should be able to find the conn in the cache.
+	if got, err := c.ReservedFind(remote.Protocol, remote.Address, remote.Blessings); err != nil || got != conn {
+		t.Errorf("got %v, want %v, err: %v", got, conn, err)
+	}
+	c.Unreserve(remote.Protocol, remote.Address, remote.Blessings)
+	// Changing the protocol should fail.
+	if got, err := c.ReservedFind("wrong", remote.Address, remote.Blessings); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+	c.Unreserve("wrong", remote.Address, remote.Blessings)
+	// Changing the address should fail.
+	if got, err := c.ReservedFind(remote.Protocol, "wrong", remote.Blessings); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+	c.Unreserve(remote.Protocol, "wrong", remote.Blessings)
+	// Changing the blessingNames should fail.
+	if got, err := c.ReservedFind(remote.Protocol, remote.Address, []string{"wrong"}); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+	c.Unreserve(remote.Protocol, remote.Address, []string{"wrong"})
+
+	// We should be able to find the conn in the cache by looking up the RoutingID.
+	if got, err := c.FindWithRoutingID(remote.RID); err != nil || got != conn {
+		t.Errorf("got %v, want %v, err: %v", got, conn, err)
+	}
+	// Looking up the wrong RID should fail.
+	if got, err := c.FindWithRoutingID(naming.FixedRoutingID(0x1111)); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+
+	otherEP := &inaming.Endpoint{
+		Protocol:  "other",
+		Address:   "other",
+		Blessings: []string{"other"},
+	}
+	otherConn := &Conn{
+		remote: otherEP,
+		closed: make(chan struct{}),
+	}
+	// Looking up a not yet inserted endpoint should fail.
+	if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
+		t.Errorf("got %v, want <nil>, err: %v", got, err)
+	}
+	// Looking it up again should block until a matching Unreserve call is made.
+	ch := make(chan *Conn, 1)
+	go func(ch chan *Conn) {
+		conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
+		if err != nil {
+			// t.Fatal must only be called from the goroutine running the test,
+			// so report the error here and let the check below fail.
+			t.Error(err)
+		}
+		ch <- conn
+	}(ch)
+	// The reservation is still held, so the lookup cannot have completed yet.
+	select {
+	case got := <-ch:
+		t.Fatalf("ReservedFind returned %v before Unreserve was called", got)
+	default:
+	}
+
+	// We insert the other conn into the cache.
+	if err := c.Insert(otherConn); err != nil {
+		t.Fatal(err)
+	}
+	c.Unreserve(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
+	// Now the c.ReservedFind should have unblocked and returned the correct Conn.
+	if cachedConn := <-ch; cachedConn != otherConn {
+		t.Errorf("got %v, want %v", cachedConn, otherConn)
+	}
+	// Release the reservation taken by the goroutine, as ReservedFind requires.
+	c.Unreserve(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
+
+	// Closing the cache should close all the connections in the cache.
+	// Ensure that the conns are not closed yet.
+	if isClosed(conn) {
+		t.Fatalf("wanted conn to not be closed")
+	}
+	if isClosed(otherConn) {
+		t.Fatalf("wanted otherConn to not be closed")
+	}
+	c.Close()
+	// Now the connections should be closed.
+	if !isClosed(conn) {
+		t.Errorf("wanted conn to be closed")
+	}
+	if !isClosed(otherConn) {
+		t.Errorf("wanted otherConn to be closed")
+	}
+}
+
+func TestLRU(t *testing.T) {
+	// Ensure that the least recently inserted conns are killed by KillConnections.
+	conns := nConns(10)
+	c := newFilledCache(t, conns)
+	if err := c.KillConnections(3); err != nil {
+		t.Fatal(err)
+	}
+	// conns[3:] should not be closed and still in the cache.
+	// conns[:3] should be closed and removed from the cache.
+	checkLRUState(t, c, conns[3:], conns[:3])
+
+	// Ensure that ReservedFind marks conns as more recently used.
+	conns = nConns(10)
+	c = newFilledCache(t, conns)
+	for _, conn := range conns[:7] {
+		if got, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames()); err != nil || got != conn {
+			t.Errorf("got %v, want %v, err: %v", got, conn, err)
+		}
+		c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+	}
+	if err := c.KillConnections(3); err != nil {
+		t.Fatal(err)
+	}
+	// conns[:7] should not be closed and still in the cache.
+	// conns[7:] should be closed and removed from the cache.
+	checkLRUState(t, c, conns[:7], conns[7:])
+
+	// Ensure that FindWithRoutingID marks conns as more recently used.
+	conns = nConns(10)
+	c = newFilledCache(t, conns)
+	for _, conn := range conns[:7] {
+		if got, err := c.FindWithRoutingID(conn.remote.RoutingID()); err != nil || got != conn {
+			t.Errorf("got %v, want %v, err: %v", got, conn, err)
+		}
+	}
+	if err := c.KillConnections(3); err != nil {
+		t.Fatal(err)
+	}
+	// conns[:7] should not be closed and still in the cache.
+	// conns[7:] should be closed and removed from the cache.
+	checkLRUState(t, c, conns[:7], conns[7:])
+}
+
+// newFilledCache returns a new ConnCache into which conns were inserted in
+// order, so conns[0] is the least recently used entry.
+func newFilledCache(t *testing.T, conns []*Conn) *ConnCache {
+	c := NewConnCache()
+	for _, conn := range conns {
+		if err := c.Insert(conn); err != nil {
+			t.Fatal(err)
+		}
+	}
+	return c
+}
+
+// checkLRUState verifies that c's maps and LRU list agree in size, that every
+// Conn in live is open and still cached, and that every Conn in killed has
+// been closed and removed from c.
+func checkLRUState(t *testing.T, c *ConnCache, live, killed []*Conn) {
+	if !cacheSizeMatches(c) {
+		t.Errorf("the size of the caches and LRU list do not match")
+	}
+	for _, conn := range live {
+		if isClosed(conn) {
+			t.Errorf("conn %v should not have been closed", conn)
+		}
+		if !isInCache(t, c, conn) {
+			t.Errorf("conn %v should still be in cache", conn)
+		}
+	}
+	for _, conn := range killed {
+		if !isClosed(conn) {
+			t.Errorf("conn %v should have been closed", conn)
+		}
+		if isInCache(t, c, conn) {
+			t.Errorf("conn %v should not be in cache", conn)
+		}
+	}
+}
+
+// isInCache reports whether conn can be found in c by its (protocol, address,
+// blessingNames) or by its RoutingID. Both lookups also refresh conn's LRU
+// position if it is found.
+func isInCache(t *testing.T, c *ConnCache, conn *Conn) bool {
+	rfconn, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+	if err != nil {
+		t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
+	}
+	c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+	ridconn, err := c.FindWithRoutingID(conn.remote.RoutingID())
+	if err != nil {
+		t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
+	}
+	return rfconn != nil || ridconn != nil
+}
+
+// cacheSizeMatches reports whether the LRU list and both lookup maps of c
+// hold the same number of entries.
+func cacheSizeMatches(c *ConnCache) bool {
+	ls := listSize(c)
+	return ls == len(c.addrCache) && ls == len(c.ridCache)
+}
+
+// listSize returns the number of entries on c's LRU list, excluding the
+// sentinel head.
+func listSize(c *ConnCache) int {
+	size := 0
+	d := c.head.next
+	for d != c.head {
+		size++
+		d = d.next
+	}
+	return size
+}
+
+// nConns returns n open Conns whose remote endpoints have distinct protocols
+// and routing IDs.
+func nConns(n int) []*Conn {
+	conns := make([]*Conn, n)
+	for i := 0; i < n; i++ {
+		conns[i] = &Conn{
+			remote: &inaming.Endpoint{
+				Protocol: strconv.Itoa(i),
+				RID:      naming.FixedRoutingID(uint64(i)),
+			},
+			closed: make(chan struct{}),
+		}
+	}
+	return conns
+}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 253ec86..eb0d216 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -16,4 +16,5 @@
   "en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
   UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
   UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+  CacheClosed() {"en":"cache is closed."}
 )
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index 15fb4cd..de2eda9 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -19,6 +19,7 @@
 	ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
 	ErrUnknownMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
 	ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+	ErrCacheClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed.")
 )
 
 func init() {
@@ -26,6 +27,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed.")
 }
 
 // NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
@@ -47,3 +49,8 @@
 func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
 	return verror.New(ErrUnknownControlMsg, ctx, cmd)
 }
+
+// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
+func NewErrCacheClosed(ctx *context.T) error {
+	return verror.New(ErrCacheClosed, ctx)
+}