ref/runtime/internal/flow: ConnCache can kill conns based on LRU.

LRU ordering is now based on the last time bytes were read or written
on any of a conn's flows, rather than on when the conn was last looked
up in the cache.
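
As an illustration of the intended use, a caller nearing its FD limit
could now reclaim the least-recently-used conns. This is a sketch only;
fdLimitApproaching is a hypothetical helper, not part of this change:

	// Sketch: reclaim file descriptors by closing the
	// least-recently-used conns in the cache.
	func maybeFreeFDs(ctx *context.T, cache *ConnCache) error {
		if !fdLimitApproaching() { // hypothetical FD-pressure check
			return nil
		}
		// Close the three least-recently-used conns; KillConnections
		// orders cached conns by Conn.LastUsedTime.
		return cache.KillConnections(ctx, 3)
	}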

Change-Id: Id25245e6c552f3b8744ead253e8f8e2db3f78b56
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 9d00cb7..2a0fec1 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -64,6 +64,7 @@
 	nextFid        uint64
 	flows          map[uint64]*flw
 	dischargeTimer *time.Timer
+	lastUsedTime   time.Time
 }
 
 // Ensure that *Conn implements flow.Conn.
@@ -77,15 +78,16 @@
 	versions version.RPCVersionRange,
 	handler FlowHandler) (*Conn, error) {
 	c := &Conn{
-		fc:         flowcontrol.New(defaultBufferSize, mtu),
-		mp:         newMessagePipe(conn),
-		handler:    handler,
-		lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
-		local:      local,
-		remote:     remote,
-		closed:     make(chan struct{}),
-		nextFid:    reservedFlows,
-		flows:      map[uint64]*flw{},
+		fc:           flowcontrol.New(defaultBufferSize, mtu),
+		mp:           newMessagePipe(conn),
+		handler:      handler,
+		lBlessings:   v23.GetPrincipal(ctx).BlessingStore().Default(),
+		local:        local,
+		remote:       remote,
+		closed:       make(chan struct{}),
+		nextFid:      reservedFlows,
+		flows:        map[uint64]*flw{},
+		lastUsedTime: time.Now(),
 	}
 	if err := c.dialHandshake(ctx, versions); err != nil {
 		c.Close(ctx, err)
@@ -104,14 +106,15 @@
 	versions version.RPCVersionRange,
 	handler FlowHandler) (*Conn, error) {
 	c := &Conn{
-		fc:         flowcontrol.New(defaultBufferSize, mtu),
-		mp:         newMessagePipe(conn),
-		handler:    handler,
-		lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
-		local:      local,
-		closed:     make(chan struct{}),
-		nextFid:    reservedFlows + 1,
-		flows:      map[uint64]*flw{},
+		fc:           flowcontrol.New(defaultBufferSize, mtu),
+		mp:           newMessagePipe(conn),
+		handler:      handler,
+		lBlessings:   v23.GetPrincipal(ctx).BlessingStore().Default(),
+		local:        local,
+		closed:       make(chan struct{}),
+		nextFid:      reservedFlows + 1,
+		flows:        map[uint64]*flw{},
+		lastUsedTime: time.Now(),
 	}
 	if err := c.acceptHandshake(ctx, versions); err != nil {
 		c.Close(ctx, err)
@@ -155,6 +158,13 @@
 // RemoteEndpoint returns the remote vanadium Endpoint
 func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
 
+// LastUsedTime returns the last time at which bytes were read or written on
+// any flow of the Conn.
+func (c *Conn) LastUsedTime() time.Time {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+	return c.lastUsedTime
+}
+
 // Closed returns a channel that will be closed after the Conn is shutdown.
 // After this channel is closed it is guaranteed that all Dial calls will fail
 // with an error and no more flows will be sent to the FlowHandler.
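
Note: the new accessor makes idleness checks trivial for callers that
already hold a *Conn. A minimal sketch, assuming a hypothetical
maxConnIdleTime threshold (not part of this change):

	// A conn counts as idle if no flow has read or written bytes
	// on it within maxConnIdleTime.
	func isIdle(c *Conn, maxConnIdleTime time.Duration) bool {
		return time.Since(c.LastUsedTime()) > maxConnIdleTime
	}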
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index a149ad1..435a4b4 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
 
 import (
 	"strconv"
+	"time"
 
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -50,6 +51,9 @@
 // Read and ReadMsg should not be called concurrently with themselves
 // or each other.
 func (f *flw) Read(p []byte) (n int, err error) {
+	f.conn.mu.Lock()
+	f.conn.lastUsedTime = time.Now()
+	f.conn.mu.Unlock()
 	var release bool
 	if n, release, err = f.q.read(f.ctx, p); release {
 		f.conn.release(f.ctx)
@@ -65,6 +69,9 @@
 // Read and ReadMsg should not be called concurrently with themselves
 // or each other.
 func (f *flw) ReadMsg() (buf []byte, err error) {
+	f.conn.mu.Lock()
+	f.conn.lastUsedTime = time.Now()
+	f.conn.mu.Unlock()
 	var release bool
 	// TODO(mattr): Currently we only ever release counters when some flow
 	// reads.  We may need to do it more or less often.  Currently
@@ -86,6 +93,9 @@
 }
 
 func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+	f.conn.mu.Lock()
+	f.conn.lastUsedTime = time.Now()
+	f.conn.mu.Unlock()
 	sent := 0
 	var left []byte
 	err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
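
Note: the same lock/update/unlock sequence is now inlined in Read,
ReadMsg and writeMsg. A possible follow-up (not part of this change)
would be a small helper on *Conn so the three call sites cannot drift
apart:

	// Sketch of a hypothetical helper; the three flow methods
	// would call f.conn.markUsed() instead of inlining the update.
	func (c *Conn) markUsed() {
		c.mu.Lock()
		c.lastUsedTime = time.Now()
		c.mu.Unlock()
	}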
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
index 652e8b4..b9a9907 100644
--- a/runtime/internal/flow/flowtest/flowtest.go
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -8,7 +8,11 @@
 	"io"
 	"sync"
 
+	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+
 	"v.io/x/ref/internal/logger"
 )
 
@@ -69,6 +73,7 @@
 	f.wire.c.Broadcast()
 	return len(buf), nil
 }
+
 func (f *MRW) ReadMsg() (buf []byte, err error) {
 	defer f.wire.mu.Unlock()
 	f.wire.mu.Lock()
@@ -87,7 +92,18 @@
 	logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
 	return buf, nil
 }
+
 func (f *MRW) Close() error {
 	f.wire.Close()
 	return nil
 }
+
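+// BlessingsForPeer returns the principal's default blessings, for use as a
+// blessings-for-peer callback in tests.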
+func BlessingsForPeer(
+	ctx *context.T,
+	localEndpoint, remoteEndpoint naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge,
+) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cc5943a..dc97290 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -5,6 +5,7 @@
 package manager
 
 import (
+	"sort"
 	"strings"
 	"sync"
 
@@ -23,28 +24,23 @@
 	ridCache  map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
 	started   map[string]bool                 // keyed by (protocol, address, blessingNames)
 	cond      *sync.Cond
-	head      *connEntry // the head and tail pointer of the linked list for implementing LRU.
 }
 
 type connEntry struct {
-	conn       *conn.Conn
-	rid        naming.RoutingID
-	addrKey    string
-	next, prev *connEntry
+	conn    *conn.Conn
+	rid     naming.RoutingID
+	addrKey string
 }
 
 func NewConnCache() *ConnCache {
 	mu := &sync.Mutex{}
 	cond := sync.NewCond(mu)
-	head := &connEntry{}
-	head.next, head.prev = head, head
 	return &ConnCache{
 		mu:        mu,
 		addrCache: make(map[string]*connEntry),
 		ridCache:  make(map[naming.RoutingID]*connEntry),
 		started:   make(map[string]bool),
 		cond:      cond,
-		head:      head,
 	}
 }
 
@@ -76,10 +72,8 @@
 	if isClosed(entry.conn) {
 		delete(c.addrCache, entry.addrKey)
 		delete(c.ridCache, entry.rid)
-		entry.removeFromList()
 		return nil, nil
 	}
-	entry.moveAfter(c.head)
 	return entry.conn, nil
 }
 
@@ -109,7 +103,6 @@
 	}
 	c.addrCache[k] = entry
 	c.ridCache[entry.rid] = entry
-	entry.moveAfter(c.head)
 	return nil
 }
 
@@ -117,16 +110,14 @@
 func (c *ConnCache) Close(ctx *context.T) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
-	c.addrCache, c.ridCache, c.started = nil, nil, nil
-	d := c.head.next
-	for d != c.head {
+	c.addrCache, c.started = nil, nil
+	for _, d := range c.ridCache {
 		d.conn.Close(ctx, NewErrCacheClosed(ctx))
-		d = d.next
 	}
-	c.head = nil
 }
 
 // KillConnections will close and remove num LRU Conns in the cache.
+// Conns that are already closed are also removed from the cache.
 // This is useful when the manager is approaching system FD limits.
 // If num is greater than the number of connections in the cache, all cached
 // connections will be closed and removed.
@@ -137,22 +128,42 @@
 	if c.addrCache == nil {
 		return NewErrCacheClosed(ctx)
 	}
-	d := c.head.prev
 	err := NewErrConnKilledToFreeResources(ctx)
-	for i := 0; i < num; i++ {
-		if d == c.head {
-			break
+	pq := make(connEntries, 0, len(c.ridCache))
+	for _, e := range c.ridCache {
+		if isClosed(e.conn) {
+			delete(c.addrCache, e.addrKey)
+			delete(c.ridCache, e.rid)
+			continue
 		}
+		pq = append(pq, e)
+	}
+	sort.Sort(pq)
+	for i := 0; i < num && i < len(pq); i++ {
+		d := pq[i]
 		d.conn.Close(ctx, err)
 		delete(c.addrCache, d.addrKey)
 		delete(c.ridCache, d.rid)
-		prev := d.prev
-		d.removeFromList()
-		d = prev
 	}
 	return nil
 }
 
+// TODO(suharshs): If sorting the connections becomes too slow, switch to
+// container/heap instead of sorting all the connections.
+type connEntries []*connEntry
+
+func (c connEntries) Len() int {
+	return len(c)
+}
+
+func (c connEntries) Less(i, j int) bool {
+	return c[i].conn.LastUsedTime().Before(c[j].conn.LastUsedTime())
+}
+
+func (c connEntries) Swap(i, j int) {
+	c[i], c[j] = c[j], c[i]
+}
+
 // FindWithRoutingID returns a Conn where the remote end of the connection
 // is identified by the provided rid. nil is returned if there is no such Conn.
 // FindWithRoutingID will return an error iff the cache is closed.
@@ -169,10 +180,8 @@
 	if isClosed(entry.conn) {
 		delete(c.addrCache, entry.addrKey)
 		delete(c.ridCache, entry.rid)
-		entry.removeFromList()
 		return nil, nil
 	}
-	entry.moveAfter(c.head)
 	return entry.conn, nil
 }
 
@@ -189,24 +198,6 @@
 	return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
 }
 
-func (c *connEntry) removeFromList() {
-	if c.prev != nil {
-		c.prev.next = c.next
-	}
-	if c.next != nil {
-		c.next.prev = c.prev
-	}
-	c.next, c.prev = nil, nil
-}
-
-func (c *connEntry) moveAfter(prev *connEntry) {
-	c.removeFromList()
-	c.prev = prev
-	c.next = prev.next
-	prev.next.prev = c
-	prev.next = c
-}
-
 func isClosed(conn *conn.Conn) bool {
 	select {
 	case <-conn.Closed():
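
Note on the TODO above: connEntries already satisfies sort.Interface,
so switching to container/heap only needs Push and Pop on the pointer
type. A rough sketch of that alternative (not part of this change):

	// With these two methods, KillConnections could heap.Init(&pq)
	// (O(n)) and then heap.Pop num times (O(num log n)) instead of
	// sorting the whole slice (O(n log n)).
	func (c *connEntries) Push(x interface{}) {
		*c = append(*c, x.(*connEntry))
	}

	func (c *connEntries) Pop() interface{} {
		old := *c
		n := len(old)
		e := old[n-1]
		*c = old[:n-1]
		return e
	}

Since Go's heap is a min-heap with respect to Less, heap.Pop returns
the conn with the earliest LastUsedTime, which is exactly the LRU order
needed here.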
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 80892a7..248d296 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -10,8 +10,10 @@
 
 	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
+
 	connpackage "v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	inaming "v.io/x/ref/runtime/internal/naming"
@@ -28,7 +30,7 @@
 		RID:       naming.FixedRoutingID(0x5555),
 		Blessings: []string{"A", "B", "C"},
 	}
-	conn := makeConn(t, ctx, remote)
+	conn := makeConnAndFlow(t, ctx, remote).c
 	if err := c.Insert(conn); err != nil {
 		t.Fatal(err)
 	}
@@ -67,7 +69,7 @@
 		Address:   "other",
 		Blessings: []string{"other"},
 	}
-	otherConn := makeConn(t, ctx, otherEP)
+	otherConn := makeConnAndFlow(t, ctx, otherEP).c
 
 	// Looking up a not yet inserted endpoint should fail.
 	if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -111,11 +113,11 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	// Ensure that the least recently inserted conns are killed by KillConnections.
+	// Ensure that the least recently created conns are killed by KillConnections.
 	c := NewConnCache()
-	conns := nConns(t, ctx, 10)
+	conns := nConnAndFlows(t, ctx, 10)
 	for _, conn := range conns {
-		if err := c.Insert(conn); err != nil {
+		if err := c.Insert(conn.c); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -128,34 +130,30 @@
 	// conns[3:] should not be closed and still in the cache.
 	// conns[:3] should be closed and removed from the cache.
 	for _, conn := range conns[3:] {
-		if isClosed(conn) {
+		if isClosed(conn.c) {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn) {
+		if !isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should still be in cache", conn)
 		}
 	}
 	for _, conn := range conns[:3] {
-		<-conn.Closed()
-		if isInCache(t, c, conn) {
+		<-conn.c.Closed()
+		if isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
 
-	// Ensure that ReservedFind marks conns as more recently used.
+	// Ensure that writing to conns marks them as more recently used.
 	c = NewConnCache()
-	conns = nConns(t, ctx, 10)
+	conns = nConnAndFlows(t, ctx, 10)
 	for _, conn := range conns {
-		if err := c.Insert(conn); err != nil {
+		if err := c.Insert(conn.c); err != nil {
 			t.Fatal(err)
 		}
 	}
 	for _, conn := range conns[:7] {
-		rep := conn.RemoteEndpoint()
-		if got, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames()); err != nil || got != conn {
-			t.Errorf("got %v, want %v, err: %v", got, conn, err)
-		}
-		c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
+		conn.write()
 	}
 	if err := c.KillConnections(ctx, 3); err != nil {
 		t.Fatal(err)
@@ -166,33 +164,30 @@
 	// conns[:7] should not be closed and still in the cache.
 	// conns[7:] should be closed and removed from the cache.
 	for _, conn := range conns[:7] {
-		if isClosed(conn) {
+		if isClosed(conn.c) {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn) {
+		if !isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should still be in cache", conn)
 		}
 	}
 	for _, conn := range conns[7:] {
-		<-conn.Closed()
-		if isInCache(t, c, conn) {
+		<-conn.c.Closed()
+		if isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
 
-	// Ensure that FindWithRoutingID marks conns as more recently used.
+	// Ensure that reading from conns marks them as more recently used.
 	c = NewConnCache()
-	conns = nConns(t, ctx, 10)
+	conns = nConnAndFlows(t, ctx, 10)
 	for _, conn := range conns {
-		if err := c.Insert(conn); err != nil {
+		if err := c.Insert(conn.c); err != nil {
 			t.Fatal(err)
 		}
 	}
 	for _, conn := range conns[:7] {
-		rep := conn.RemoteEndpoint()
-		if got, err := c.FindWithRoutingID(rep.RoutingID()); err != nil || got != conn {
-			t.Errorf("got %v, want %v, err: %v", got, conn, err)
-		}
+		conn.read()
 	}
 	if err := c.KillConnections(ctx, 3); err != nil {
 		t.Fatal(err)
@@ -203,16 +198,16 @@
 	// conns[:7] should not be closed and still in the cache.
 	// conns[7:] should be closed and removed from the cache.
 	for _, conn := range conns[:7] {
-		if isClosed(conn) {
+		if isClosed(conn.c) {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn) {
+		if !isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should still be in cache", conn)
 		}
 	}
 	for _, conn := range conns[7:] {
-		<-conn.Closed()
-		if isInCache(t, c, conn) {
+		<-conn.c.Closed()
+		if isInCache(t, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
@@ -233,32 +228,40 @@
 }
 
 func cacheSizeMatches(c *ConnCache) bool {
-	ls := listSize(c)
-	return ls == len(c.addrCache) && ls == len(c.ridCache)
+	return len(c.addrCache) == len(c.ridCache)
 }
 
-func listSize(c *ConnCache) int {
-	size := 0
-	d := c.head.next
-	for d != c.head {
-		size++
-		d = d.next
+type connAndFlow struct {
+	c *connpackage.Conn
+	f flow.Flow
+}
+
+func (c connAndFlow) write() {
+	_, err := c.f.WriteMsg([]byte{0})
+	if err != nil {
+		panic(err)
 	}
-	return size
 }
 
-func nConns(t *testing.T, ctx *context.T, n int) []*connpackage.Conn {
-	conns := make([]*connpackage.Conn, n)
+func (c connAndFlow) read() {
+	_, err := c.f.ReadMsg()
+	if err != nil {
+		panic(err)
+	}
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+	cfs := make([]connAndFlow, n)
 	for i := 0; i < n; i++ {
-		conns[i] = makeConn(t, ctx, &inaming.Endpoint{
+		cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
 			Protocol: strconv.Itoa(i),
 			RID:      naming.FixedRoutingID(uint64(i)),
 		})
 	}
-	return conns
+	return cfs
 }
 
-func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *connpackage.Conn {
+func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
 	dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
 	dch := make(chan *connpackage.Conn)
 	ach := make(chan *connpackage.Conn)
@@ -272,12 +275,34 @@
 	}()
 	go func() {
 		a, err := connpackage.NewAccepted(ctx, amrw, ep,
-			version.RPCVersionRange{Min: 1, Max: 5}, nil)
+			version.RPCVersionRange{Min: 1, Max: 5}, fh{t})
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
 		}
 		ach <- a
 	}()
-	<-dch
-	return <-ach
+	conn := <-dch
+	<-ach
+	f, err := conn.Dial(ctx, flowtest.BlessingsForPeer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Write a byte to send the openFlow message.
+	if _, err := f.Write([]byte{0}); err != nil {
+		t.Fatal(err)
+	}
+	return connAndFlow{conn, f}
+}
+
+type fh struct {
+	t *testing.T
+}
+
+func (h fh) HandleFlow(f flow.Flow) error {
+	go func() {
+		if _, err := f.WriteMsg([]byte{0}); err != nil {
+			h.t.Errorf("failed to write: %v", err)
+		}
+	}()
+	return nil
 }
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 0402f15..0c45db3 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -13,9 +13,9 @@
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
-	"v.io/v23/security"
 
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
 )
 
@@ -27,7 +27,6 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	p := v23.GetPrincipal(ctx)
 	rid := naming.FixedRoutingID(0x5555)
 	m := New(ctx, rid)
 	want := "read this please"
@@ -36,19 +35,11 @@
 		t.Fatal(err)
 	}
 
-	bFn := func(
-		ctx *context.T,
-		localEndpoint, remoteEndpoint naming.Endpoint,
-		remoteBlessings security.Blessings,
-		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, map[string]security.Discharge, error) {
-		return p.BlessingStore().Default(), nil, nil
-	}
 	eps := m.ListeningEndpoints()
 	if len(eps) == 0 {
 		t.Fatalf("no endpoints listened on")
 	}
-	flow, err := m.Dial(ctx, eps[0], bFn)
+	flow, err := m.Dial(ctx, eps[0], flowtest.BlessingsForPeer)
 	if err != nil {
 		t.Error(err)
 	}
@@ -71,20 +62,11 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	p := v23.GetPrincipal(ctx)
 	am := New(ctx, naming.FixedRoutingID(0x5555))
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
-	bFn := func(
-		ctx *context.T,
-		localEndpoint, remoteEndpoint naming.Endpoint,
-		remoteBlessings security.Blessings,
-		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, map[string]security.Discharge, error) {
-		return p.BlessingStore().Default(), nil, nil
-	}
 	eps := am.ListeningEndpoints()
 	if len(eps) == 0 {
 		t.Fatalf("no endpoints listened on")
@@ -95,13 +77,13 @@
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing a connection the cache should hold one connection.
-	dialAndAccept(t, ctx, dm, am, eps[0], bFn)
+	dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
 	if got, want := dm.(*manager).cache.Size(), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing another connection the cache should still hold one connection
 	// because the connections should be reused.
-	dialAndAccept(t, ctx, dm, am, eps[0], bFn)
+	dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
 	if got, want := dm.(*manager).cache.Size(), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}