Merge "runtime/internal/flow/manager: Add InsertWithRoutingID to connCache."
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index dc97290..cdb81a7 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -106,6 +106,21 @@
return nil
}
+// InsertWithRoutingID adds conn to the cache keyed only by conn's RoutingID.
+func (c *ConnCache) InsertWithRoutingID(conn *conn.Conn) error {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ if c.addrCache == nil { // a nil addrCache is reported as a closed cache
+ return NewErrCacheClosed(nil)
+ }
+ entry := &connEntry{
+ conn: conn,
+ rid: conn.RemoteEndpoint().RoutingID(),
+ }
+ c.ridCache[entry.rid] = entry // only ridCache is written; addrCache is left untouched
+ return nil
+}
+
// Close marks the ConnCache as closed and closes all Conns in the cache.
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
@@ -185,13 +200,6 @@
return entry.conn, nil
}
-// Size returns the number of Conns stored in the ConnCache.
-func (c *ConnCache) Size() int {
- defer c.mu.Unlock()
- c.mu.Lock()
- return len(c.addrCache)
-}
-
func key(protocol, address string, blessingNames []string) string {
// TODO(suharshs): We may be able to do something more inclusive with our
// blessingNames.
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 20c1537..ace0f89 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -64,6 +64,25 @@
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
+ // Caching with InsertWithRoutingID should only cache by RoutingID, not by network/address.
+ ridEP := &inaming.Endpoint{
+ Protocol: "ridonly",
+ Address: "ridonly",
+ RID: naming.FixedRoutingID(0x1111),
+ Blessings: []string{"ridonly"},
+ }
+ ridConn := makeConnAndFlow(t, ctx, ridEP).c
+ if err := c.InsertWithRoutingID(ridConn); err != nil {
+ t.Fatal(err)
+ }
+ if got, err := c.ReservedFind(ridEP.Protocol, ridEP.Address, ridEP.Blessings); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ c.Unreserve(ridEP.Protocol, ridEP.Address, ridEP.Blessings)
+ if got, err := c.FindWithRoutingID(ridEP.RID); err != nil || got != ridConn {
+ t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
+ }
+
otherEP := &inaming.Endpoint{
Protocol: "other",
Address: "other",
@@ -217,12 +236,12 @@
rep := conn.RemoteEndpoint()
rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
if err != nil {
- t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
+ t.Error(err)
}
c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
ridconn, err := c.FindWithRoutingID(rep.RoutingID())
if err != nil {
- t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
+ t.Error(err)
}
return rfconn != nil || ridconn != nil
}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index da7707a..d067aa1 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -150,7 +150,7 @@
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
- if err := m.cache.Insert(c); err != nil {
+ if err := m.cache.InsertWithRoutingID(c); err != nil {
ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
}
}
@@ -194,7 +194,7 @@
h.ctx.Errorf("failed to create accepted conn: %v", err)
return
}
- if err := h.m.cache.Insert(c); err != nil {
+ if err := h.m.cache.InsertWithRoutingID(c); err != nil {
h.ctx.Errorf("failed to create accepted conn: %v", err)
return
}
@@ -335,6 +335,9 @@
}
return nil, flow.NewErrDialFailed(ctx, err)
}
+ if err := m.cache.InsertWithRoutingID(c); err != nil {
+ return nil, flow.NewErrBadState(ctx, err)
+ }
f, err = c.Dial(ctx, fn)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 0c45db3..7af0053 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -73,18 +73,18 @@
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
// At first the cache should be empty.
- if got, want := dm.(*manager).cache.Size(), 0; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
- if got, want := dm.(*manager).cache.Size(), 1; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
- if got, want := dm.(*manager).cache.Size(), 1; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
}