runtime/internal/rpc/stream/vif: Fix shutdown races.
The existing code can leave some vifLoops running and
also orphan some VIFs.
Change-Id: Ib187cca22548abd621ab250adafd0e4d6b789fda
diff --git a/runtime/internal/rpc/stream/manager/listener.go b/runtime/internal/rpc/stream/manager/listener.go
index 64a7819..a7afc98 100644
--- a/runtime/internal/rpc/stream/manager/listener.go
+++ b/runtime/internal/rpc/stream/manager/listener.go
@@ -70,11 +70,11 @@
q *upcqueue.T
netLn net.Listener
manager *manager
- vifs *vif.Set
ctx *context.T
connsMu sync.Mutex
conns map[net.Conn]bool
+ vifs *vif.Set
netLoop sync.WaitGroup
vifLoops sync.WaitGroup
@@ -206,15 +206,24 @@
ln.ctx.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
+ ln.vifLoops.Add(1)
go func() {
vf, err := vif.InternalNewAcceptedVIF(ln.ctx, conn, ln.manager.rid, blessings, nil, ln.deleteVIF, opts...)
if err != nil {
ln.ctx.Infof("Shutting down conn from %s (local address: %s) as a VIF could not be created: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
conn.Close()
+ ln.vifLoops.Done()
return
}
- ln.vifLoops.Add(1)
+ ln.connsMu.Lock()
+ if ln.vifs == nil {
+ ln.connsMu.Unlock()
+ vf.Close()
+ ln.vifLoops.Done()
+ return
+ }
ln.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
+ ln.connsMu.Unlock()
ln.manager.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
vifLoop(ln.ctx, vf, ln.q, func() {
ln.connsMu.Lock()
@@ -228,7 +237,11 @@
func (ln *netListener) deleteVIF(vf *vif.VIF) {
ln.ctx.VI(2).Infof("VIF %v is closed, removing from cache", vf)
- ln.vifs.Delete(vf)
+ ln.connsMu.Lock()
+ if ln.vifs != nil {
+ ln.vifs.Delete(vf)
+ }
+ ln.connsMu.Unlock()
ln.manager.vifs.Delete(vf)
}
@@ -247,13 +260,21 @@
func (ln *netListener) Close() error {
closeNetListener(ln.ctx, ln.netLn)
ln.netLoop.Wait()
- for _, vif := range ln.vifs.List() {
+
+ ln.connsMu.Lock()
+ var vfs []*vif.VIF
+ if ln.vifs != nil {
+ vfs, ln.vifs = ln.vifs.List(), nil
+ }
+ ln.connsMu.Unlock()
+
+ for _, vf := range vfs {
// NOTE(caprita): We do not actually Close down the vifs, as
// that would require knowing when all outstanding requests are
// finished. For now, do not worry about it, since we expect
// shut down to immediately precede process exit.
//v23.Logger().Infof("Close: stop accepting: %p", vif)
- vif.StopAccepting()
+ vf.StopAccepting()
}
ln.q.Shutdown()
ln.manager.removeListener(ln)
@@ -269,7 +290,13 @@
ret := []string{
fmt.Sprintf("stream.Listener: net.Listener on (%q, %q)", ln.netLn.Addr().Network(), ln.netLn.Addr()),
}
- if vifs := ln.vifs.List(); len(vifs) > 0 {
+ ln.connsMu.Lock()
+ var vifs []*vif.VIF
+ if ln.vifs != nil {
+ vifs, ln.vifs = ln.vifs.List(), nil
+ }
+ ln.connsMu.Unlock()
+ if len(vifs) > 0 {
ret = append(ret, fmt.Sprintf("===Accepted VIFs(%d)===", len(vifs)))
for ix, vif := range vifs {
ret = append(ret, fmt.Sprintf("%4d) %v", ix, vif))
diff --git a/runtime/internal/rpc/stream/manager/manager_test.go b/runtime/internal/rpc/stream/manager/manager_test.go
index 0d050fd..22f6a61 100644
--- a/runtime/internal/rpc/stream/manager/manager_test.go
+++ b/runtime/internal/rpc/stream/manager/manager_test.go
@@ -70,6 +70,7 @@
var clientVC stream.VC
var clientF1 stream.Flow
go func() {
+ var err error
cctx, _ := v23.WithPrincipal(ctx, pclient)
if clientVC, err = client.Dial(cctx, ep); err != nil {
t.Errorf("Dial(%q) failed: %v", ep, err)
diff --git a/runtime/internal/rpc/stream/vif/vif.go b/runtime/internal/rpc/stream/vif/vif.go
index 86facf0..d8fd6d7 100644
--- a/runtime/internal/rpc/stream/vif/vif.go
+++ b/runtime/internal/rpc/stream/vif/vif.go
@@ -129,6 +129,8 @@
muStats sync.Mutex
stats Stats
+
+ loopWG sync.WaitGroup
}
// ConnectorAndFlow represents a Flow and the Connector that can be used to
@@ -313,6 +315,7 @@
vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
}
})
+ vif.loopWG.Add(2)
go vif.readLoop()
go vif.writeLoop()
return vif, nil
@@ -559,10 +562,13 @@
if err := vif.conn.Close(); err != nil {
vif.ctx.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err)
}
+
// Notify that the VIF has been closed.
if vif.onClose != nil {
go vif.onClose(vif)
}
+
+ vif.loopWG.Wait()
}
// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
@@ -630,6 +636,7 @@
func (vif *VIF) readLoop() {
defer vif.Close()
+ defer vif.loopWG.Done()
defer vif.stopVCDispatchLoops()
for {
// vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation
@@ -819,6 +826,7 @@
}
func (vif *VIF) writeLoop() {
+ defer vif.loopWG.Done()
defer vif.outgoing.Close()
defer vif.stopVCWriteLoops()
for {