Flow: Fix some cancel function leaks.
Change-Id: I114142c2b3011ed14a5a9440f3c580285bafd346
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 72e66c0..ba68899 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -93,6 +93,7 @@
c.mu.Lock()
if c.conns == nil {
+ r.cancel()
return NewErrCacheClosed(r.ctx)
}
@@ -101,8 +102,9 @@
}
if proxyConn != nil {
- c.insertConnLocked(r.remote, proxyConn, true, true, r.cancel)
- r.cancel = nil
+ if c.insertConnLocked(r.remote, proxyConn, true, true, r.cancel) {
+ r.cancel = nil
+ }
}
if conn != nil {
@@ -506,10 +508,10 @@
stats.NewStringFunc(naming.Join(prefix, "reserved"), func() string { return c.debugStringForDialing() })
}
-func (c *ConnCache) insertConnLocked(remote naming.Endpoint, conn CachedConn, proxy bool, keyByAddr bool, cancel context.CancelFunc) {
+func (c *ConnCache) insertConnLocked(remote naming.Endpoint, conn CachedConn, proxy bool, keyByAddr bool, cancel context.CancelFunc) bool {
if _, ok := c.conns[conn]; ok {
// If the conn is already in the cache, don't re-add it.
- return
+ return false
}
ep := conn.RemoteEndpoint()
entry := &connEntry{
@@ -537,6 +539,7 @@
c.cache[k] = append(c.cache[k], entry)
}
c.conns[entry.conn] = entry
+ return true
}
func (c *ConnCache) rttEntriesLocked(ctx *context.T, keys []interface{}) (rttEntries, error) {
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 74e5852..f098ca4 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -80,6 +80,7 @@
defer shutdown()
c := NewConnCache(0)
+ defer c.Close(ctx)
nullep := makeEP(ctx, "local", "a1", 0, "b1")
oneep := makeEP(ctx, "local", "a1", 1, "b1")
twoep := makeEP(ctx, "local", "a1", 2, "b1")
@@ -259,6 +260,7 @@
// Ensure that the least recently created conns are killed by KillConnections.
c := NewConnCache(0)
+ defer c.Close(ctx)
conns, stop := nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -289,6 +291,7 @@
// Ensure that writing to conns marks conns as more recently used.
c = NewConnCache(0)
+ defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -321,6 +324,7 @@
// Ensure that reading from conns marks conns as more recently used.
c = NewConnCache(0)
+ defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -361,6 +365,7 @@
// Ensure that idle conns are killed by the cache.
// Set the idle timeout very low to ensure all the connections are closed.
c := NewConnCache(1)
+ defer c.Close(ctx)
conns, stop := nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -398,6 +403,7 @@
// Set the idle timeout very high to ensure none of the connections are closed.
c = NewConnCache(time.Hour)
+ defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -422,6 +428,7 @@
// Ensure that a low idle timeout, but live flows on the conns keep the connection alive.
c = NewConnCache(1)
+ defer c.Close(ctx)
conns, stop = nConnAndFlows(t, ctx, 10)
defer stop()
for _, conn := range conns {
@@ -450,6 +457,7 @@
defer shutdown()
c := NewConnCache(0)
+ defer c.Close(ctx)
remote, _, _, _, _, _ := makeEPs(ctx, "normal")
auth := flowtest.NewPeerAuthorizer(remote.BlessingNames())
slow, med, fast := 3*time.Millisecond, 2*time.Millisecond, 1*time.Millisecond