ref: Setup messages should always be exchanged by VIFs and should be async.

This change also removes support for RPCVersion5.

MultiPart: 2/2
Change-Id: Ie8d3cb30170ca8fac58594197ad0f28108957e3c
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index ab268dc..f963e1d 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -650,7 +650,7 @@
 				topLevelError = verror.ErrNotTrusted
 				topLevelAction = verror.NoRetry
 				onlyErrNetwork = false
-			case stream.ErrNetwork.ID:
+			case stream.ErrAborted.ID, stream.ErrNetwork.ID:
 				// do nothing
 			default:
 				onlyErrNetwork = false
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 09d3586..7836963 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -90,7 +90,6 @@
 	conn         net.Conn
 	pool         *iobuf.Pool
 	reader       *iobuf.Reader
-	isSetup      bool
 	ctrlCipher   crypto.ControlCipher
 	queue        *upcqueue.T
 	mu           sync.RWMutex
@@ -261,12 +260,25 @@
 
 func (p *Proxy) acceptProcess(conn net.Conn) {
 	pool := iobuf.NewPool(0)
+	reader := iobuf.NewReader(pool, conn)
+
+	var blessings security.Blessings
+	if p.principal != nil {
+		blessings = p.principal.BlessingStore().Default()
+	}
+
+	c, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
+	if err != nil {
+		processLog().Infof("Process %v failed to authenticate: %s", p, err)
+		return
+	}
+
 	process := &process{
 		proxy:        p,
 		conn:         conn,
 		pool:         pool,
-		reader:       iobuf.NewReader(pool, conn),
-		ctrlCipher:   &crypto.NullControlCipher{},
+		reader:       reader,
+		ctrlCipher:   c,
 		queue:        upcqueue.New(),
 		routingTable: make(map[id.VC]*destination),
 		servers:      make(map[id.VC]*vc.VC),
@@ -661,26 +673,8 @@
 			dstprocess.queue.Put(m)
 			p.proxy.routeCounters(p, counters)
 
-		case *message.Setup:
-			// Set up the hop.  This takes over the process during negotiation.
-			if p.isSetup {
-				// Already performed authentication.  We don't do it again.
-				processLog().Infof("Process %v is already setup", p)
-				return
-			}
-			var blessings security.Blessings
-			if p.proxy.principal != nil {
-				blessings = p.proxy.principal.BlessingStore().Default()
-			}
-			c, err := vif.AuthenticateAsServer(p.conn, p.reader, nil, p.proxy.principal, blessings, nil, m)
-			if err != nil {
-				processLog().Infof("Process %v failed to authenticate: %s", p, err)
-				return
-			}
-			p.ctrlCipher = c
-			p.isSetup = true
 		default:
-			processLog().Infof("Closing %v because of unrecognized message %T", p, m)
+			processLog().Infof("Closing %v because of invalid message %T", p, m)
 			return
 		}
 	}
diff --git a/profiles/internal/rpc/stream/vc/auth.go b/profiles/internal/rpc/stream/vc/auth.go
index b74a351..e19d10f 100644
--- a/profiles/internal/rpc/stream/vc/auth.go
+++ b/profiles/internal/rpc/stream/vc/auth.go
@@ -110,10 +110,8 @@
 	if err := enc.Encode(b); err != nil {
 		return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
 	}
