Merge "flow/manager: Fix stoplistening for proxy listen."
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 0bc1ea2..044ae8a 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -54,7 +54,8 @@
listeners: []flow.Listener{},
// TODO(mattr): This channel is sized somewhat arbitrarily right now.
// I should measure the impact of the size.
- events: make(chan conn.StatusUpdate, 16),
+ events: make(chan conn.StatusUpdate, 16),
+ stopProxy: make(chan struct{}),
}
events = m.ls.events
}
@@ -88,6 +89,7 @@
activeConns sync.WaitGroup
mu sync.Mutex
+ stopProxy chan struct{}
listeners []flow.Listener
endpoints []naming.Endpoint
}
@@ -100,6 +102,10 @@
listeners := m.ls.listeners
m.ls.listeners = nil
m.ls.endpoints = nil
+ if m.ls.stopProxy != nil {
+ close(m.ls.stopProxy)
+ m.ls.stopProxy = nil
+ }
m.ls.mu.Unlock()
for _, ln := range listeners {
ln.Close()
@@ -180,6 +186,8 @@
select {
case <-ctx.Done():
return
+ case <-m.ls.stopProxy:
+ return
default:
}
f, c, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run)
@@ -206,6 +214,8 @@
select {
case <-ctx.Done():
return
+ case <-m.ls.stopProxy:
+ return
case <-f.Closed():
update(nil)
delay = reconnectDelay