rpc/stream/manager: Reenable connection killing after it was broken in
v.io/c/10433.

Change-Id: I29fd5bf0ac2a951941c19b46702263b1973f19c4
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 298d20f..3824089 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -62,6 +63,9 @@
 	manager *manager
 	vifs    *vif.Set
 
+	connsMu sync.Mutex
+	conns   map[net.Conn]bool
+
 	netLoop  sync.WaitGroup
 	vifLoops sync.WaitGroup
 }
@@ -87,6 +91,7 @@
 		manager: m,
 		netLn:   netLn,
 		vifs:    vif.NewSet(),
+		conns:   make(map[net.Conn]bool),
 	}
 
 	// Set the default idle timeout for VC. But for "unixfd", we do not set
@@ -114,6 +119,44 @@
 	return false
 }
 
+func (ln *netListener) killConnections(n int) {
+	ln.connsMu.Lock()
+	if n > len(ln.conns) {
+		n = len(ln.conns)
+	}
+	remaining := make([]net.Conn, 0, len(ln.conns))
+	for c := range ln.conns {
+		remaining = append(remaining, c)
+	}
+	removed := remaining[:n]
+	ln.connsMu.Unlock()
+
+	vlog.Infof("Killing %d Conns", n)
+
+	var wg sync.WaitGroup
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		idx := rand.Intn(len(remaining))
+		conn := remaining[idx]
+		go func(conn net.Conn) {
+			vlog.Infof("Killing connection (%s, %s)", conn.LocalAddr(), conn.RemoteAddr())
+			conn.Close()
+			ln.manager.killedConns.Incr(1)
+			wg.Done()
+		}(conn)
+		remaining[idx], remaining[0] = remaining[0], remaining[idx]
+		remaining = remaining[1:]
+	}
+
+	ln.connsMu.Lock()
+	for _, conn := range removed {
+		delete(ln.conns, conn)
+	}
+	ln.connsMu.Unlock()
+
+	wg.Wait()
+}
+
 func (ln *netListener) netAcceptLoop(principal security.Principal, blessings security.Blessings, opts []stream.ListenerOpt) {
 	defer ln.netLoop.Done()
 	opts = append([]stream.ListenerOpt{vc.StartTimeout{defaultStartTimeout}}, opts...)
@@ -126,7 +169,7 @@
 			vlog.Infof("net.Listener.Accept() failed on %v with %v", ln.netLn, err)
 			for tokill := 1; isTemporaryError(err); tokill *= 2 {
 				if isTooManyOpenFiles(err) {
-					ln.manager.killConnections(tokill)
+					ln.killConnections(tokill)
 				} else {
 					tokill = 1
 				}
@@ -145,6 +188,10 @@
 			vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
 			return
 		}
+		ln.connsMu.Lock()
+		ln.conns[conn] = true
+		ln.connsMu.Unlock()
+
 		vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
 		go func() {
 			vf, err := vif.InternalNewAcceptedVIF(conn, ln.manager.rid, principal, blessings, nil, ln.deleteVIF, opts...)
@@ -157,7 +204,12 @@
 			ln.manager.vifs.Insert(vf)
 
 			ln.vifLoops.Add(1)
-			vifLoop(vf, ln.q, &ln.vifLoops)
+			vifLoop(vf, ln.q, func() {
+				ln.connsMu.Lock()
+				delete(ln.conns, conn)
+				ln.connsMu.Unlock()
+				ln.vifLoops.Done()
+			})
 		}()
 	}
 }
@@ -226,7 +278,9 @@
 	}
 	ln.vif = vf
 	ln.vifLoop.Add(1)
-	go vifLoop(ln.vif, ln.q, &ln.vifLoop)
+	go vifLoop(ln.vif, ln.q, func() {
+		ln.vifLoop.Done()
+	})
 	return ln, ep, nil
 }
 
@@ -338,8 +392,8 @@
 	return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
 }
 
-func vifLoop(vf *vif.VIF, q *upcqueue.T, wg *sync.WaitGroup) {
-	defer wg.Done()
+func vifLoop(vf *vif.VIF, q *upcqueue.T, cleanup func()) {
+	defer cleanup()
 	for {
 		cAndf, err := vf.Accept()
 		switch {
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 371f86a..5fb6232 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -7,7 +7,6 @@
 
 import (
 	"fmt"
-	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -330,29 +329,6 @@
 	return strings.Join(l, "\n")
 }
 
-func (m *manager) killConnections(n int) {
-	vifs := m.vifs.List()
-	if n > len(vifs) {
-		n = len(vifs)
-	}
-	vlog.Infof("Killing %d VIFs", n)
-	var wg sync.WaitGroup
-	wg.Add(n)
-	for i := 0; i < n; i++ {
-		idx := rand.Intn(len(vifs))
-		vf := vifs[idx]
-		go func(vf *vif.VIF) {
-			vlog.Infof("Killing VIF %v", vf)
-			vf.Shutdown()
-			m.killedConns.Incr(1)
-			wg.Done()
-		}(vf)
-		vifs[idx], vifs[0] = vifs[0], vifs[idx]
-		vifs = vifs[1:]
-	}
-	wg.Wait()
-}
-
 func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
 	if !b.IsZero() && p == nil {
 		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index e28f505..5ae22f3 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -918,8 +918,8 @@
 	if err := h.Shutdown(nil, &stderr); err != nil {
 		t.Fatal(err)
 	}
-	if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("manager.go.*Killing [1-9][0-9]* VIFs"); len(log) == 0 {
-		t.Errorf("Failed to find log message talking about killing VIFs in:\n%v", stderr.String())
+	if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("listener.go.*Killing [1-9][0-9]* Conns"); len(log) == 0 {
+		t.Errorf("Failed to find log message talking about killing Conns in:\n%v", stderr.String())
 	}
 	t.Logf("Server FD limit:%d", nfiles)
 	t.Logf("Client connection attempts: %d", nattempts)
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index d20b63e..2342ddd 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -412,12 +412,6 @@
 	}
 }
 
-// Shutdown terminates the underlying network connection (any pending reads and
-// writes of flows/VCs over it will be discarded).
-func (vif *VIF) Shutdown() {
-	vif.conn.Close()
-}
-
 // StartAccepting begins accepting Flows (and VCs) initiated by the remote end
 // of a VIF. opts is used to setup the listener on newly established VCs.
 func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {