runtime/internal/rpc/stream/vif: Fix shutdown races.

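The existing code can leave some vifLoops running and can also orphan
some VIFs when a listener is closed while a connection is still being
set up.

The listener now counts a VIF in vifLoops before its setup goroutine
starts, guards ln.vifs with connsMu and sets it to nil in Close so that
VIFs created afterwards are closed instead of inserted, and VIF.Close
waits for the read and write loops to exit.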

Change-Id: Ib187cca22548abd621ab250adafd0e4d6b789fda
diff --git a/runtime/internal/rpc/stream/manager/listener.go b/runtime/internal/rpc/stream/manager/listener.go
index 64a7819..a7afc98 100644
--- a/runtime/internal/rpc/stream/manager/listener.go
+++ b/runtime/internal/rpc/stream/manager/listener.go
@@ -70,11 +70,11 @@
 	q       *upcqueue.T
 	netLn   net.Listener
 	manager *manager
-	vifs    *vif.Set
 	ctx     *context.T
 
 	connsMu sync.Mutex
 	conns   map[net.Conn]bool
+	vifs    *vif.Set
 
 	netLoop  sync.WaitGroup
 	vifLoops sync.WaitGroup
@@ -206,15 +206,24 @@
 
 		ln.ctx.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
 
+		ln.vifLoops.Add(1)
 		go func() {
 			vf, err := vif.InternalNewAcceptedVIF(ln.ctx, conn, ln.manager.rid, blessings, nil, ln.deleteVIF, opts...)
 			if err != nil {
 				ln.ctx.Infof("Shutting down conn from %s (local address: %s) as a VIF could not be created: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
 				conn.Close()
+				ln.vifLoops.Done()
 				return
 			}
-			ln.vifLoops.Add(1)
+			ln.connsMu.Lock()
+			if ln.vifs == nil {
+				ln.connsMu.Unlock()
+				vf.Close()
+				ln.vifLoops.Done()
+				return
+			}
 			ln.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
+			ln.connsMu.Unlock()
 			ln.manager.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
 			vifLoop(ln.ctx, vf, ln.q, func() {
 				ln.connsMu.Lock()
@@ -228,7 +237,11 @@
 
 func (ln *netListener) deleteVIF(vf *vif.VIF) {
 	ln.ctx.VI(2).Infof("VIF %v is closed, removing from cache", vf)
-	ln.vifs.Delete(vf)
+	ln.connsMu.Lock()
+	if ln.vifs != nil {
+		ln.vifs.Delete(vf)
+	}
+	ln.connsMu.Unlock()
 	ln.manager.vifs.Delete(vf)
 }
 
@@ -247,13 +260,21 @@
 func (ln *netListener) Close() error {
 	closeNetListener(ln.ctx, ln.netLn)
 	ln.netLoop.Wait()
-	for _, vif := range ln.vifs.List() {
+
+	ln.connsMu.Lock()
+	var vfs []*vif.VIF
+	if ln.vifs != nil {
+		vfs, ln.vifs = ln.vifs.List(), nil
+	}
+	ln.connsMu.Unlock()
+
+	for _, vf := range vfs {
 		// NOTE(caprita): We do not actually Close down the vifs, as
 		// that would require knowing when all outstanding requests are
 		// finished.  For now, do not worry about it, since we expect
 		// shut down to immediately precede process exit.
 		//v23.Logger().Infof("Close: stop accepting: %p", vif)
-		vif.StopAccepting()
+		vf.StopAccepting()
 	}
 	ln.q.Shutdown()
 	ln.manager.removeListener(ln)
@@ -269,7 +290,13 @@
 	ret := []string{
 		fmt.Sprintf("stream.Listener: net.Listener on (%q, %q)", ln.netLn.Addr().Network(), ln.netLn.Addr()),
 	}
-	if vifs := ln.vifs.List(); len(vifs) > 0 {
+	ln.connsMu.Lock()
+	var vifs []*vif.VIF
+	if ln.vifs != nil {
+		vifs = ln.vifs.List() // read-only snapshot: only Close may set ln.vifs to nil
+	}
+	ln.connsMu.Unlock()
+	if len(vifs) > 0 {
 		ret = append(ret, fmt.Sprintf("===Accepted VIFs(%d)===", len(vifs)))
 		for ix, vif := range vifs {
 			ret = append(ret, fmt.Sprintf("%4d) %v", ix, vif))
diff --git a/runtime/internal/rpc/stream/manager/manager_test.go b/runtime/internal/rpc/stream/manager/manager_test.go
index 0d050fd..22f6a61 100644
--- a/runtime/internal/rpc/stream/manager/manager_test.go
+++ b/runtime/internal/rpc/stream/manager/manager_test.go
@@ -70,6 +70,7 @@
 	var clientVC stream.VC
 	var clientF1 stream.Flow
 	go func() {
+		var err error
 		cctx, _ := v23.WithPrincipal(ctx, pclient)
 		if clientVC, err = client.Dial(cctx, ep); err != nil {
 			t.Errorf("Dial(%q) failed: %v", ep, err)
diff --git a/runtime/internal/rpc/stream/vif/vif.go b/runtime/internal/rpc/stream/vif/vif.go
index 86facf0..d8fd6d7 100644
--- a/runtime/internal/rpc/stream/vif/vif.go
+++ b/runtime/internal/rpc/stream/vif/vif.go
@@ -129,6 +129,8 @@
 
 	muStats sync.Mutex
 	stats   Stats
+
+	loopWG sync.WaitGroup
 }
 
 // ConnectorAndFlow represents a Flow and the Connector that can be used to
@@ -313,6 +315,7 @@
 			vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
 		}
 	})
+	vif.loopWG.Add(2)
 	go vif.readLoop()
 	go vif.writeLoop()
 	return vif, nil
@@ -559,10 +562,13 @@
 	if err := vif.conn.Close(); err != nil {
 		vif.ctx.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err)
 	}
+
 	// Notify that the VIF has been closed.
 	if vif.onClose != nil {
 		go vif.onClose(vif)
 	}
+
+	vif.loopWG.Wait()
 }
 
 // StartAccepting begins accepting Flows (and VCs) initiated by the remote end
@@ -630,6 +636,7 @@
 
 func (vif *VIF) readLoop() {
 	defer vif.Close()
+	defer vif.loopWG.Done()
 	defer vif.stopVCDispatchLoops()
 	for {
 		// vif.ctrlCipher is guarded by vif.writeMu.  However, the only mutation
@@ -819,6 +826,7 @@
 }
 
 func (vif *VIF) writeLoop() {
+	defer vif.loopWG.Done()
 	defer vif.outgoing.Close()
 	defer vif.stopVCWriteLoops()
 	for {