ref/runtime/internal/flow: ConnCache can kill conns based on LRU.

LRU order is now determined by the last time bytes were read from or
written to any of a conn's flows, rather than by cache lookups.
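The idea in miniature (a standalone sketch, not the real API; touch is
a hypothetical name, and sort.Slice assumes Go 1.8+, while the change
itself uses sort.Sort on a named type):

    // Every read or write stamps the conn under its lock.
    func (c *Conn) touch() {
        c.mu.Lock()
        c.lastUsedTime = time.Now()
        c.mu.Unlock()
    }

    // The cache then evicts in ascending last-use order.
    sort.Slice(entries, func(i, j int) bool {
        return entries[i].conn.LastUsedTime().Before(entries[j].conn.LastUsedTime())
    })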
Change-Id: Id25245e6c552f3b8744ead253e8f8e2db3f78b56
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 9d00cb7..2a0fec1 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -64,6 +64,7 @@
nextFid uint64
flows map[uint64]*flw
dischargeTimer *time.Timer
+ lastUsedTime time.Time
}
// Ensure that *Conn implements flow.Conn.
@@ -77,15 +78,16 @@
versions version.RPCVersionRange,
handler FlowHandler) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
- mp: newMessagePipe(conn),
- handler: handler,
- lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
- local: local,
- remote: remote,
- closed: make(chan struct{}),
- nextFid: reservedFlows,
- flows: map[uint64]*flw{},
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
+ local: local,
+ remote: remote,
+ closed: make(chan struct{}),
+ nextFid: reservedFlows,
+ flows: map[uint64]*flw{},
+ lastUsedTime: time.Now(),
}
if err := c.dialHandshake(ctx, versions); err != nil {
c.Close(ctx, err)
@@ -104,14 +106,15 @@
versions version.RPCVersionRange,
handler FlowHandler) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
- mp: newMessagePipe(conn),
- handler: handler,
- lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
- local: local,
- closed: make(chan struct{}),
- nextFid: reservedFlows + 1,
- flows: map[uint64]*flw{},
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
+ local: local,
+ closed: make(chan struct{}),
+ nextFid: reservedFlows + 1,
+ flows: map[uint64]*flw{},
+ lastUsedTime: time.Now(),
}
if err := c.acceptHandshake(ctx, versions); err != nil {
c.Close(ctx, err)
@@ -155,6 +158,13 @@
// RemoteEndpoint returns the remote vanadium Endpoint
func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
+// LastUsedTime returns the last time at which bytes were read from or
+// written to the Conn.
+func (c *Conn) LastUsedTime() time.Time {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ return c.lastUsedTime
+}
+
// Closed returns a channel that will be closed after the Conn is shutdown.
// After this channel is closed it is guaranteed that all Dial calls will fail
// with an error and no more flows will be sent to the FlowHandler.
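Since LastUsedTime takes the conn's lock internally, idleness checks
become one-liners for callers; a hypothetical helper (idleFor is not
part of this change):

    // idleFor reports whether c has seen no reads or writes for at least d.
    func idleFor(c *conn.Conn, d time.Duration) bool {
        return time.Since(c.LastUsedTime()) >= d
    }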
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index a149ad1..435a4b4 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
import (
"strconv"
+ "time"
"v.io/v23/context"
"v.io/v23/flow"
@@ -50,6 +51,9 @@
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
+ f.conn.mu.Lock()
+ f.conn.lastUsedTime = time.Now()
+ f.conn.mu.Unlock()
var release bool
if n, release, err = f.q.read(f.ctx, p); release {
f.conn.release(f.ctx)
@@ -65,6 +69,9 @@
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
+ f.conn.mu.Lock()
+ f.conn.lastUsedTime = time.Now()
+ f.conn.mu.Unlock()
var release bool
// TODO(mattr): Currently we only ever release counters when some flow
// reads. We may need to do it more or less often. Currently
@@ -86,6 +93,9 @@
}
func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+ f.conn.mu.Lock()
+ f.conn.lastUsedTime = time.Now()
+ f.conn.mu.Unlock()
sent := 0
var left []byte
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
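Read, ReadMsg, and writeMsg now each open with the same
lock/stamp/unlock triple; a possible follow-up (not part of this
change) would hoist it onto *Conn so each flow method starts with
f.conn.markUsed():

    // markUsed records that bytes just moved on this conn.
    func (c *Conn) markUsed() {
        c.mu.Lock()
        c.lastUsedTime = time.Now()
        c.mu.Unlock()
    }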
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
index 652e8b4..b9a9907 100644
--- a/runtime/internal/flow/flowtest/flowtest.go
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -8,7 +8,11 @@
"io"
"sync"
+ "v.io/v23"
"v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+
"v.io/x/ref/internal/logger"
)
@@ -69,6 +73,7 @@
f.wire.c.Broadcast()
return len(buf), nil
}
+
func (f *MRW) ReadMsg() (buf []byte, err error) {
defer f.wire.mu.Unlock()
f.wire.mu.Lock()
@@ -87,7 +92,17 @@
logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
return buf, nil
}
+
func (f *MRW) Close() error {
f.wire.Close()
return nil
}
+
+func BlessingsForPeer(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
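BlessingsForPeer matches the peer-authorization callback that Dial
expects, so tests can pass it directly instead of redefining the same
closure inline, as the manager_test.go hunks below now do:

    f, err := m.Dial(ctx, eps[0], flowtest.BlessingsForPeer)
    if err != nil {
        t.Fatal(err)
    }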
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cc5943a..dc97290 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -5,6 +5,7 @@
package manager
import (
+ "sort"
"strings"
"sync"
@@ -23,28 +24,23 @@
ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
started map[string]bool // keyed by (protocol, address, blessingNames)
cond *sync.Cond
- head *connEntry // the head and tail pointer of the linked list for implementing LRU.
}
type connEntry struct {
- conn *conn.Conn
- rid naming.RoutingID
- addrKey string
- next, prev *connEntry
+ conn *conn.Conn
+ rid naming.RoutingID
+ addrKey string
}
func NewConnCache() *ConnCache {
mu := &sync.Mutex{}
cond := sync.NewCond(mu)
- head := &connEntry{}
- head.next, head.prev = head, head
return &ConnCache{
mu: mu,
addrCache: make(map[string]*connEntry),
ridCache: make(map[naming.RoutingID]*connEntry),
started: make(map[string]bool),
cond: cond,
- head: head,
}
}
@@ -76,10 +72,8 @@
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
- entry.removeFromList()
return nil, nil
}
- entry.moveAfter(c.head)
return entry.conn, nil
}
@@ -109,7 +103,6 @@
}
c.addrCache[k] = entry
c.ridCache[entry.rid] = entry
- entry.moveAfter(c.head)
return nil
}
@@ -117,16 +110,14 @@
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
c.mu.Lock()
- c.addrCache, c.ridCache, c.started = nil, nil, nil
- d := c.head.next
- for d != c.head {
+ c.addrCache, c.started = nil, nil
+ for _, d := range c.ridCache {
d.conn.Close(ctx, NewErrCacheClosed(ctx))
- d = d.next
}
- c.head = nil
}
// KillConnections will close and remove num LRU Conns in the cache.
+// Conns that are already closed are removed from the cache along the way.
// This is useful when the manager is approaching system FD limits.
// If num is greater than the number of connections in the cache, all cached
// connections will be closed and removed.
@@ -137,22 +128,42 @@
if c.addrCache == nil {
return NewErrCacheClosed(ctx)
}
- d := c.head.prev
err := NewErrConnKilledToFreeResources(ctx)
- for i := 0; i < num; i++ {
- if d == c.head {
- break
+ pq := make(connEntries, 0, len(c.ridCache))
+ for _, e := range c.ridCache {
+ if isClosed(e.conn) {
+ delete(c.addrCache, e.addrKey)
+ delete(c.ridCache, e.rid)
+ continue
}
+ pq = append(pq, e)
+ }
+ sort.Sort(pq)
+ for i := 0; i < num && i < len(pq); i++ {
+ d := pq[i]
d.conn.Close(ctx, err)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
- prev := d.prev
- d.removeFromList()
- d = prev
}
return nil
}
+// TODO(suharshs): If sorting the connections becomes too slow, switch to
+// container/heap instead of sorting all the connections.
+type connEntries []*connEntry
+
+func (c connEntries) Len() int {
+ return len(c)
+}
+
+func (c connEntries) Less(i, j int) bool {
+ return c[i].conn.LastUsedTime().Before(c[j].conn.LastUsedTime())
+}
+
+func (c connEntries) Swap(i, j int) {
+ c[i], c[j] = c[j], c[i]
+}
+
// FindWithRoutingID returns a Conn where the remote end of the connection
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
@@ -169,10 +180,8 @@
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
- entry.removeFromList()
return nil, nil
}
- entry.moveAfter(c.head)
return entry.conn, nil
}
@@ -189,24 +198,6 @@
return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
}
-func (c *connEntry) removeFromList() {
- if c.prev != nil {
- c.prev.next = c.next
- }
- if c.next != nil {
- c.next.prev = c.prev
- }
- c.next, c.prev = nil, nil
-}
-
-func (c *connEntry) moveAfter(prev *connEntry) {
- c.removeFromList()
- c.prev = prev
- c.next = prev.next
- prev.next.prev = c
- prev.next = c
-}
-
func isClosed(conn *conn.Conn) bool {
select {
case <-conn.Closed():
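A sketch of the container/heap alternative that the TODO above mentions
(illustrative, not part of this change; assumes the file imports
"container/heap"). Popping only the num oldest entries costs
O(n + num log n) rather than the O(n log n) of a full sort:

    type connHeap []*connEntry

    func (h connHeap) Len() int      { return len(h) }
    func (h connHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
    func (h connHeap) Less(i, j int) bool {
        return h[i].conn.LastUsedTime().Before(h[j].conn.LastUsedTime())
    }
    func (h *connHeap) Push(x interface{}) { *h = append(*h, x.(*connEntry)) }
    func (h *connHeap) Pop() interface{} {
        old := *h
        e := old[len(old)-1]
        *h = old[:len(old)-1]
        return e
    }

    // In KillConnections, replacing the sort:
    h := connHeap(pq)
    heap.Init(&h)                      // O(n)
    for i := 0; i < num && h.Len() > 0; i++ {
        d := heap.Pop(&h).(*connEntry) // O(log n); least recently used first
        d.conn.Close(ctx, err)
        delete(c.addrCache, d.addrKey)
        delete(c.ridCache, d.rid)
    }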
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 80892a7..248d296 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -10,8 +10,10 @@
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
+
connpackage "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
inaming "v.io/x/ref/runtime/internal/naming"
@@ -28,7 +30,7 @@
RID: naming.FixedRoutingID(0x5555),
Blessings: []string{"A", "B", "C"},
}
- conn := makeConn(t, ctx, remote)
+ conn := makeConnAndFlow(t, ctx, remote).c
if err := c.Insert(conn); err != nil {
t.Fatal(err)
}
@@ -67,7 +69,7 @@
Address: "other",
Blessings: []string{"other"},
}
- otherConn := makeConn(t, ctx, otherEP)
+ otherConn := makeConnAndFlow(t, ctx, otherEP).c
// Looking up a not yet inserted endpoint should fail.
if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -111,11 +113,11 @@
ctx, shutdown := v23.Init()
defer shutdown()
- // Ensure that the least recently inserted conns are killed by KillConnections.
+ // Ensure that the least recently created conns are killed by KillConnections.
c := NewConnCache()
- conns := nConns(t, ctx, 10)
+ conns := nConnAndFlows(t, ctx, 10)
for _, conn := range conns {
- if err := c.Insert(conn); err != nil {
+ if err := c.Insert(conn.c); err != nil {
t.Fatal(err)
}
}
@@ -128,34 +130,30 @@
// conns[3:] should not be closed and still in the cache.
// conns[:3] should be closed and removed from the cache.
for _, conn := range conns[3:] {
- if isClosed(conn) {
+ if isClosed(conn.c) {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn) {
+ if !isInCache(t, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[:3] {
- <-conn.Closed()
- if isInCache(t, c, conn) {
+ <-conn.c.Closed()
+ if isInCache(t, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
- // Ensure that ReservedFind marks conns as more recently used.
+ // Ensure that writing to conns marks conns as more recently used.
c = NewConnCache()
- conns = nConns(t, ctx, 10)
+ conns = nConnAndFlows(t, ctx, 10)
for _, conn := range conns {
- if err := c.Insert(conn); err != nil {
+ if err := c.Insert(conn.c); err != nil {
t.Fatal(err)
}
}
for _, conn := range conns[:7] {
- rep := conn.RemoteEndpoint()
- if got, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames()); err != nil || got != conn {
- t.Errorf("got %v, want %v, err: %v", got, conn, err)
- }
- c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
+ conn.write()
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
@@ -166,33 +164,30 @@
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
- if isClosed(conn) {
+ if isClosed(conn.c) {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn) {
+ if !isInCache(t, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[7:] {
- <-conn.Closed()
- if isInCache(t, c, conn) {
+ <-conn.c.Closed()
+ if isInCache(t, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
- // Ensure that FindWithRoutingID marks conns as more recently used.
+ // Ensure that reading from conns marks conns as more recently used.
c = NewConnCache()
- conns = nConns(t, ctx, 10)
+ conns = nConnAndFlows(t, ctx, 10)
for _, conn := range conns {
- if err := c.Insert(conn); err != nil {
+ if err := c.Insert(conn.c); err != nil {
t.Fatal(err)
}
}
for _, conn := range conns[:7] {
- rep := conn.RemoteEndpoint()
- if got, err := c.FindWithRoutingID(rep.RoutingID()); err != nil || got != conn {
- t.Errorf("got %v, want %v, err: %v", got, conn, err)
- }
+ conn.read()
}
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
@@ -203,16 +198,16 @@
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
- if isClosed(conn) {
+ if isClosed(conn.c) {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn) {
+ if !isInCache(t, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[7:] {
- <-conn.Closed()
- if isInCache(t, c, conn) {
+ <-conn.c.Closed()
+ if isInCache(t, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -233,32 +228,40 @@
}
func cacheSizeMatches(c *ConnCache) bool {
- ls := listSize(c)
- return ls == len(c.addrCache) && ls == len(c.ridCache)
+ return len(c.addrCache) == len(c.ridCache)
}
-func listSize(c *ConnCache) int {
- size := 0
- d := c.head.next
- for d != c.head {
- size++
- d = d.next
+type connAndFlow struct {
+ c *connpackage.Conn
+ f flow.Flow
+}
+
+func (c connAndFlow) write() {
+ _, err := c.f.WriteMsg([]byte{0})
+ if err != nil {
+ panic(err)
}
- return size
}
-func nConns(t *testing.T, ctx *context.T, n int) []*connpackage.Conn {
- conns := make([]*connpackage.Conn, n)
+func (c connAndFlow) read() {
+ _, err := c.f.ReadMsg()
+ if err != nil {
+ panic(err)
+ }
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+ cfs := make([]connAndFlow, n)
for i := 0; i < n; i++ {
- conns[i] = makeConn(t, ctx, &inaming.Endpoint{
+ cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
Protocol: strconv.Itoa(i),
RID: naming.FixedRoutingID(uint64(i)),
})
}
- return conns
+ return cfs
}
-func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *connpackage.Conn {
+func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
dch := make(chan *connpackage.Conn)
ach := make(chan *connpackage.Conn)
@@ -272,12 +275,34 @@
}()
go func() {
a, err := connpackage.NewAccepted(ctx, amrw, ep,
- version.RPCVersionRange{Min: 1, Max: 5}, nil)
+ version.RPCVersionRange{Min: 1, Max: 5}, fh{t})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ach <- a
}()
- <-dch
- return <-ach
+ conn := <-dch
+ <-ach
+ f, err := conn.Dial(ctx, flowtest.BlessingsForPeer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write a byte to send the openFlow message.
+ if _, err := f.Write([]byte{0}); err != nil {
+ t.Fatal(err)
+ }
+ return connAndFlow{conn, f}
+}
+
+type fh struct {
+ t *testing.T
+}
+
+func (h fh) HandleFlow(f flow.Flow) error {
+ go func() {
+ if _, err := f.WriteMsg([]byte{0}); err != nil {
+ h.t.Errorf("failed to write: %v", err)
+ }
+ }()
+ return nil
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 0402f15..0c45db3 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -13,9 +13,9 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
- "v.io/v23/security"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
)
@@ -27,7 +27,6 @@
ctx, shutdown := v23.Init()
defer shutdown()
- p := v23.GetPrincipal(ctx)
rid := naming.FixedRoutingID(0x5555)
m := New(ctx, rid)
want := "read this please"
@@ -36,19 +35,11 @@
t.Fatal(err)
}
- bFn := func(
- ctx *context.T,
- localEndpoint, remoteEndpoint naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, map[string]security.Discharge, error) {
- return p.BlessingStore().Default(), nil, nil
- }
eps := m.ListeningEndpoints()
if len(eps) == 0 {
t.Fatalf("no endpoints listened on")
}
- flow, err := m.Dial(ctx, eps[0], bFn)
+ flow, err := m.Dial(ctx, eps[0], flowtest.BlessingsForPeer)
if err != nil {
t.Error(err)
}
@@ -71,20 +62,11 @@
ctx, shutdown := v23.Init()
defer shutdown()
- p := v23.GetPrincipal(ctx)
am := New(ctx, naming.FixedRoutingID(0x5555))
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- bFn := func(
- ctx *context.T,
- localEndpoint, remoteEndpoint naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, map[string]security.Discharge, error) {
- return p.BlessingStore().Default(), nil, nil
- }
eps := am.ListeningEndpoints()
if len(eps) == 0 {
t.Fatalf("no endpoints listened on")
@@ -95,13 +77,13 @@
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
- dialAndAccept(t, ctx, dm, am, eps[0], bFn)
+ dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
if got, want := dm.(*manager).cache.Size(), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
- dialAndAccept(t, ctx, dm, am, eps[0], bFn)
+ dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
if got, want := dm.(*manager).cache.Size(), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}