Merge "rpc/stream/vom: change to open vom.encoder first"
diff --git a/profiles/internal/rpc/stream/manager/listener.go b/profiles/internal/rpc/stream/manager/listener.go
index 298d20f..3824089 100644
--- a/profiles/internal/rpc/stream/manager/listener.go
+++ b/profiles/internal/rpc/stream/manager/listener.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -62,6 +63,9 @@
 	manager *manager
 	vifs    *vif.Set
 
+	connsMu sync.Mutex
+	conns   map[net.Conn]bool
+
 	netLoop  sync.WaitGroup
 	vifLoops sync.WaitGroup
 }
@@ -87,6 +91,7 @@
 		manager: m,
 		netLn:   netLn,
 		vifs:    vif.NewSet(),
+		conns:   make(map[net.Conn]bool),
 	}
 
 	// Set the default idle timeout for VC. But for "unixfd", we do not set
@@ -114,6 +119,44 @@
 	return false
 }
 
+func (ln *netListener) killConnections(n int) {
+	ln.connsMu.Lock()
+	if n > len(ln.conns) {
+		n = len(ln.conns)
+	}
+	remaining := make([]net.Conn, 0, len(ln.conns))
+	for c := range ln.conns {
+		remaining = append(remaining, c)
+	}
+	removed := remaining[:n]
+	ln.connsMu.Unlock()
+
+	vlog.Infof("Killing %d Conns", n)
+
+	var wg sync.WaitGroup
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		idx := rand.Intn(len(remaining))
+		conn := remaining[idx]
+		go func(conn net.Conn) {
+			vlog.Infof("Killing connection (%s, %s)", conn.LocalAddr(), conn.RemoteAddr())
+			conn.Close()
+			ln.manager.killedConns.Incr(1)
+			wg.Done()
+		}(conn)
+		remaining[idx], remaining[0] = remaining[0], remaining[idx]
+		remaining = remaining[1:]
+	}
+
+	ln.connsMu.Lock()
+	for _, conn := range removed {
+		delete(ln.conns, conn)
+	}
+	ln.connsMu.Unlock()
+
+	wg.Wait()
+}
+
 func (ln *netListener) netAcceptLoop(principal security.Principal, blessings security.Blessings, opts []stream.ListenerOpt) {
 	defer ln.netLoop.Done()
 	opts = append([]stream.ListenerOpt{vc.StartTimeout{defaultStartTimeout}}, opts...)
@@ -126,7 +169,7 @@
 			vlog.Infof("net.Listener.Accept() failed on %v with %v", ln.netLn, err)
 			for tokill := 1; isTemporaryError(err); tokill *= 2 {
 				if isTooManyOpenFiles(err) {
-					ln.manager.killConnections(tokill)
+					ln.killConnections(tokill)
 				} else {
 					tokill = 1
 				}
@@ -145,6 +188,10 @@
 			vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
 			return
 		}
+		ln.connsMu.Lock()
+		ln.conns[conn] = true
+		ln.connsMu.Unlock()
+
 		vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
 		go func() {
 			vf, err := vif.InternalNewAcceptedVIF(conn, ln.manager.rid, principal, blessings, nil, ln.deleteVIF, opts...)
@@ -157,7 +204,12 @@
 			ln.manager.vifs.Insert(vf)
 
 			ln.vifLoops.Add(1)
-			vifLoop(vf, ln.q, &ln.vifLoops)
+			vifLoop(vf, ln.q, func() {
+				ln.connsMu.Lock()
+				delete(ln.conns, conn)
+				ln.connsMu.Unlock()
+				ln.vifLoops.Done()
+			})
 		}()
 	}
 }
@@ -226,7 +278,9 @@
 	}
 	ln.vif = vf
 	ln.vifLoop.Add(1)
-	go vifLoop(ln.vif, ln.q, &ln.vifLoop)
+	go vifLoop(ln.vif, ln.q, func() {
+		ln.vifLoop.Done()
+	})
 	return ln, ep, nil
 }
 
@@ -338,8 +392,8 @@
 	return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
 }
 
