rpc/stream/manager: Reenable connection killing after it was broken in
v.io/c/10433.
Change-Id: I29fd5bf0ac2a951941c19b46702263b1973f19c4
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 298d20f..3824089 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "math/rand"
"net"
"strings"
"sync"
@@ -62,6 +63,9 @@
manager *manager
vifs *vif.Set
+ connsMu sync.Mutex
+ conns map[net.Conn]bool
+
netLoop sync.WaitGroup
vifLoops sync.WaitGroup
}
@@ -87,6 +91,7 @@
manager: m,
netLn: netLn,
vifs: vif.NewSet(),
+ conns: make(map[net.Conn]bool),
}
// Set the default idle timeout for VC. But for "unixfd", we do not set
@@ -114,6 +119,44 @@
return false
}
+func (ln *netListener) killConnections(n int) { // killConnections closes up to n randomly chosen conns tracked in ln.conns and waits for the Close calls to return.
+ ln.connsMu.Lock() // snapshot the tracked conns while holding the lock
+ if n > len(ln.conns) {
+ n = len(ln.conns) // cannot kill more conns than are currently tracked
+ }
+ remaining := make([]net.Conn, 0, len(ln.conns))
+ for c := range ln.conns {
+ remaining = append(remaining, c)
+ }
+ removed := remaining[:n]
+ ln.connsMu.Unlock()
+ // NOTE: removed aliases remaining's backing array, so the swaps in the loop below end up storing exactly the killed conns in removed.
+ vlog.Infof("Killing %d Conns", n)
+ // Pick n distinct conns at random; each one is closed in its own goroutine and counted in manager.killedConns.
+ var wg sync.WaitGroup
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ idx := rand.Intn(len(remaining))
+ conn := remaining[idx]
+ go func(conn net.Conn) {
+ vlog.Infof("Killing connection (%s, %s)", conn.LocalAddr(), conn.RemoteAddr())
+ conn.Close()
+ ln.manager.killedConns.Incr(1)
+ wg.Done()
+ }(conn)
+ remaining[idx], remaining[0] = remaining[0], remaining[idx] // move the chosen conn to the front ...
+ remaining = remaining[1:] // ... and drop it from the candidates so it cannot be picked again
+ }
+ // Stop tracking the killed conns; delete is a no-op for entries already removed by a vifLoop cleanup callback.
+ ln.connsMu.Lock()
+ for _, conn := range removed {
+ delete(ln.conns, conn)
+ }
+ ln.connsMu.Unlock()
+ // Block until every Close call started above has returned.
+ wg.Wait()
+}
+
func (ln *netListener) netAcceptLoop(principal security.Principal, blessings security.Blessings, opts []stream.ListenerOpt) {
defer ln.netLoop.Done()
opts = append([]stream.ListenerOpt{vc.StartTimeout{defaultStartTimeout}}, opts...)
@@ -126,7 +169,7 @@
vlog.Infof("net.Listener.Accept() failed on %v with %v", ln.netLn, err)
for tokill := 1; isTemporaryError(err); tokill *= 2 {
if isTooManyOpenFiles(err) {
- ln.manager.killConnections(tokill)
+ ln.killConnections(tokill)
} else {
tokill = 1
}
@@ -145,6 +188,10 @@
vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
return
}
+ ln.connsMu.Lock()
+ ln.conns[conn] = true
+ ln.connsMu.Unlock()
+
vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
go func() {
vf, err := vif.InternalNewAcceptedVIF(conn, ln.manager.rid, principal, blessings, nil, ln.deleteVIF, opts...)
@@ -157,7 +204,12 @@
ln.manager.vifs.Insert(vf)
ln.vifLoops.Add(1)
- vifLoop(vf, ln.q, &ln.vifLoops)
+ vifLoop(vf, ln.q, func() {
+ ln.connsMu.Lock()
+ delete(ln.conns, conn)
+ ln.connsMu.Unlock()
+ ln.vifLoops.Done()
+ })
}()
}
}
@@ -226,7 +278,9 @@
}
ln.vif = vf
ln.vifLoop.Add(1)
- go vifLoop(ln.vif, ln.q, &ln.vifLoop)
+ go vifLoop(ln.vif, ln.q, func() {
+ ln.vifLoop.Done()
+ })
return ln, ep, nil
}
@@ -338,8 +392,8 @@
return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
}
-func vifLoop(vf *vif.VIF, q *upcqueue.T, wg *sync.WaitGroup) {
- defer wg.Done()
+func vifLoop(vf *vif.VIF, q *upcqueue.T, cleanup func()) {
+ defer cleanup()
for {
cAndf, err := vf.Accept()
switch {
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 371f86a..5fb6232 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -7,7 +7,6 @@
import (
"fmt"
- "math/rand"
"net"
"strings"
"sync"
@@ -330,29 +329,6 @@
return strings.Join(l, "\n")
}
-func (m *manager) killConnections(n int) {
- vifs := m.vifs.List()
- if n > len(vifs) {
- n = len(vifs)
- }
- vlog.Infof("Killing %d VIFs", n)
- var wg sync.WaitGroup
- wg.Add(n)
- for i := 0; i < n; i++ {
- idx := rand.Intn(len(vifs))
- vf := vifs[idx]
- go func(vf *vif.VIF) {
- vlog.Infof("Killing VIF %v", vf)
- vf.Shutdown()
- m.killedConns.Incr(1)
- wg.Done()
- }(vf)
- vifs[idx], vifs[0] = vifs[0], vifs[idx]
- vifs = vifs[1:]
- }
- wg.Wait()
-}
-
func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
if !b.IsZero() && p == nil {
return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index e28f505..5ae22f3 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -918,8 +918,8 @@
if err := h.Shutdown(nil, &stderr); err != nil {
t.Fatal(err)
}
- if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("manager.go.*Killing [1-9][0-9]* VIFs"); len(log) == 0 {
- t.Errorf("Failed to find log message talking about killing VIFs in:\n%v", stderr.String())
+ if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("listener.go.*Killing [1-9][0-9]* Conns"); len(log) == 0 {
+ t.Errorf("Failed to find log message talking about killing Conns in:\n%v", stderr.String())
}
t.Logf("Server FD limit:%d", nfiles)
t.Logf("Client connection attempts: %d", nattempts)
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index d20b63e..2342ddd 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -412,12 +412,6 @@
}
}
-// Shutdown terminates the underlying network connection (any pending reads and
-// writes of flows/VCs over it will be discarded).
-func (vif *VIF) Shutdown() {
- vif.conn.Close()
-}
-
// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
// of a VIF. opts is used to setup the listener on newly established VCs.
func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {