Merge "rpc/stream/vom: change to open vom.encoder first"
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 298d20f..3824089 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "math/rand"
"net"
"strings"
"sync"
@@ -62,6 +63,9 @@
manager *manager
vifs *vif.Set
+ connsMu sync.Mutex
+ conns map[net.Conn]bool
+
netLoop sync.WaitGroup
vifLoops sync.WaitGroup
}
@@ -87,6 +91,7 @@
manager: m,
netLn: netLn,
vifs: vif.NewSet(),
+ conns: make(map[net.Conn]bool),
}
// Set the default idle timeout for VC. But for "unixfd", we do not set
@@ -114,6 +119,44 @@
return false
}
+func (ln *netListener) killConnections(n int) {
+ ln.connsMu.Lock()
+ if n > len(ln.conns) {
+ n = len(ln.conns)
+ }
+ remaining := make([]net.Conn, 0, len(ln.conns))
+ for c := range ln.conns {
+ remaining = append(remaining, c)
+ }
+ removed := remaining[:n]
+ ln.connsMu.Unlock()
+
+ vlog.Infof("Killing %d Conns", n)
+
+ var wg sync.WaitGroup
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ idx := rand.Intn(len(remaining))
+ conn := remaining[idx]
+ go func(conn net.Conn) {
+ vlog.Infof("Killing connection (%s, %s)", conn.LocalAddr(), conn.RemoteAddr())
+ conn.Close()
+ ln.manager.killedConns.Incr(1)
+ wg.Done()
+ }(conn)
+ remaining[idx], remaining[0] = remaining[0], remaining[idx]
+ remaining = remaining[1:]
+ }
+
+ ln.connsMu.Lock()
+ for _, conn := range removed {
+ delete(ln.conns, conn)
+ }
+ ln.connsMu.Unlock()
+
+ wg.Wait()
+}
+
func (ln *netListener) netAcceptLoop(principal security.Principal, blessings security.Blessings, opts []stream.ListenerOpt) {
defer ln.netLoop.Done()
opts = append([]stream.ListenerOpt{vc.StartTimeout{defaultStartTimeout}}, opts...)
@@ -126,7 +169,7 @@
vlog.Infof("net.Listener.Accept() failed on %v with %v", ln.netLn, err)
for tokill := 1; isTemporaryError(err); tokill *= 2 {
if isTooManyOpenFiles(err) {
- ln.manager.killConnections(tokill)
+ ln.killConnections(tokill)
} else {
tokill = 1
}
@@ -145,6 +188,10 @@
vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
return
}
+ ln.connsMu.Lock()
+ ln.conns[conn] = true
+ ln.connsMu.Unlock()
+
vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
go func() {
vf, err := vif.InternalNewAcceptedVIF(conn, ln.manager.rid, principal, blessings, nil, ln.deleteVIF, opts...)
@@ -157,7 +204,12 @@
ln.manager.vifs.Insert(vf)
ln.vifLoops.Add(1)
- vifLoop(vf, ln.q, &ln.vifLoops)
+ vifLoop(vf, ln.q, func() {
+ ln.connsMu.Lock()
+ delete(ln.conns, conn)
+ ln.connsMu.Unlock()
+ ln.vifLoops.Done()
+ })
}()
}
}
@@ -226,7 +278,9 @@
}
ln.vif = vf
ln.vifLoop.Add(1)
- go vifLoop(ln.vif, ln.q, &ln.vifLoop)
+ go vifLoop(ln.vif, ln.q, func() {
+ ln.vifLoop.Done()
+ })
return ln, ep, nil
}
@@ -338,8 +392,8 @@
return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
}
-func vifLoop(vf *vif.VIF, q *upcqueue.T, wg *sync.WaitGroup) {
- defer wg.Done()
+func vifLoop(vf *vif.VIF, q *upcqueue.T, cleanup func()) {
+ defer cleanup()
for {
cAndf, err := vf.Accept()
switch {
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 371f86a..5fb6232 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -7,7 +7,6 @@
import (
"fmt"
- "math/rand"
"net"
"strings"
"sync"
@@ -330,29 +329,6 @@
return strings.Join(l, "\n")
}
-func (m *manager) killConnections(n int) {
- vifs := m.vifs.List()
- if n > len(vifs) {
- n = len(vifs)
- }
- vlog.Infof("Killing %d VIFs", n)
- var wg sync.WaitGroup
- wg.Add(n)
- for i := 0; i < n; i++ {
- idx := rand.Intn(len(vifs))
- vf := vifs[idx]
- go func(vf *vif.VIF) {
- vlog.Infof("Killing VIF %v", vf)
- vf.Shutdown()
- m.killedConns.Incr(1)
- wg.Done()
- }(vf)
- vifs[idx], vifs[0] = vifs[0], vifs[idx]
- vifs = vifs[1:]
- }
- wg.Wait()
-}
-
func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
if !b.IsZero() && p == nil {
return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index e28f505..5ae22f3 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -918,8 +918,8 @@
if err := h.Shutdown(nil, &stderr); err != nil {
t.Fatal(err)
}
- if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("manager.go.*Killing [1-9][0-9]* VIFs"); len(log) == 0 {
- t.Errorf("Failed to find log message talking about killing VIFs in:\n%v", stderr.String())
+ if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("listener.go.*Killing [1-9][0-9]* Conns"); len(log) == 0 {
+ t.Errorf("Failed to find log message talking about killing Conns in:\n%v", stderr.String())
}
t.Logf("Server FD limit:%d", nfiles)
t.Logf("Client connection attempts: %d", nattempts)
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 3944e13..ae984c5 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -286,7 +286,7 @@
blessings = p.principal.BlessingStore().Default()
}
- c, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
+ c, _, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
if err != nil {
processLog().Infof("Process %v failed to authenticate: %s", p, err)
return
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index 65bec0b..55a938d 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -67,6 +67,12 @@
// including a hash of the Setup message in the encrypted stream. It is
// likely that this will be addressed in subsequent protocol versions.
func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, versions *version.Range, params security.CallParams, auth *vc.ServerAuthorizer) (crypto.ControlCipher, error) {
+ // TODO(mattr): Temporarily don't do version negotiation over insecure channels.
+ // This is because the current production agent is broken (see v.io/i/293).
+ if params.LocalPrincipal == nil {
+ return nullCipher, nil
+ }
+
if versions == nil {
versions = version.SupportedRange
}
@@ -103,10 +109,6 @@
}
v := vrange.Max
- if params.LocalPrincipal == nil {
- return nullCipher, nil
- }
-
// Perform the authentication.
return authenticateAsClient(writer, reader, params, auth, pvt, pub, ppub, v)
}
@@ -131,8 +133,13 @@
// based on the max common version.
//
// See AuthenticateAsClient for a description of the negotiation.
+// TODO(mattr): I have temporarily added a new output parameter message.T.
+// If this message is non-nil then it is the first message that should be handled
+// by the VIF. This is needed because the first message we read here may
+// or may not be a Setup message; if it's not, we need to handle it in the
+// message loop. This parameter will be removed shortly.
func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, versions *version.Range, principal security.Principal, lBlessings security.Blessings,
- dc vc.DischargeClient) (crypto.ControlCipher, error) {
+ dc vc.DischargeClient) (crypto.ControlCipher, message.T, error) {
var err error
if versions == nil {
versions = version.SupportedRange
@@ -141,11 +148,11 @@
// Send server's public data.
pvt, pub, err := makeSetup(versions, principal != nil)
if err != nil {
- return nil, err
+ return nil, nil, err
}
errch := make(chan error, 1)
- readch := make(chan struct{})
+ readch := make(chan bool, 1)
go func() {
// TODO(mattr,ribrdb): In the case of the agent, which is
// currently the only user of insecure connections, we need to
@@ -155,42 +162,66 @@
// the magic byte has been sent the client will use the first
// byte of this message instead rendering the remainder of the
// stream uninterpretable.
- if principal == nil {
- <-readch
+ // TODO(mattr): As an even worse hack, we want to temporarily be
+ // compatible with really old agents. We can't send them our setup
+ // message because they have a bug. However we want new servers
+ // to be compatible with clients that DO send the setup message.
+ // To accomplish this we'll see if the client sends us a setup message
+ // first. If it does, we'll send ours. If not, we won't.
+ if principal != nil || <-readch {
+ err := message.WriteTo(writer, pub, nullCipher)
+ errch <- err
}
- err := message.WriteTo(writer, pub, nullCipher)
- errch <- err
+ close(errch)
}()
// Read client's public data.
pmsg, err := message.ReadFrom(reader, nullCipher)
- close(readch) // Note: we need to close this whether we get an error or not.
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ readch <- false
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
ppub, ok := pmsg.(*message.Setup)
if !ok {
- return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+ // TODO(mattr): We got a message from the client, but it's not a Setup, so
+ // assume this is a client that doesn't negotiate. This should
+ // be removed once we no longer have to be compatible with old agents.
+ readch <- false
+ return nullCipher, pmsg, nil
+ } else {
+ // TODO(mattr): We got a Setup message from the client, so
+ // assume this is a client that does negotiate. This should
+ // be removed once we no longer have to be compatible with old agents.
+ readch <- true
}
// Wait for the write to succeed.
- if err := <-errch; err != nil {
- return nil, err
+ if err, ok := <-errch; !ok {
+ // We didn't write because we didn't get a Setup message from the client.
+ // Just return the null cipher. This basically just assumes the agent is
+ // RPC version compatible.
+ // TODO(mattr): This is part of the same hack described above, it should be
+ // removed later.
+ return nullCipher, nil, nil
+ } else if err != nil {
+ // The write failed, the VIF cannot be set up.
+ return nil, nil, err
}
// Choose the max version in the intersection.
vrange, err := versions.Intersect(&ppub.Versions)
if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
+ return nil, nil, verror.New(stream.ErrNetwork, nil, err)
}
v := vrange.Max
if principal == nil {
- return nullCipher, nil
+ return nullCipher, nil, nil
}
// Perform authentication.
- return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
+ cipher, err := authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
+ return cipher, nil, err
}
func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index d20b63e..6689d5d 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -195,7 +195,7 @@
startTimeout = v.Duration
}
}
- return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c)
+ return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c, nil)
}
// InternalNewAcceptedVIF creates a new virtual interface over the provided
@@ -215,7 +215,7 @@
dischargeClient := getDischargeClient(lopts)
- c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
+ c, initialMessage, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
if err != nil {
return nil, err
}
@@ -227,10 +227,10 @@
startTimeout = v.Duration
}
}
- return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
+ return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c, initialMessage)
}
-func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
+func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher, initialMessage message.T) (*VIF, error) {
var (
// Choose IDs that will not conflict with any other (VC, Flow)
// pairs. VCI 0 is never used by the application (it is
@@ -294,7 +294,7 @@
vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
}
})
- go vif.readLoop()
+ go vif.readLoop(initialMessage)
go vif.writeLoop()
return vif, nil
}
@@ -412,12 +412,6 @@
}
}
-// Shutdown terminates the underlying network connection (any pending reads and
-// writes of flows/VCs over it will be discarded).
-func (vif *VIF) Shutdown() {
- vif.conn.Close()
-}
-
// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
// of a VIF. opts is used to setup the listener on newly established VCs.
func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {
@@ -481,17 +475,24 @@
return fmt.Sprintf("(%s, %s) <-> (%s, %s)", l.Network(), l, r.Network(), r)
}
-func (vif *VIF) readLoop() {
+func (vif *VIF) readLoop(initialMessage message.T) {
defer vif.Close()
defer vif.stopVCDispatchLoops()
for {
// vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation
// to it is in handleMessage, which runs in the same goroutine, so a
// lock is not required here.
- msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher)
- if err != nil {
- vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
- return
+ var msg message.T
+ if initialMessage != nil {
+ msg = initialMessage
+ initialMessage = nil
+ } else {
+ var err error
+ msg, err = message.ReadFrom(vif.reader, vif.ctrlCipher)
+ if err != nil {
+ vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
+ return
+ }
}
vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif)
if err := vif.handleMessage(msg); err != nil {