-func vifLoop(vf *vif.VIF, q *upcqueue.T, wg *sync.WaitGroup) {
-	defer wg.Done()
+func vifLoop(vf *vif.VIF, q *upcqueue.T, cleanup func()) {
+	defer cleanup()
 	for {
 		cAndf, err := vf.Accept()
 		switch {
diff --git a/profiles/internal/rpc/stream/manager/manager.go b/profiles/internal/rpc/stream/manager/manager.go
index 371f86a..5fb6232 100644
--- a/profiles/internal/rpc/stream/manager/manager.go
+++ b/profiles/internal/rpc/stream/manager/manager.go
@@ -7,7 +7,6 @@
 
 import (
 	"fmt"
-	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -330,29 +329,6 @@
 	return strings.Join(l, "\n")
 }
 
-func (m *manager) killConnections(n int) {
-	vifs := m.vifs.List()
-	if n > len(vifs) {
-		n = len(vifs)
-	}
-	vlog.Infof("Killing %d VIFs", n)
-	var wg sync.WaitGroup
-	wg.Add(n)
-	for i := 0; i < n; i++ {
-		idx := rand.Intn(len(vifs))
-		vf := vifs[idx]
-		go func(vf *vif.VIF) {
-			vlog.Infof("Killing VIF %v", vf)
-			vf.Shutdown()
-			m.killedConns.Incr(1)
-			wg.Done()
-		}(vf)
-		vifs[idx], vifs[0] = vifs[0], vifs[idx]
-		vifs = vifs[1:]
-	}
-	wg.Wait()
-}
-
 func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
 	if !b.IsZero() && p == nil {
 		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index e28f505..5ae22f3 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -918,8 +918,8 @@
 	if err := h.Shutdown(nil, &stderr); err != nil {
 		t.Fatal(err)
 	}
-	if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("manager.go.*Killing [1-9][0-9]* VIFs"); len(log) == 0 {
-		t.Errorf("Failed to find log message talking about killing VIFs in:\n%v", stderr.String())
+	if log := expect.NewSession(t, bytes.NewReader(stderr.Bytes()), time.Minute).ExpectSetEventuallyRE("listener.go.*Killing [1-9][0-9]* Conns"); len(log) == 0 {
+		t.Errorf("Failed to find log message talking about killing Conns in:\n%v", stderr.String())
 	}
 	t.Logf("Server FD limit:%d", nfiles)
 	t.Logf("Client connection attempts: %d", nattempts)
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 3944e13..ae984c5 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -286,7 +286,7 @@
 		blessings = p.principal.BlessingStore().Default()
 	}
 
-	c, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
+	c, _, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
 	if err != nil {
 		processLog().Infof("Process %v failed to authenticate: %s", p, err)
 		return
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index 65bec0b..55a938d 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -67,6 +67,12 @@
 // including a hash of the Setup message in the encrypted stream.  It is
 // likely that this will be addressed in subsequent protocol versions.
 func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, versions *version.Range, params security.CallParams, auth *vc.ServerAuthorizer) (crypto.ControlCipher, error) {
+	// TODO(mattr): Temporarily don't do version negotiation over insecure channels.
+	// This is because the current production agent is broken (see v.io/i/293).
+	if params.LocalPrincipal == nil {
+		return nullCipher, nil
+	}
+
 	if versions == nil {
 		versions = version.SupportedRange
 	}
@@ -103,10 +109,6 @@
 	}
 	v := vrange.Max
 
-	if params.LocalPrincipal == nil {
-		return nullCipher, nil
-	}
-
 	// Perform the authentication.
 	return authenticateAsClient(writer, reader, params, auth, pvt, pub, ppub, v)
 }
@@ -131,8 +133,13 @@
 // based on the max common version.
 //
 // See AuthenticateAsClient for a description of the negotiation.
+// TODO(mattr): I have temporarily added a new output parameter, message.T.
+// If this message is non-nil then it is the first message that should be handled
+// by the VIF.  This is needed because the first message we read here may
+// or may not be a Setup message; if it's not, we need to handle it in the
+// message loop.  This parameter will be removed shortly.
 func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, versions *version.Range, principal security.Principal, lBlessings security.Blessings,
-	dc vc.DischargeClient) (crypto.ControlCipher, error) {
+	dc vc.DischargeClient) (crypto.ControlCipher, message.T, error) {
 	var err error
 	if versions == nil {
 		versions = version.SupportedRange
@@ -141,11 +148,11 @@
 	// Send server's public data.
 	pvt, pub, err := makeSetup(versions, principal != nil)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	errch := make(chan error, 1)
-	readch := make(chan struct{})
+	readch := make(chan bool, 1)
 	go func() {
 		// TODO(mattr,ribrdb): In the case of the agent, which is
 		// currently the only user of insecure connections, we need to
@@ -155,42 +162,66 @@
 		// the magic byte has been sent the client will use the first
 		// byte of this message instead rendering the remainder of the
 		// stream uninterpretable.
-		if principal == nil {
-			<-readch
+		// TODO(mattr): As an even worse hack, we want to temporarily be
+		// compatible with really old agents.  We can't send them our setup
+		// message because they have a bug.  However we want new servers
+		// to be compatible with clients that DO send the setup message.
+		// To accomplish this we'll see if the client sends us a setup message
+		// first.  If it does, we'll send ours.  If not, we won't.
+		if principal != nil || <-readch {
+			err := message.WriteTo(writer, pub, nullCipher)
+			errch <- err
 		}
-		err := message.WriteTo(writer, pub, nullCipher)
-		errch <- err
+		close(errch)
 	}()
 
 	// Read client's public data.
 	pmsg, err := message.ReadFrom(reader, nullCipher)
-	close(readch) // Note: we need to close this whether we get an error or not.
 	if err != nil {
-		return nil, verror.New(stream.ErrNetwork, nil, err)
+		readch <- false
+		return nil, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	ppub, ok := pmsg.(*message.Setup)
 	if !ok {
-		return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+		// TODO(mattr): We got a message from the client, but it's not a Setup, so
+		// assume this is a client that doesn't negotiate.  This should
+		// be removed once we no longer have to be compatible with old agents.
+		readch <- false
+		return nullCipher, pmsg, nil
+	} else {
+		// TODO(mattr): We got a Setup message from the client, so
+		// assume this is a client that does negotiate.  This should
+		// be removed once we no longer have to be compatible with old agents.
+		readch <- true
 	}
 
 	// Wait for the write to succeed.
-	if err := <-errch; err != nil {
-		return nil, err
+	if err, ok := <-errch; !ok {
+		// We didn't write because we didn't get a Setup message from the client.
+		// Just return the null cipher.  This basically just assumes the agent is
+		// RPC version compatible.
+		// TODO(mattr): This is part of the same hack described above, it should be
+		// removed later.
+		return nullCipher, nil, nil
+	} else if err != nil {
+		// The write failed, the VIF cannot be set up.
+		return nil, nil, err
 	}
 
 	// Choose the max version in the intersection.
 	vrange, err := versions.Intersect(&ppub.Versions)
 	if err != nil {
-		return nil, verror.New(stream.ErrNetwork, nil, err)
+		return nil, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	v := vrange.Max
 
 	if principal == nil {
-		return nullCipher, nil
+		return nullCipher, nil, nil
 	}
 
 	// Perform authentication.
-	return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
+	cipher, err := authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
+	return cipher, nil, err
 }
 
 func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index d20b63e..6689d5d 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -195,7 +195,7 @@
 			startTimeout = v.Duration
 		}
 	}
-	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c)
+	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c, nil)
 }
 
 // InternalNewAcceptedVIF creates a new virtual interface over the provided
@@ -215,7 +215,7 @@
 
 	dischargeClient := getDischargeClient(lopts)
 
-	c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
+	c, initialMessage, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
 	if err != nil {
 		return nil, err
 	}
@@ -227,10 +227,10 @@
 			startTimeout = v.Duration
 		}
 	}
-	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
+	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c, initialMessage)
 }
 