-	if v >= version.RPCVersion5 {
-		if err := enc.Encode(discharges); err != nil {
-			return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
-		}
+	if err := enc.Encode(discharges); err != nil {
+		return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
 	}
 	msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
 	if err != nil {
@@ -161,10 +159,8 @@
 		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	var discharges []security.Discharge
-	if v >= version.RPCVersion5 {
-		if err := dec.Decode(&discharges); err != nil {
-			return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
-		}
+	if err := dec.Decode(&discharges); err != nil {
+		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
 		return noBlessings, nil, verror.New(stream.ErrSecurity, nil, verror.New(errInvalidSignatureInMessage, nil))
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index 4f7243a..65bec0b 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -44,20 +44,19 @@
 //
 // The sequence is initiated by the client.
 //
-//    - If the versions include RPCVersion6 or greater, the client sends a
-//      Setup message to the server, containing the client's supported
-//      versions, and the client's crypto options.  The Setup message
+//    - The client sends a Setup message to the server, containing the client's
+//      supported versions, and the client's crypto options.  The Setup message
 //      is sent in the clear.
 //
 //    - When the server receives the Setup message, it calls
 //      AuthenticateAsServer, which constructs a response Setup containing
 //      the server's version range, and any crypto options.
 //
-//    - For RPCVersion6 and RPCVersion7, the client and server generate fresh
-//      public/private key pairs, sending the public key to the peer as a crypto
-//      option.  The remainder of the communication is encrypted as
-//      SetupStream messages using NewControlCipherRPC6, which is based on
-//      code.google.com/p/go.crypto/nacl/box.
+//    - The client and server use the public/private key pairs
+//      generated for the Setup messages to create an encrypted stream
+//      of SetupStream messages for the remainder of the authentication
+//      setup.  The encyrption uses NewControlCipherRPC6, which is based
+//      on code.google.com/p/go.crypto/nacl/box.
 //
 //    - Once the encrypted SetupStream channel is setup, the client and
 //      server authenticate using the vc.AuthenticateAs{Client,Server} protocol.
@@ -66,36 +65,23 @@
 // modification by a man-in-the-middle, which can currently force a downgrade by
 // modifying the acceptable version ranges downward.  This can be addressed by
 // including a hash of the Setup message in the encrypted stream.  It is
-// likely that this will be addressed in subsequent protocol versions (or it may
-// not be addressed at all if RPCVersion6 becomes the only supported version).
+// likely that this will be addressed in subsequent protocol versions.
 func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, versions *version.Range, params security.CallParams, auth *vc.ServerAuthorizer) (crypto.ControlCipher, error) {
 	if versions == nil {
 		versions = version.SupportedRange
 	}
-	if params.LocalPrincipal == nil {
-		// If there is no principal, we do not support encryption/authentication.
-		// TODO(ashankar, mattr): We should still exchange version information even
-		// if we don't do encryption.
-		var err error
-		versions, err = versions.Intersect(&version.Range{Min: 0, Max: rpcversion.RPCVersion5})
-		if err != nil {
-			return nil, verror.New(stream.ErrNetwork, nil, err)
-		}
-	}
-	if versions.Max < rpcversion.RPCVersion6 {
-		return nullCipher, nil
-	}
 
-	// The client has not yet sent its public data.  Construct it and send it.
-	pvt, pub, err := makeSetup(versions)
+	// Send the client's public data.
+	pvt, pub, err := makeSetup(versions, params.LocalPrincipal != nil)
 	if err != nil {
 		return nil, verror.New(stream.ErrSecurity, nil, err)
 	}
-	if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
-		return nil, verror.New(stream.ErrNetwork, nil, err)
-	}
 
-	// Read the server's public data.
+	errch := make(chan error, 1)
+	go func() {
+		errch <- message.WriteTo(writer, pub, nullCipher)
+	}()
+
 	pmsg, err := message.ReadFrom(reader, nullCipher)
 	if err != nil {
 		return nil, verror.New(stream.ErrNetwork, nil, err)
@@ -105,25 +91,28 @@
 		return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
 	}
 
+	// Wait for the write to succeed.
+	if err := <-errch; err != nil {
+		return nil, verror.New(stream.ErrNetwork, nil, err)
+	}
+
 	// Choose the max version in the intersection.
 	vrange, err := pub.Versions.Intersect(&ppub.Versions)
 	if err != nil {
 		return nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	v := vrange.Max
-	if v < rpcversion.RPCVersion6 {
+
+	if params.LocalPrincipal == nil {
 		return nullCipher, nil
 	}
 
 	// Perform the authentication.
-	return authenticateAsClient(writer, reader, params, auth, &pvt, &pub, ppub, v)
+	return authenticateAsClient(writer, reader, params, auth, pvt, pub, ppub, v)
 }
 
 func authenticateAsClient(writer io.Writer, reader *iobuf.Reader, params security.CallParams, auth *vc.ServerAuthorizer,
 	pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
-	if version < rpcversion.RPCVersion6 {
-		return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
-	}
 	pbox := ppub.NaclBox()
 	if pbox == nil {
 		return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
@@ -143,51 +132,69 @@
 //
 // See AuthenticateAsClient for a description of the negotiation.
 func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, versions *version.Range, principal security.Principal, lBlessings security.Blessings,
-	dc vc.DischargeClient, ppub *message.Setup) (crypto.ControlCipher, error) {
+	dc vc.DischargeClient) (crypto.ControlCipher, error) {
 	var err error
 	if versions == nil {
 		versions = version.SupportedRange
 	}
 
-	if principal == nil {
-		// If we're not encrypting the connection we can just send them
-		// our version information.
-		pub := &message.Setup{
-			Versions: *versions,
-		}
-		if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
-			return nil, err
-		}
-		_, err := versions.Intersect(&ppub.Versions)
-		return nullCipher, err
+	// Send server's public data.
+	pvt, pub, err := makeSetup(versions, principal != nil)
+	if err != nil {
+		return nil, err
 	}
 
-	// Create our public data and send it to the client.
-	pvt, pub, err := makeSetup(versions)
+	errch := make(chan error, 1)
+	readch := make(chan struct{})
+	go func() {
+		// TODO(mattr,ribrdb): In the case of the agent, which is
+		// currently the only user of insecure connections, we need to
+		// wait for the client to initiate the communication.  The agent
+		// sends an extra first byte to clients, which clients read before
+		// dialing their side of the vif.  If we send this message before
+		// the magic byte has been sent the client will use the first
+		// byte of this message instead rendering the remainder of the
+		// stream uninterpretable.
+		if principal == nil {
+			<-readch
+		}
+		err := message.WriteTo(writer, pub, nullCipher)
+		errch <- err
+	}()
+
+	// Read client's public data.
+	pmsg, err := message.ReadFrom(reader, nullCipher)
+	close(readch) // Note: we need to close this whether we get an error or not.
 	if err != nil {
+		return nil, verror.New(stream.ErrNetwork, nil, err)
+	}
+	ppub, ok := pmsg.(*message.Setup)
+	if !ok {
+		return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+	}
+
+	// Wait for the write to succeed.
+	if err := <-errch; err != nil {
 		return nil, err
 	}
-	if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
-		return nil, err
-	}
+
+	// Choose the max version in the intersection.
 	vrange, err := versions.Intersect(&ppub.Versions)
 	if err != nil {
-		return nil, err
+		return nil, verror.New(stream.ErrNetwork, nil, err)
 	}
 	v := vrange.Max
-	if v < rpcversion.RPCVersion6 {
+
+	if principal == nil {
 		return nullCipher, nil
 	}
 
 	// Perform authentication.
-	return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, &pvt, &pub, ppub, v)
+	return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
 }
 
 func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
 	pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
-	if version < rpcversion.RPCVersion6 {
-		return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
-	}
 	box := ppub.NaclBox()
 	if box == nil {
 		return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
@@ -215,14 +222,24 @@
 }
 
 // makeSetup constructs the options that this process can support.
-func makeSetup(versions *version.Range) (pvt privateData, pub message.Setup, err error) {
-	pub.Versions = *versions
-	var pubKey, pvtKey *[32]byte
-	pubKey, pvtKey, err = box.GenerateKey(rand.Reader)
-	if err != nil {
-		return
+func makeSetup(versions *version.Range, secure bool) (*privateData, *message.Setup, error) {
+	var options []message.SetupOption
+	var pvt *privateData
+	if secure {
+		pubKey, pvtKey, err := box.GenerateKey(rand.Reader)
+		if err != nil {
+			return nil, nil, err
+		}
+		options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
+		pvt = &privateData{
+			naclBoxPrivateKey: *pvtKey,
+		}
 	}
-	pub.Options = append(pub.Options, &message.NaclBox{PublicKey: *pubKey})
-	pvt.naclBoxPrivateKey = *pvtKey
-	return
+
+	pub := &message.Setup{
+		Versions: *versions,
+		Options:  options,
+	}
+
+	return pvt, pub, nil
 }
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index eb4776c..e3175e2 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -80,8 +80,6 @@
 	reader  *iobuf.Reader
 	localEP naming.Endpoint
 
-	// control channel encryption.
-	isSetup bool
 	// ctrlCipher is normally guarded by writeMu, however see the exception in
 	// readLoop.
 	ctrlCipher crypto.ControlCipher
@@ -183,7 +181,7 @@
 	// server and not the remote endpoint of the VIF.
 	c, err := AuthenticateAsClient(conn, reader, versions, params, nil)
 	if err != nil {
-		return nil, err
+		return nil, verror.New(stream.ErrNetwork, ctx, err)
 	}
 	var blessings security.Blessings
 
@@ -214,6 +212,14 @@
 func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
 	pool := iobuf.NewPool(0)
 	reader := iobuf.NewReader(pool, conn)
+
+	dischargeClient := getDischargeClient(lopts)
+
+	c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
+	if err != nil {
+		return nil, err
+	}
+
 	var startTimeout time.Duration
 	for _, o := range lopts {
 		switch v := o.(type) {
@@ -221,7 +227,7 @@
 			startTimeout = v.Duration
 		}
 	}
-	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, &crypto.NullControlCipher{})
+	return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
 }
 
 func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
@@ -520,6 +526,7 @@
 	vif.muMsgCounters.Unlock()
 
 	switch m := msg.(type) {
+
 	case *message.Data:
 		_, rq, _ := vif.vcMap.Find(m.VCI)
 		if rq == nil {
@@ -531,6 +538,7 @@
 			vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err)
 			m.Release()
 		}
+
 	case *message.OpenVC:
 		vif.muListen.Lock()
 		closed := vif.acceptor == nil || vif.acceptor.IsClosed()
@@ -569,6 +577,7 @@
 			return nil
 		}
 		go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(vers, vif.principal, vif.blessings, nil, lopts...))
+
 	case *message.SetupVC:
 		// First, find the public key we need out of the message.
 		var theirPK *crypto.BoxKey
@@ -669,8 +678,10 @@
 			return nil
 		}
 		vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
+
 	case *message.AddReceiveBuffers:
 		vif.distributeCounters(m.Counters)
+
 	case *message.OpenFlow:
 		if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
 			if err := vc.AcceptFlow(m.Flow); err != nil {
@@ -684,24 +695,10 @@
 			return nil
 		}
 		vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
-	case *message.Setup:
-		// Configure the VIF.  This takes over the conn during negotiation.
-		if vif.isSetup {
-			return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
-		}
-		vif.muListen.Lock()
-		dischargeClient := getDischargeClient(vif.listenerOpts)
-		vif.muListen.Unlock()
 
-		vif.writeMu.Lock()
-		c, err := AuthenticateAsServer(vif.conn, vif.reader, vif.versions, vif.principal, vif.blessings, dischargeClient, m)
-		if err != nil {
-			vif.writeMu.Unlock()
-			return err
-		}
-		vif.ctrlCipher = c
-		vif.writeMu.Unlock()
-		vif.isSetup = true
+	case *message.Setup:
+		vlog.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif)
+
 	default:
 		vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif)
 	}
