Flow: Fix some cancel function leaks.

Change-Id: I114142c2b3011ed14a5a9440f3c580285bafd346
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 72e66c0..ba68899 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -93,6 +93,7 @@
 	c.mu.Lock()
 
 	if c.conns == nil {
+		r.cancel()
 		return NewErrCacheClosed(r.ctx)
 	}
 
@@ -101,8 +102,9 @@
 	}
 
 	if proxyConn != nil {
-		c.insertConnLocked(r.remote, proxyConn, true, true, r.cancel)
-		r.cancel = nil
+		if c.insertConnLocked(r.remote, proxyConn, true, true, r.cancel) {
+			r.cancel = nil
+		}
 	}
 
 	if conn != nil {
@@ -506,10 +508,10 @@
 	stats.NewStringFunc(naming.Join(prefix, "reserved"), func() string { return c.debugStringForDialing() })
 }
 
-func (c *ConnCache) insertConnLocked(remote naming.Endpoint, conn CachedConn, proxy bool, keyByAddr bool, cancel context.CancelFunc) {
+func (c *ConnCache) insertConnLocked(remote naming.Endpoint, conn CachedConn, proxy bool, keyByAddr bool, cancel context.CancelFunc) bool {
 	if _, ok := c.conns[conn]; ok {
 		// If the conn is already in the cache, don't re-add it.
-		return
+		return false
 	}
 	ep := conn.RemoteEndpoint()
 	entry := &connEntry{
@@ -537,6 +539,7 @@
 		c.cache[k] = append(c.cache[k], entry)
 	}
 	c.conns[entry.conn] = entry
+	return true
 }
 
 func (c *ConnCache) rttEntriesLocked(ctx *context.T, keys []interface{}) (rttEntries, error) {
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 74e5852..f098ca4 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -80,6 +80,7 @@
 	defer shutdown()
 
 	c := NewConnCache(0)
+	defer c.Close(ctx)
 	nullep := makeEP(ctx, "local", "a1", 0, "b1")
 	oneep := makeEP(ctx, "local", "a1", 1, "b1")
 	twoep := makeEP(ctx, "local", "a1", 2, "b1")
@@ -259,6 +260,7 @@
 
 	// Ensure that the least recently created conns are killed by KillConnections.
 	c := NewConnCache(0)
+	defer c.Close(ctx)
 	conns, stop := nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -289,6 +291,7 @@
 
 	// Ensure that writing to conns marks conns as more recently used.
 	c = NewConnCache(0)
+	defer c.Close(ctx)
 	conns, stop = nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -321,6 +324,7 @@
 
 	// Ensure that reading from conns marks conns as more recently used.
 	c = NewConnCache(0)
+	defer c.Close(ctx)
 	conns, stop = nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -361,6 +365,7 @@
 	// Ensure that idle conns are killed by the cache.
 	// Set the idle timeout very low to ensure all the connections are closed.
 	c := NewConnCache(1)
+	defer c.Close(ctx)
 	conns, stop := nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -398,6 +403,7 @@
 
 	// Set the idle timeout very high to ensure none of the connections are closed.
 	c = NewConnCache(time.Hour)
+	defer c.Close(ctx)
 	conns, stop = nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -422,6 +428,7 @@
 
 	// Ensure that a low idle timeout, but live flows on the conns keep the connection alive.
 	c = NewConnCache(1)
+	defer c.Close(ctx)
 	conns, stop = nConnAndFlows(t, ctx, 10)
 	defer stop()
 	for _, conn := range conns {
@@ -450,6 +457,7 @@
 	defer shutdown()
 
 	c := NewConnCache(0)
+	defer c.Close(ctx)
 	remote, _, _, _, _, _ := makeEPs(ctx, "normal")
 	auth := flowtest.NewPeerAuthorizer(remote.BlessingNames())
 	slow, med, fast := 3*time.Millisecond, 2*time.Millisecond, 1*time.Millisecond