-func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
+func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher, initialMessage message.T) (*VIF, error) {
 	var (
 		// Choose IDs that will not conflict with any other (VC, Flow)
 		// pairs.  VCI 0 is never used by the application (it is
@@ -294,7 +294,7 @@
 			vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
 		}
 	})
-	go vif.readLoop()
+	go vif.readLoop(initialMessage)
 	go vif.writeLoop()
 	return vif, nil
 }
@@ -412,12 +412,6 @@
 	}
 }
 
-// Shutdown terminates the underlying network connection (any pending reads and
-// writes of flows/VCs over it will be discarded).
-func (vif *VIF) Shutdown() {
-	vif.conn.Close()
-}
-
 // StartAccepting begins accepting Flows (and VCs) initiated by the remote end
 // of a VIF. opts is used to setup the listener on newly established VCs.
 func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {
@@ -481,17 +475,24 @@
 	return fmt.Sprintf("(%s, %s) <-> (%s, %s)", l.Network(), l, r.Network(), r)
 }
 
-func (vif *VIF) readLoop() {
+func (vif *VIF) readLoop(initialMessage message.T) {
 	defer vif.Close()
 	defer vif.stopVCDispatchLoops()
 	for {
 		// vif.ctrlCipher is guarded by vif.writeMu.  However, the only mutation
 		// to it is in handleMessage, which runs in the same goroutine, so a
 		// lock is not required here.
-		msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher)
-		if err != nil {
-			vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
-			return
+		var msg message.T
+		if initialMessage != nil {
+			msg = initialMessage
+			initialMessage = nil
+		} else {
+			var err error
+			msg, err = message.ReadFrom(vif.reader, vif.ctrlCipher)
+			if err != nil {
+				vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
+				return
+			}
 		}
 		vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif)
 		if err := vif.handleMessage(msg); err != nil {