runtime/internal/flow/conn: Fix flow control accounting when flows
close.

Previously we were sometimes losing shared tokens when a flow closed
with borrowed tokens outstanding.  This happened in two ways:

1. When the remote end closed a flow, it didn't release the counts
remaining in its readq.
2. If the local end closed the flow with outstanding borrowed counts,
an incoming release message would not find the flow in the map, so the
release was skipped and the borrowed counts were never returned (see
the sketch below).
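
As an illustration of the second failure mode, here is a minimal,
hypothetical sketch (invented names, not the real conn/flw types) of a
release handler that silently drops counters once the closed flow has
been deleted from the flow map:

    package main

    import "fmt"

    // conn is a toy stand-in for the real connection type: it tracks the
    // shared token pool and, per flow id, how many borrowed tokens that
    // flow still owes back to the pool.
    type conn struct {
        lshared uint64            // shared pool of flow-control tokens
        flows   map[uint64]uint64 // flow id -> borrowed tokens outstanding
    }

    // handleRelease models the old behavior: if the flow is gone, the
    // release is skipped and the borrowed tokens never return to lshared.
    func (c *conn) handleRelease(fid, count uint64) {
        borrowed, ok := c.flows[fid]
        if !ok {
            return // flow already closed and removed: the tokens are lost
        }
        if count > borrowed {
            count = borrowed
        }
        c.flows[fid] = borrowed - count
        c.lshared += count
    }

    func main() {
        c := &conn{flows: map[uint64]uint64{}}
        // Flow 7 borrowed 3 shared tokens, then was closed and deleted
        // locally before the remote end's release message arrived.
        c.handleRelease(7, 3)
        fmt.Println("shared tokens recovered:", c.lshared) // prints 0
    }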

This fix addresses those issues by not removing a flow from the flow
map until all of its borrowed counters have been released, and by
treating a close flow message as an implicit release of all of that
flow's counters, since it means the other end has discarded its buffer.
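
A minimal sketch of the resulting pattern (again with invented,
simplified types; the real change is in the diff below): the flow keeps
a sync.Cond on the conn's mutex, release broadcasts whenever borrowed
tokens come back, and close waits for the debt to clear before removing
the flow from the map, treating a remote close as a full release.

    package main

    import (
        "fmt"
        "sync"
    )

    type conn struct {
        mu      sync.Mutex
        lshared uint64          // shared tokens available for borrowing
        flows   map[uint64]*flw // flows stay here while they owe tokens
    }

    type flw struct {
        id         uint64
        conn       *conn
        borrowed   uint64     // tokens borrowed from conn.lshared
        borrowCond *sync.Cond // signaled when borrowed tokens are returned
    }

    // release credits returned tokens back to the shared pool and wakes
    // any close that is waiting for the flow's debt to clear.
    func (f *flw) release(tokens uint64) {
        f.conn.mu.Lock()
        if tokens > f.borrowed {
            tokens = f.borrowed
        }
        f.borrowed -= tokens
        f.conn.lshared += tokens
        f.borrowCond.Broadcast()
        f.conn.mu.Unlock()
    }

    // close removes the flow from conn.flows only once it owes nothing,
    // so a late release message can still find it.  A close initiated by
    // the remote end counts as an implicit release of everything owed.
    func (f *flw) close(closedRemotely bool) {
        f.conn.mu.Lock()
        if closedRemotely {
            f.conn.lshared += f.borrowed
            f.borrowed = 0
        }
        for f.borrowed > 0 {
            f.borrowCond.Wait()
        }
        delete(f.conn.flows, f.id)
        f.conn.mu.Unlock()
    }

    func main() {
        c := &conn{lshared: 10, flows: map[uint64]*flw{}}
        f := &flw{id: 1, conn: c, borrowed: 3, borrowCond: sync.NewCond(&c.mu)}
        c.flows[f.id] = f
        c.lshared -= f.borrowed // borrowed when the flow was dialed

        done := make(chan struct{})
        go func() { f.close(false); close(done) }() // blocks until debt clears
        f.release(3)                                // the release arrives later
        <-done
        fmt.Println("shared:", c.lshared, "open flows:", len(c.flows)) // 10, 0
    }

Note that in the actual change the wait happens in a separate goroutine
tracked by loopWG, so close itself does not block; it is inlined above
only for brevity.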

Change-Id: Ic6b433890c973a6d1e6637d535aca0f52582f392
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16ac858..030ce8e 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -85,7 +85,10 @@
 	// Resolve of the discharge server name.  The two resolve calls may be to
 	// the same mounttable.
 	c.loopWG.Add(1)
-	go c.refreshDischarges(ctx)
+	go func() {
+		c.refreshDischarges(ctx)
+		c.loopWG.Done()
+	}()
 	return nil
 }
 
@@ -105,7 +108,6 @@
 	lAuth := &message.Auth{
 		ChannelBinding: signedBinding,
 	}
-	c.loopWG.Add(1)
 	if lAuth.BlessingsKey, lAuth.DischargeKey, err = c.refreshDischarges(ctx); err != nil {
 		return err
 	}
@@ -211,7 +213,6 @@
 }
 
 func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
-	defer c.loopWG.Done()
 	dis := slib.PrepareDischarges(ctx, c.lBlessings,
 		security.DischargeImpetus{}, time.Minute)
 	// Schedule the next update.
@@ -221,6 +222,7 @@
 		c.loopWG.Add(1)
 		c.dischargeTimer = time.AfterFunc(dur, func() {
 			c.refreshDischarges(ctx)
+			c.loopWG.Done()
 		})
 	}
 	c.mu.Unlock()
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 80d0034..4806bef 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -340,7 +340,7 @@
 	}
 	c.status = Closing
 
-	go func() {
+	go func(c *Conn) {
 		if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
 			msg := ""
 			if err != nil {
@@ -373,7 +373,7 @@
 		c.status = Closed
 		close(c.closed)
 		c.mu.Unlock()
-	}()
+	}(c)
 }
 
 func (c *Conn) release(ctx *context.T, fid, count uint64) {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index e4127a3..1cad267 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
 
 import (
 	"io"
+	"sync"
 	"time"
 
 	"v.io/v23/context"
@@ -47,6 +48,9 @@
 	// borrowed indicates the number of tokens we have borrowed from the shared pool for
 	// sending on newly dialed flows.
 	borrowed uint64
+	// borrowCond is a condition variable that we can use to wait for shared
+	// counters to be released.
+	borrowCond *sync.Cond
 	// borrowing indicates whether this flow is using borrowed counters for a newly
 	// dialed flow.  This will be set to false after we first receive a
 	// release from the remote end.  This is always false for accepted flows.
@@ -60,14 +64,15 @@
 
 func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, remote naming.Endpoint, dialed, preopen bool) *flw {
 	f := &flw{
-		id:        id,
-		dialed:    dialed,
-		conn:      c,
-		q:         newReadQ(c, id),
-		bkey:      bkey,
-		dkey:      dkey,
-		opened:    preopen,
-		borrowing: dialed,
+		id:         id,
+		dialed:     dialed,
+		conn:       c,
+		q:          newReadQ(c, id),
+		bkey:       bkey,
+		dkey:       dkey,
+		opened:     preopen,
+		borrowing:  dialed,
+		borrowCond: sync.NewCond(&c.mu),
 		// It's important that this channel has a non-zero buffer.  Sometimes this
 		// flow will be notifying itself, so if there's no buffer a deadlock will
 		// occur.
@@ -144,12 +149,16 @@
 		return int(max), func(used int) {
 			f.conn.lshared -= uint64(used)
 			f.borrowed += uint64(used)
+			f.ctx.VI(2).Infof("deducting %d borrowed tokens on flow %d(%p), total: %d", used, f.id, f, f.borrowed)
 		}
 	}
 	if f.released < max {
 		max = f.released
 	}
-	return int(max), func(used int) { f.released -= uint64(used) }
+	return int(max), func(used int) {
+		f.released -= uint64(used)
+		f.ctx.VI(2).Infof("flow %d(%p) deducting %d tokens, %d left", f.id, f, used, f.released)
+	}
 }
 
 // releaseLocked releases some counters from a remote reader to the local
@@ -161,12 +170,16 @@
 		if f.borrowed < tokens {
 			n = f.borrowed
 		}
+		f.ctx.VI(2).Infof("Returning %d tokens borrowed by %d(%p)", f.borrowed, f.id, f)
 		tokens -= n
 		f.borrowed -= n
 		f.conn.lshared += n
+		f.borrowCond.Broadcast()
 	}
 	f.released += tokens
+	f.ctx.VI(2).Infof("Tokens release to %d(%p): %d => %d", f.id, f, tokens, f.released)
 	if f.writing {
+		f.ctx.VI(2).Infof("Activating writing flow %d(%p) now that we have tokens.", f.id, f)
 		f.conn.activateWriterLocked(f)
 		f.conn.notifyNextWriterLocked(nil)
 	}
@@ -176,6 +189,7 @@
 	if err = f.checkBlessings(); err != nil {
 		return 0, err
 	}
+	f.ctx.VI(2).Infof("starting write on flow %d(%p)", f.id, f)
 	select {
 	// Catch cancellations early.  If we caught a cancel when waiting
 	// our turn below its possible that we were notified simultaneously.
@@ -212,6 +226,7 @@
 		if tokens == 0 {
 			// Oops, we really don't have data to send, probably because we've exhausted
 			// the remote buffer.  deactivate ourselves but keep trying.
+			f.ctx.VI(2).Infof("Deactivating write on flow %d(%p) due to lack of tokens", f.id, f)
 			f.conn.deactivateWriterLocked(f)
 			continue
 		}
@@ -250,6 +265,7 @@
 		f.opened = true
 	}
 	f.writing = false
+	f.ctx.VI(2).Infof("finishing write on %d(%p): %v", f.id, f, err)
 	f.conn.deactivateWriterLocked(f)
 	f.conn.notifyNextWriterLocked(f)
 	f.conn.mu.Unlock()
@@ -394,8 +410,19 @@
 }
 
 func (f *flw) close(ctx *context.T, err error) {
+	closedRemotely := verror.ErrorID(err) == ErrFlowClosedRemotely.ID
+	f.conn.mu.Lock()
+	if closedRemotely {
+		// When the other side closes a flow, it implicitly releases all the
+		// counters used by that flow.  That means we should release the shared
+		// counter to be used on other new flows.
+		f.conn.lshared += f.borrowed
+		f.borrowed = 0
+	}
+	f.borrowCond.Broadcast()
+	f.conn.mu.Unlock()
 	if f.q.close(ctx) {
-		eid := verror.ErrorID(err)
+		f.ctx.VI(2).Infof("closing %d(%p): %v", f.id, f, err)
 		f.cancel()
 		// After cancel has been called no new writes will begin for this
 		// flow.  There may be a write in progress, but it must finish
@@ -403,13 +430,12 @@
 		// can simply use sendMessageLocked to send the close flow
 		// message.
 		f.conn.mu.Lock()
-		delete(f.conn.flows, f.id)
 		connClosing := f.conn.status == Closing
 		var serr error
 		if !f.opened {
 			// Closing a flow that was never opened.
 			f.conn.unopenedFlows.Done()
-		} else if eid != ErrFlowClosedRemotely.ID && !connClosing {
+		} else if !closedRemotely && !connClosing {
 			// Note: If the conn is closing there is no point in trying to
 			// send the flow close message as it will fail.  This is racy
 			// with the connection closing, but there are no ill-effects
@@ -419,6 +445,18 @@
 				Flags: message.CloseFlag,
 			})
 		}
+		if f.borrowed > 0 && f.conn.status < Closing {
+			f.conn.loopWG.Add(1)
+			go func() {
+				defer f.conn.loopWG.Done()
+				f.conn.mu.Lock()
+				for f.borrowed > 0 && f.conn.status < Closing {
+					f.borrowCond.Wait()
+				}
+				delete(f.conn.flows, f.id)
+				f.conn.mu.Unlock()
+			}()
+		}
 		f.conn.mu.Unlock()
 		if serr != nil {
 			ctx.Errorf("Could not send close flow message: %v", err)
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index ff1f83c..186a40e 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -18,9 +18,11 @@
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/local"
 	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/test/goroutines"
 )
 
 func TestCache(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
@@ -31,7 +33,9 @@
 		RID:       naming.FixedRoutingID(0x5555),
 		Blessings: []string{"A", "B", "C"},
 	}
-	conn := makeConnAndFlow(t, ctx, remote).c
+	caf := makeConnAndFlow(t, ctx, remote)
+	defer caf.stop(ctx)
+	conn := caf.c
 	if err := c.Insert(conn, remote.Protocol, remote.Address); err != nil {
 		t.Fatal(err)
 	}
@@ -72,7 +76,9 @@
 		RID:       naming.FixedRoutingID(0x1111),
 		Blessings: []string{"ridonly"},
 	}
-	ridConn := makeConnAndFlow(t, ctx, ridEP).c
+	caf = makeConnAndFlow(t, ctx, ridEP)
+	defer caf.stop(ctx)
+	ridConn := caf.c
 	if err := c.InsertWithRoutingID(ridConn); err != nil {
 		t.Fatal(err)
 	}
@@ -90,7 +96,9 @@
 		RID:       naming.FixedRoutingID(0x2222),
 		Blessings: []string{"other"},
 	}
-	otherConn := makeConnAndFlow(t, ctx, otherEP).c
+	caf = makeConnAndFlow(t, ctx, otherEP)
+	defer caf.stop(ctx)
+	otherConn := caf.c
 
 	// Looking up a not yet inserted endpoint should fail.
 	if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -117,7 +125,9 @@
 	}
 
 	// Insert a duplicate conn to ensure that replaced conns still get closed.
-	dupConn := makeConnAndFlow(t, ctx, remote).c
+	caf = makeConnAndFlow(t, ctx, remote)
+	defer caf.stop(ctx)
+	dupConn := caf.c
 	if err := c.Insert(dupConn, remote.Protocol, remote.Address); err != nil {
 		t.Fatal(err)
 	}
@@ -136,17 +146,20 @@
 	c.Close(ctx)
 	// Now the connections should be closed.
 	<-conn.Closed()
+	<-ridConn.Closed()
 	<-dupConn.Closed()
 	<-otherConn.Closed()
 }
 
 func TestLRU(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
 	// Ensure that the least recently created conns are killed by KillConnections.
 	c := NewConnCache()
-	conns := nConnAndFlows(t, ctx, 10)
+	conns, stop := nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -178,7 +191,8 @@
 
 	// Ensure that writing to conns marks conns as more recently used.
 	c = NewConnCache()
-	conns = nConnAndFlows(t, ctx, 10)
+	conns, stop = nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -213,7 +227,8 @@
 
 	// Ensure that reading from conns marks conns as more recently used.
 	c = NewConnCache()
-	conns = nConnAndFlows(t, ctx, 10)
+	conns, stop = nConnAndFlows(t, ctx, 10)
+	defer stop()
 	for _, conn := range conns {
 		addr := conn.c.RemoteEndpoint().Addr()
 		if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -267,6 +282,7 @@
 
 type connAndFlow struct {
 	c *connpackage.Conn
+	a *connpackage.Conn
 	f flow.Flow
 }
 
@@ -284,7 +300,12 @@
 	}
 }
 
-func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+func (c connAndFlow) stop(ctx *context.T) {
+	c.c.Close(ctx, nil)
+	c.a.Close(ctx, nil)
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) ([]connAndFlow, func()) {
 	cfs := make([]connAndFlow, n)
 	for i := 0; i < n; i++ {
 		cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
@@ -292,7 +313,11 @@
 			RID:      naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
 		})
 	}
-	return cfs
+	return cfs, func() {
+		for _, conn := range cfs {
+			conn.stop(ctx)
+		}
+	}
 }
 
 func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
@@ -318,7 +343,7 @@
 		ach <- a
 	}()
 	conn := <-dch
-	<-ach
+	aconn := <-ach
 	f, err := conn.Dial(ctx, flowtest.AllowAllPeersAuthorizer{}, nil)
 	if err != nil {
 		t.Fatal(err)
@@ -328,7 +353,7 @@
 		t.Fatal(err)
 	}
 	<-fh.ch
-	return connAndFlow{conn, f}
+	return connAndFlow{conn, aconn, f}
 }
 
 type fh struct {