flow/manager: Periodically kill closed connections.

Change-Id: I6bed03b5123139c9d911319b8b3d5271b7828fb5
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 2490352..6320fc3 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,7 +26,10 @@
 	"v.io/x/ref/runtime/internal/rpc/version"
 )
 
-const reconnectDelay = 50 * time.Millisecond
+const (
+	reconnectDelay    = 50 * time.Millisecond
+	reapCacheInterval = 5 * time.Minute
+)
 
 type manager struct {
 	rid    naming.RoutingID
@@ -52,19 +55,27 @@
 		listeners:      []flow.Listener{},
 	}
 	go func() {
-		select {
-		case <-ctx.Done():
-			m.mu.Lock()
-			listeners := m.listeners
-			m.listeners = nil
-			m.mu.Unlock()
-			for _, ln := range listeners {
-				ln.Close()
+		ticker := time.NewTicker(reapCacheInterval)
+		for {
+			select {
+			case <-ctx.Done():
+				m.mu.Lock()
+				listeners := m.listeners
+				m.listeners = nil
+				m.mu.Unlock()
+				for _, ln := range listeners {
+					ln.Close()
+				}
+				m.cache.Close(ctx)
+				m.q.Close()
+				m.wg.Wait()
+				ticker.Stop()
+				close(m.closed)
+				return
+			case <-ticker.C:
+				// Periodically kill closed connections.
+				m.cache.KillConnections(ctx, 0)
 			}
-			m.cache.Close(ctx)
-			m.q.Close()
-			m.wg.Wait()
-			close(m.closed)
 		}
 	}()
 	return m