Merge "services/device/device: address Robin's comment from v.io/c/14213"
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 14457eb..17a5a0a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -118,6 +118,10 @@
// with an error and no more flows will be sent to the FlowHandler.
func (c *Conn) Closed() <-chan struct{} { return c.closed }
+// Close marks the Conn as closed. All Dial calls will fail with an error and
+// no more flows will be sent to the FlowHandler.
+func (c *Conn) Close() { close(c.closed) }
+
// LocalEndpoint returns the local vanadium Endpoint
func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/conn/conncache.go
new file mode 100644
index 0000000..6ac8b93
--- /dev/null
+++ b/runtime/internal/flow/conn/conncache.go
@@ -0,0 +1,207 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "strings"
+ "sync"
+
+ "v.io/v23/naming"
+)
+
+// ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
+// and (routingID).
+// Multiple goroutines can invoke methods on the ConnCache simultaneously.
+// TODO(suharshs): We should periodically look for closed connections and remove them.
+type ConnCache struct {
+ mu *sync.Mutex
+ addrCache map[string]*connEntry // keyed by (protocol, address, blessingNames)
+ ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
+ started map[string]bool // keyed by (protocol, address, blessingNames)
+ cond *sync.Cond
+ head *connEntry // the head and tail pointer of the linked list for implementing LRU.
+}
+
+type connEntry struct {
+ conn *Conn
+ rid naming.RoutingID
+ addrKey string
+ next, prev *connEntry
+}
+
+func NewConnCache() *ConnCache {
+ mu := &sync.Mutex{}
+ cond := sync.NewCond(mu)
+ head := &connEntry{}
+ head.next, head.prev = head, head
+ return &ConnCache{
+ mu: mu,
+ addrCache: make(map[string]*connEntry),
+ ridCache: make(map[naming.RoutingID]*connEntry),
+ started: make(map[string]bool),
+ cond: cond,
+ head: head,
+ }
+}
+
+// ReservedFind returns a Conn where the remote end of the connection is
+// identified by the provided (protocol, address, blessingNames). nil is
+// returned if there is no such Conn.
+//
+// ReservedFind will return an error iff the cache is closed.
+// If no error is returned, the caller is required to call Unreserve, to avoid
+// deadlock. The (protocol, address, blessingNames) provided to Unreserve must
+// be the same as the arguments provided to ReservedFind.
+// All new ReservedFind calls for the (protocol, address, blessings) will Block
+// until the corresponding Unreserve call is made.
+func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*Conn, error) {
+ k := key(protocol, address, blessingNames)
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ for c.started[k] {
+ c.cond.Wait()
+ }
+ if c.addrCache == nil {
+ return nil, NewErrCacheClosed(nil)
+ }
+ c.started[k] = true
+ entry := c.addrCache[k]
+ if entry == nil {
+ return nil, nil
+ }
+ if isClosed(entry.conn) {
+ delete(c.addrCache, entry.addrKey)
+ delete(c.ridCache, entry.rid)
+ entry.removeFromList()
+ return nil, nil
+ }
+ entry.moveAfter(c.head)
+ return entry.conn, nil
+}
+
+// Unreserve marks the status of the (protocol, address, blessingNames) as no
+// longer started, and broadcasts waiting threads.
+func (c *ConnCache) Unreserve(protocol, address string, blessingNames []string) {
+ c.mu.Lock()
+ delete(c.started, key(protocol, address, blessingNames))
+ c.cond.Broadcast()
+ c.mu.Unlock()
+}
+
+// Insert adds conn to the cache.
+// An error will be returned iff the cache has been closed.
+func (c *ConnCache) Insert(conn *Conn) error {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ if c.addrCache == nil {
+ return NewErrCacheClosed(nil)
+ }
+ ep := conn.RemoteEndpoint()
+ k := key(ep.Addr().Network(), ep.Addr().String(), ep.BlessingNames())
+ entry := &connEntry{
+ conn: conn,
+ rid: ep.RoutingID(),
+ addrKey: k,
+ }
+ c.addrCache[k] = entry
+ c.ridCache[entry.rid] = entry
+ entry.moveAfter(c.head)
+ return nil
+}
+
+// Close marks the ConnCache as closed and closes all Conns in the cache.
+func (c *ConnCache) Close() {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ c.addrCache, c.ridCache, c.started = nil, nil, nil
+ d := c.head.next
+ for d != c.head {
+ d.conn.Close()
+ d = d.next
+ }
+ c.head = nil
+}
+
+// KillConnections will close and remove num LRU Conns in the cache.
+// This is useful when the manager is approaching system FD limits.
+// If num is greater than the number of connections in the cache, all cached
+// connections will be closed and removed.
+// KillConnections returns an error iff the cache is closed.
+func (c *ConnCache) KillConnections(num int) error {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ if c.addrCache == nil {
+ return NewErrCacheClosed(nil)
+ }
+ d := c.head.prev
+ for i := 0; i < num; i++ {
+ if d == c.head {
+ break
+ }
+ d.conn.Close()
+ delete(c.addrCache, d.addrKey)
+ delete(c.ridCache, d.rid)
+ prev := d.prev
+ d.removeFromList()
+ d = prev
+ }
+ return nil
+}
+
+// FindWithRoutingID returns a Conn where the remote end of the connection
+// is identified by the provided rid. nil is returned if there is no such Conn.
+// FindWithRoutingID will return an error iff the cache is closed.
+func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*Conn, error) {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ if c.addrCache == nil {
+ return nil, NewErrCacheClosed(nil)
+ }
+ entry := c.ridCache[rid]
+ if entry == nil {
+ return nil, nil
+ }
+ if isClosed(entry.conn) {
+ delete(c.addrCache, entry.addrKey)
+ delete(c.ridCache, entry.rid)
+ entry.removeFromList()
+ return nil, nil
+ }
+ entry.moveAfter(c.head)
+ return entry.conn, nil
+}
+
+func key(protocol, address string, blessingNames []string) string {
+ // TODO(suharshs): We may be able to do something more inclusive with our
+ // blessingNames.
+ return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
+}
+
+func (c *connEntry) removeFromList() {
+ if c.prev != nil {
+ c.prev.next = c.next
+ }
+ if c.next != nil {
+ c.next.prev = c.prev
+ }
+ c.next, c.prev = nil, nil
+}
+
+func (c *connEntry) moveAfter(prev *connEntry) {
+ c.removeFromList()
+ c.prev = prev
+ c.next = prev.next
+ prev.next.prev = c
+ prev.next = c
+}
+
+func isClosed(conn *Conn) bool {
+ select {
+ case <-conn.closed:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/runtime/internal/flow/conn/conncache_test.go b/runtime/internal/flow/conn/conncache_test.go
new file mode 100644
index 0000000..0f9bcaf
--- /dev/null
+++ b/runtime/internal/flow/conn/conncache_test.go
@@ -0,0 +1,264 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "strconv"
+ "testing"
+
+ "v.io/v23/naming"
+
+ inaming "v.io/x/ref/runtime/internal/naming"
+)
+
+func TestCache(t *testing.T) {
+ c := NewConnCache()
+ remote := &inaming.Endpoint{
+ Protocol: "tcp",
+ Address: "127.0.0.1:1111",
+ RID: naming.FixedRoutingID(0x5555),
+ Blessings: []string{"A", "B", "C"},
+ }
+ conn := &Conn{
+ remote: remote,
+ closed: make(chan struct{}),
+ }
+ if err := c.Insert(conn); err != nil {
+ t.Fatal(err)
+ }
+ // We should be able to find the conn in the cache.
+ if got, err := c.ReservedFind(remote.Protocol, remote.Address, remote.Blessings); err != nil || got != conn {
+ t.Errorf("got %v, want %v, err: %v", got, conn, err)
+ }
+ c.Unreserve(remote.Protocol, remote.Address, remote.Blessings)
+ // Changing the protocol should fail.
+ if got, err := c.ReservedFind("wrong", remote.Address, remote.Blessings); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ c.Unreserve("wrong", remote.Address, remote.Blessings)
+ // Changing the address should fail.
+ if got, err := c.ReservedFind(remote.Protocol, "wrong", remote.Blessings); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ c.Unreserve(remote.Protocol, "wrong", remote.Blessings)
+ // Changing the blessingNames should fail.
+ if got, err := c.ReservedFind(remote.Protocol, remote.Address, []string{"wrong"}); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ c.Unreserve(remote.Protocol, remote.Address, []string{"wrong"})
+
+ // We should be able to find the conn in the cache by looking up the RoutingID.
+ if got, err := c.FindWithRoutingID(remote.RID); err != nil || got != conn {
+ t.Errorf("got %v, want %v, err: %v", got, conn, err)
+ }
+ // Looking up the wrong RID should fail.
+ if got, err := c.FindWithRoutingID(naming.FixedRoutingID(0x1111)); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+
+ otherEP := &inaming.Endpoint{
+ Protocol: "other",
+ Address: "other",
+ Blessings: []string{"other"},
+ }
+ otherConn := &Conn{
+ remote: otherEP,
+ closed: make(chan struct{}),
+ }
+ // Looking up a not yet inserted endpoint should fail.
+ if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ // Looking it up again should block until a matching Unreserve call is made.
+ ch := make(chan *Conn, 1)
+ go func(ch chan *Conn) {
+ conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ch <- conn
+ }(ch)
+
+ // We insert the other conn into the cache.
+ if err := c.Insert(otherConn); err != nil {
+ t.Fatal(err)
+ }
+ c.Unreserve(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
+ // Now the c.ReservedFind should have unblocked and returned the correct Conn.
+ if cachedConn := <-ch; cachedConn != otherConn {
+ t.Errorf("got %v, want %v", cachedConn, otherConn)
+ }
+
+ // Closing the cache should close all the connections in the cache.
+ // Ensure that the conns are not closed yet.
+ if isClosed(conn) {
+ t.Fatalf("wanted conn to not be closed")
+ }
+ if isClosed(otherConn) {
+ t.Fatalf("wanted otherConn to not be closed")
+ }
+ c.Close()
+ // Now the connections should be closed.
+ if !isClosed(conn) {
+ t.Errorf("wanted conn to be closed")
+ }
+ if !isClosed(otherConn) {
+ t.Errorf("wanted otherConn to be closed")
+ }
+}
+
+func TestLRU(t *testing.T) {
+ // Ensure that the least recently inserted conns are killed by KillConnections.
+ c := NewConnCache()
+ conns := nConns(10)
+ for _, conn := range conns {
+ if err := c.Insert(conn); err != nil {
+ t.Fatal(err)
+ }
+ }
+ if err := c.KillConnections(3); err != nil {
+ t.Fatal(err)
+ }
+ if !cacheSizeMatches(c) {
+ t.Errorf("the size of the caches and LRU list do not match")
+ }
+ // conns[3:] should not be closed and still in the cache.
+ // conns[:3] should be closed and removed from the cache.
+ for _, conn := range conns[3:] {
+ if isClosed(conn) {
+ t.Errorf("conn %v should not have been closed", conn)
+ }
+ if !isInCache(t, c, conn) {
+ t.Errorf("conn %v should still be in cache", conn)
+ }
+ }
+ for _, conn := range conns[:3] {
+ if !isClosed(conn) {
+ t.Errorf("conn %v should have been closed", conn)
+ }
+ if isInCache(t, c, conn) {
+ t.Errorf("conn %v should not be in cache", conn)
+ }
+ }
+
+ // Ensure that ReservedFind marks conns as more recently used.
+ c = NewConnCache()
+ conns = nConns(10)
+ for _, conn := range conns {
+ if err := c.Insert(conn); err != nil {
+ t.Fatal(err)
+ }
+ }
+ for _, conn := range conns[:7] {
+ if got, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames()); err != nil || got != conn {
+ t.Errorf("got %v, want %v, err: %v", got, conn, err)
+ }
+ c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+ }
+ if err := c.KillConnections(3); err != nil {
+ t.Fatal(err)
+ }
+ if !cacheSizeMatches(c) {
+ t.Errorf("the size of the caches and LRU list do not match")
+ }
+ // conns[:7] should not be closed and still in the cache.
+ // conns[7:] should be closed and removed from the cache.
+ for _, conn := range conns[:7] {
+ if isClosed(conn) {
+ t.Errorf("conn %v should not have been closed", conn)
+ }
+ if !isInCache(t, c, conn) {
+ t.Errorf("conn %v should still be in cache", conn)
+ }
+ }
+ for _, conn := range conns[7:] {
+ if !isClosed(conn) {
+ t.Errorf("conn %v should have been closed", conn)
+ }
+ if isInCache(t, c, conn) {
+ t.Errorf("conn %v should not be in cache", conn)
+ }
+ }
+
+ // Ensure that FindWithRoutingID marks conns as more recently used.
+ c = NewConnCache()
+ conns = nConns(10)
+ for _, conn := range conns {
+ if err := c.Insert(conn); err != nil {
+ t.Fatal(err)
+ }
+ }
+ for _, conn := range conns[:7] {
+ if got, err := c.FindWithRoutingID(conn.remote.RoutingID()); err != nil || got != conn {
+ t.Errorf("got %v, want %v, err: %v", got, conn, err)
+ }
+ }
+ if err := c.KillConnections(3); err != nil {
+ t.Fatal(err)
+ }
+ if !cacheSizeMatches(c) {
+ t.Errorf("the size of the caches and LRU list do not match")
+ }
+ // conns[:7] should not be closed and still in the cache.
+ // conns[7:] should be closed and removed from the cache.
+ for _, conn := range conns[:7] {
+ if isClosed(conn) {
+ t.Errorf("conn %v should not have been closed", conn)
+ }
+ if !isInCache(t, c, conn) {
+ t.Errorf("conn %v should still be in cache", conn)
+ }
+ }
+ for _, conn := range conns[7:] {
+ if !isClosed(conn) {
+ t.Errorf("conn %v should have been closed", conn)
+ }
+ if isInCache(t, c, conn) {
+ t.Errorf("conn %v should not be in cache", conn)
+ }
+ }
+}
+
+func isInCache(t *testing.T, c *ConnCache, conn *Conn) bool {
+ rfconn, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+ if err != nil {
+ t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
+ }
+ c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+ ridconn, err := c.FindWithRoutingID(conn.remote.RoutingID())
+ if err != nil {
+ t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
+ }
+ return rfconn != nil || ridconn != nil
+}
+
+func cacheSizeMatches(c *ConnCache) bool {
+ ls := listSize(c)
+ return ls == len(c.addrCache) && ls == len(c.ridCache)
+}
+
+func listSize(c *ConnCache) int {
+ size := 0
+ d := c.head.next
+ for d != c.head {
+ size++
+ d = d.next
+ }
+ return size
+}
+
+func nConns(n int) []*Conn {
+ conns := make([]*Conn, n)
+ for i := 0; i < n; i++ {
+ conns[i] = &Conn{
+ remote: &inaming.Endpoint{
+ Protocol: strconv.Itoa(i),
+ RID: naming.FixedRoutingID(uint64(i)),
+ },
+ closed: make(chan struct{}),
+ }
+ }
+ return conns
+}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 253ec86..eb0d216 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -16,4 +16,5 @@
"en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+ CacheClosed() {"en":"cache is closed"}
)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index 15fb4cd..de2eda9 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -19,6 +19,7 @@
ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+ ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
)
func init() {
@@ -26,6 +27,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
}
// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
@@ -47,3 +49,8 @@
func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
return verror.New(ErrUnknownControlMsg, ctx, cmd)
}
+
+// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
+func NewErrCacheClosed(ctx *context.T) error {
+ return verror.New(ErrCacheClosed, ctx)
+}