@@ -1093,7 +1090,7 @@
 	l := make([]string, 0, len(vcs)+1)
 
 	vif.muNextVCI.Lock() // Needed for vif.nextVCI
-	l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.isSetup, vif.isClosed))
+	l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.ctrlCipher != nullCipher, vif.isClosed))
 	vif.muNextVCI.Unlock()
 
 	for _, vc := range vcs {
diff --git a/profiles/internal/rpc/stream/vif/vif_test.go b/profiles/internal/rpc/stream/vif/vif_test.go
index e630edc..32ccc74 100644
--- a/profiles/internal/rpc/stream/vif/vif_test.go
+++ b/profiles/internal/rpc/stream/vif/vif_test.go
@@ -650,13 +650,9 @@
 		{&iversion.Range{2, 3}, &iversion.Range{3, 5}, &iversion.Range{3, 5}, false, false},
 		{&iversion.Range{2, 3}, &iversion.Range{3, 5}, unknown, false, false},
 
-		// No VIF error because the client does not initiate authentication.
-		{&iversion.Range{2, 3}, &iversion.Range{4, 5}, &iversion.Range{4, 5}, true, false},
-		{&iversion.Range{2, 3}, &iversion.Range{4, 5}, unknown, true, false},
-
-		// VIF error because the client asks for authentication, but the server
-		// doesn't understand it.
-		{&iversion.Range{6, 6}, &iversion.Range{2, 5}, unknown, true, true},
+		// VIF error since there are no versions in common.
+		{&iversion.Range{2, 3}, &iversion.Range{4, 5}, &iversion.Range{4, 5}, true, true},
+		{&iversion.Range{2, 3}, &iversion.Range{4, 5}, unknown, true, true},
 	}
 
 	for _, tc := range tests {
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 63be60b..157ed01 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -26,7 +26,7 @@
 	// change that's not both forward and backward compatible.
 	// Min should be incremented whenever we want to remove
 	// support for old protocol versions.
-	SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion9}
+	SupportedRange = &Range{Min: version.RPCVersion6, Max: version.RPCVersion9}
 
 	// Export the methods on supportedRange.
 	Endpoint           = SupportedRange.Endpoint