flow/manager: Periodically kill closed connections.
Change-Id: I6bed03b5123139c9d911319b8b3d5271b7828fb5
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 2490352..6320fc3 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,7 +26,10 @@
"v.io/x/ref/runtime/internal/rpc/version"
)
-const reconnectDelay = 50 * time.Millisecond
+const (
+ reconnectDelay = 50 * time.Millisecond
+ reapCacheInterval = 5 * time.Minute
+)
type manager struct {
rid naming.RoutingID
@@ -52,19 +55,27 @@
listeners: []flow.Listener{},
}
go func() {
- select {
- case <-ctx.Done():
- m.mu.Lock()
- listeners := m.listeners
- m.listeners = nil
- m.mu.Unlock()
- for _, ln := range listeners {
- ln.Close()
+ ticker := time.NewTicker(reapCacheInterval)
+ for {
+ select {
+ case <-ctx.Done():
+ m.mu.Lock()
+ listeners := m.listeners
+ m.listeners = nil
+ m.mu.Unlock()
+ for _, ln := range listeners {
+ ln.Close()
+ }
+ m.cache.Close(ctx)
+ m.q.Close()
+ m.wg.Wait()
+ ticker.Stop()
+ close(m.closed)
+ return
+ case <-ticker.C:
+ // Periodically kill closed connections.
+ m.cache.KillConnections(ctx, 0)
}
- m.cache.Close(ctx)
- m.q.Close()
- m.wg.Wait()
- close(m.closed)
}
}()
return m