ref: Setup messages should always be exchanged by VIFs and should be async.
This change also removes support for RPCVersion5.
MultiPart: 2/2
Change-Id: Ie8d3cb30170ca8fac58594197ad0f28108957e3c
diff --git a/profiles/internal/rpc/client.go b/profiles/internal/rpc/client.go
index ab268dc..f963e1d 100644
--- a/profiles/internal/rpc/client.go
+++ b/profiles/internal/rpc/client.go
@@ -650,7 +650,7 @@
topLevelError = verror.ErrNotTrusted
topLevelAction = verror.NoRetry
onlyErrNetwork = false
- case stream.ErrNetwork.ID:
+ case stream.ErrAborted.ID, stream.ErrNetwork.ID:
// do nothing
default:
onlyErrNetwork = false
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 09d3586..7836963 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -90,7 +90,6 @@
conn net.Conn
pool *iobuf.Pool
reader *iobuf.Reader
- isSetup bool
ctrlCipher crypto.ControlCipher
queue *upcqueue.T
mu sync.RWMutex
@@ -261,12 +260,25 @@
func (p *Proxy) acceptProcess(conn net.Conn) {
pool := iobuf.NewPool(0)
+ reader := iobuf.NewReader(pool, conn)
+
+ var blessings security.Blessings
+ if p.principal != nil {
+ blessings = p.principal.BlessingStore().Default()
+ }
+
+ c, err := vif.AuthenticateAsServer(conn, reader, nil, p.principal, blessings, nil)
+ if err != nil {
+ processLog().Infof("Process %v failed to authenticate: %s", p, err)
+ return
+ }
+
process := &process{
proxy: p,
conn: conn,
pool: pool,
- reader: iobuf.NewReader(pool, conn),
- ctrlCipher: &crypto.NullControlCipher{},
+ reader: reader,
+ ctrlCipher: c,
queue: upcqueue.New(),
routingTable: make(map[id.VC]*destination),
servers: make(map[id.VC]*vc.VC),
@@ -661,26 +673,8 @@
dstprocess.queue.Put(m)
p.proxy.routeCounters(p, counters)
- case *message.Setup:
- // Set up the hop. This takes over the process during negotiation.
- if p.isSetup {
- // Already performed authentication. We don't do it again.
- processLog().Infof("Process %v is already setup", p)
- return
- }
- var blessings security.Blessings
- if p.proxy.principal != nil {
- blessings = p.proxy.principal.BlessingStore().Default()
- }
- c, err := vif.AuthenticateAsServer(p.conn, p.reader, nil, p.proxy.principal, blessings, nil, m)
- if err != nil {
- processLog().Infof("Process %v failed to authenticate: %s", p, err)
- return
- }
- p.ctrlCipher = c
- p.isSetup = true
default:
- processLog().Infof("Closing %v because of unrecognized message %T", p, m)
+ processLog().Infof("Closing %v because of invalid message %T", p, m)
return
}
}
diff --git a/profiles/internal/rpc/stream/vc/auth.go b/profiles/internal/rpc/stream/vc/auth.go
index b74a351..e19d10f 100644
--- a/profiles/internal/rpc/stream/vc/auth.go
+++ b/profiles/internal/rpc/stream/vc/auth.go
@@ -110,10 +110,8 @@
if err := enc.Encode(b); err != nil {
return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
- if v >= version.RPCVersion5 {
- if err := enc.Encode(discharges); err != nil {
- return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
- }
+ if err := enc.Encode(discharges); err != nil {
+ return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
}
msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
if err != nil {
@@ -161,10 +159,8 @@
return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
}
var discharges []security.Discharge
- if v >= version.RPCVersion5 {
- if err := dec.Decode(&discharges); err != nil {
- return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
- }
+ if err := dec.Decode(&discharges); err != nil {
+ return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
}
if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
return noBlessings, nil, verror.New(stream.ErrSecurity, nil, verror.New(errInvalidSignatureInMessage, nil))
diff --git a/profiles/internal/rpc/stream/vif/auth.go b/profiles/internal/rpc/stream/vif/auth.go
index 4f7243a..65bec0b 100644
--- a/profiles/internal/rpc/stream/vif/auth.go
+++ b/profiles/internal/rpc/stream/vif/auth.go
@@ -44,20 +44,19 @@
//
// The sequence is initiated by the client.
//
-// - If the versions include RPCVersion6 or greater, the client sends a
-// Setup message to the server, containing the client's supported
-// versions, and the client's crypto options. The Setup message
+// - The client sends a Setup message to the server, containing the client's
+// supported versions, and the client's crypto options. The Setup message
// is sent in the clear.
//
// - When the server receives the Setup message, it calls
// AuthenticateAsServer, which constructs a response Setup containing
// the server's version range, and any crypto options.
//
-// - For RPCVersion6 and RPCVersion7, the client and server generate fresh
-// public/private key pairs, sending the public key to the peer as a crypto
-// option. The remainder of the communication is encrypted as
-// SetupStream messages using NewControlCipherRPC6, which is based on
-// code.google.com/p/go.crypto/nacl/box.
+// - The client and server use the public/private key pairs
+// generated for the Setup messages to create an encrypted stream
+// of SetupStream messages for the remainder of the authentication
+// setup. The encyrption uses NewControlCipherRPC6, which is based
+// on code.google.com/p/go.crypto/nacl/box.
//
// - Once the encrypted SetupStream channel is setup, the client and
// server authenticate using the vc.AuthenticateAs{Client,Server} protocol.
@@ -66,36 +65,23 @@
// modification by a man-in-the-middle, which can currently force a downgrade by
// modifying the acceptable version ranges downward. This can be addressed by
// including a hash of the Setup message in the encrypted stream. It is
-// likely that this will be addressed in subsequent protocol versions (or it may
-// not be addressed at all if RPCVersion6 becomes the only supported version).
+// likely that this will be addressed in subsequent protocol versions.
func AuthenticateAsClient(writer io.Writer, reader *iobuf.Reader, versions *version.Range, params security.CallParams, auth *vc.ServerAuthorizer) (crypto.ControlCipher, error) {
if versions == nil {
versions = version.SupportedRange
}
- if params.LocalPrincipal == nil {
- // If there is no principal, we do not support encryption/authentication.
- // TODO(ashankar, mattr): We should still exchange version information even
- // if we don't do encryption.
- var err error
- versions, err = versions.Intersect(&version.Range{Min: 0, Max: rpcversion.RPCVersion5})
- if err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
- }
- }
- if versions.Max < rpcversion.RPCVersion6 {
- return nullCipher, nil
- }
- // The client has not yet sent its public data. Construct it and send it.
- pvt, pub, err := makeSetup(versions)
+ // Send the client's public data.
+ pvt, pub, err := makeSetup(versions, params.LocalPrincipal != nil)
if err != nil {
return nil, verror.New(stream.ErrSecurity, nil, err)
}
- if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
- return nil, verror.New(stream.ErrNetwork, nil, err)
- }
- // Read the server's public data.
+ errch := make(chan error, 1)
+ go func() {
+ errch <- message.WriteTo(writer, pub, nullCipher)
+ }()
+
pmsg, err := message.ReadFrom(reader, nullCipher)
if err != nil {
return nil, verror.New(stream.ErrNetwork, nil, err)
@@ -105,25 +91,28 @@
return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
}
+ // Wait for the write to succeed.
+ if err := <-errch; err != nil {
+ return nil, verror.New(stream.ErrNetwork, nil, err)
+ }
+
// Choose the max version in the intersection.
vrange, err := pub.Versions.Intersect(&ppub.Versions)
if err != nil {
return nil, verror.New(stream.ErrNetwork, nil, err)
}
v := vrange.Max
- if v < rpcversion.RPCVersion6 {
+
+ if params.LocalPrincipal == nil {
return nullCipher, nil
}
// Perform the authentication.
- return authenticateAsClient(writer, reader, params, auth, &pvt, &pub, ppub, v)
+ return authenticateAsClient(writer, reader, params, auth, pvt, pub, ppub, v)
}
func authenticateAsClient(writer io.Writer, reader *iobuf.Reader, params security.CallParams, auth *vc.ServerAuthorizer,
pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
- if version < rpcversion.RPCVersion6 {
- return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
- }
pbox := ppub.NaclBox()
if pbox == nil {
return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
@@ -143,51 +132,69 @@
//
// See AuthenticateAsClient for a description of the negotiation.
func AuthenticateAsServer(writer io.Writer, reader *iobuf.Reader, versions *version.Range, principal security.Principal, lBlessings security.Blessings,
- dc vc.DischargeClient, ppub *message.Setup) (crypto.ControlCipher, error) {
+ dc vc.DischargeClient) (crypto.ControlCipher, error) {
var err error
if versions == nil {
versions = version.SupportedRange
}
- if principal == nil {
- // If we're not encrypting the connection we can just send them
- // our version information.
- pub := &message.Setup{
- Versions: *versions,
- }
- if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
- return nil, err
- }
- _, err := versions.Intersect(&ppub.Versions)
- return nullCipher, err
+ // Send server's public data.
+ pvt, pub, err := makeSetup(versions, principal != nil)
+ if err != nil {
+ return nil, err
}
- // Create our public data and send it to the client.
- pvt, pub, err := makeSetup(versions)
+ errch := make(chan error, 1)
+ readch := make(chan struct{})
+ go func() {
+ // TODO(mattr,ribrdb): In the case of the agent, which is
+ // currently the only user of insecure connections, we need to
+ // wait for the client to initiate the communication. The agent
+ // sends an extra first byte to clients, which clients read before
+ // dialing their side of the vif. If we send this message before
+ // the magic byte has been sent the client will use the first
+ // byte of this message instead rendering the remainder of the
+ // stream uninterpretable.
+ if principal == nil {
+ <-readch
+ }
+ err := message.WriteTo(writer, pub, nullCipher)
+ errch <- err
+ }()
+
+ // Read client's public data.
+ pmsg, err := message.ReadFrom(reader, nullCipher)
+ close(readch) // Note: we need to close this whether we get an error or not.
if err != nil {
+ return nil, verror.New(stream.ErrNetwork, nil, err)
+ }
+ ppub, ok := pmsg.(*message.Setup)
+ if !ok {
+ return nil, verror.New(stream.ErrSecurity, nil, verror.New(errVersionNegotiationFailed, nil))
+ }
+
+ // Wait for the write to succeed.
+ if err := <-errch; err != nil {
return nil, err
}
- if err := message.WriteTo(writer, &pub, nullCipher); err != nil {
- return nil, err
- }
+
+ // Choose the max version in the intersection.
vrange, err := versions.Intersect(&ppub.Versions)
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, nil, err)
}
v := vrange.Max
- if v < rpcversion.RPCVersion6 {
+
+ if principal == nil {
return nullCipher, nil
}
// Perform authentication.
- return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, &pvt, &pub, ppub, v)
+ return authenticateAsServerRPC6(writer, reader, principal, lBlessings, dc, pvt, pub, ppub, v)
}
func authenticateAsServerRPC6(writer io.Writer, reader *iobuf.Reader, principal security.Principal, lBlessings security.Blessings, dc vc.DischargeClient,
pvt *privateData, pub, ppub *message.Setup, version rpcversion.RPCVersion) (crypto.ControlCipher, error) {
- if version < rpcversion.RPCVersion6 {
- return nil, verror.New(errUnsupportedEncryptVersion, nil, version, rpcversion.RPCVersion6)
- }
box := ppub.NaclBox()
if box == nil {
return nil, verror.New(errNaclBoxVersionNegotiationFailed, nil)
@@ -215,14 +222,24 @@
}
// makeSetup constructs the options that this process can support.
-func makeSetup(versions *version.Range) (pvt privateData, pub message.Setup, err error) {
- pub.Versions = *versions
- var pubKey, pvtKey *[32]byte
- pubKey, pvtKey, err = box.GenerateKey(rand.Reader)
- if err != nil {
- return
+func makeSetup(versions *version.Range, secure bool) (*privateData, *message.Setup, error) {
+ var options []message.SetupOption
+ var pvt *privateData
+ if secure {
+ pubKey, pvtKey, err := box.GenerateKey(rand.Reader)
+ if err != nil {
+ return nil, nil, err
+ }
+ options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
+ pvt = &privateData{
+ naclBoxPrivateKey: *pvtKey,
+ }
}
- pub.Options = append(pub.Options, &message.NaclBox{PublicKey: *pubKey})
- pvt.naclBoxPrivateKey = *pvtKey
- return
+
+ pub := &message.Setup{
+ Versions: *versions,
+ Options: options,
+ }
+
+ return pvt, pub, nil
}
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index eb4776c..e3175e2 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -80,8 +80,6 @@
reader *iobuf.Reader
localEP naming.Endpoint
- // control channel encryption.
- isSetup bool
// ctrlCipher is normally guarded by writeMu, however see the exception in
// readLoop.
ctrlCipher crypto.ControlCipher
@@ -183,7 +181,7 @@
// server and not the remote endpoint of the VIF.
c, err := AuthenticateAsClient(conn, reader, versions, params, nil)
if err != nil {
- return nil, err
+ return nil, verror.New(stream.ErrNetwork, ctx, err)
}
var blessings security.Blessings
@@ -214,6 +212,14 @@
func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
pool := iobuf.NewPool(0)
reader := iobuf.NewReader(pool, conn)
+
+ dischargeClient := getDischargeClient(lopts)
+
+ c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
+ if err != nil {
+ return nil, err
+ }
+
var startTimeout time.Duration
for _, o := range lopts {
switch v := o.(type) {
@@ -221,7 +227,7 @@
startTimeout = v.Duration
}
}
- return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, &crypto.NullControlCipher{})
+ return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
}
func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
@@ -520,6 +526,7 @@
vif.muMsgCounters.Unlock()
switch m := msg.(type) {
+
case *message.Data:
_, rq, _ := vif.vcMap.Find(m.VCI)
if rq == nil {
@@ -531,6 +538,7 @@
vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err)
m.Release()
}
+
case *message.OpenVC:
vif.muListen.Lock()
closed := vif.acceptor == nil || vif.acceptor.IsClosed()
@@ -569,6 +577,7 @@
return nil
}
go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(vers, vif.principal, vif.blessings, nil, lopts...))
+
case *message.SetupVC:
// First, find the public key we need out of the message.
var theirPK *crypto.BoxKey
@@ -669,8 +678,10 @@
return nil
}
vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
+
case *message.AddReceiveBuffers:
vif.distributeCounters(m.Counters)
+
case *message.OpenFlow:
if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
if err := vc.AcceptFlow(m.Flow); err != nil {
@@ -684,24 +695,10 @@
return nil
}
vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
- case *message.Setup:
- // Configure the VIF. This takes over the conn during negotiation.
- if vif.isSetup {
- return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
- }
- vif.muListen.Lock()
- dischargeClient := getDischargeClient(vif.listenerOpts)
- vif.muListen.Unlock()
- vif.writeMu.Lock()
- c, err := AuthenticateAsServer(vif.conn, vif.reader, vif.versions, vif.principal, vif.blessings, dischargeClient, m)
- if err != nil {
- vif.writeMu.Unlock()
- return err
- }
- vif.ctrlCipher = c
- vif.writeMu.Unlock()
- vif.isSetup = true
+ case *message.Setup:
+ vlog.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif)
+
default:
vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif)
}
@@ -1093,7 +1090,7 @@
l := make([]string, 0, len(vcs)+1)
vif.muNextVCI.Lock() // Needed for vif.nextVCI
- l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.isSetup, vif.isClosed))
+ l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.ctrlCipher != nullCipher, vif.isClosed))
vif.muNextVCI.Unlock()
for _, vc := range vcs {
diff --git a/profiles/internal/rpc/stream/vif/vif_test.go b/profiles/internal/rpc/stream/vif/vif_test.go
index e630edc..32ccc74 100644
--- a/profiles/internal/rpc/stream/vif/vif_test.go
+++ b/profiles/internal/rpc/stream/vif/vif_test.go
@@ -650,13 +650,9 @@
{&iversion.Range{2, 3}, &iversion.Range{3, 5}, &iversion.Range{3, 5}, false, false},
{&iversion.Range{2, 3}, &iversion.Range{3, 5}, unknown, false, false},
- // No VIF error because the client does not initiate authentication.
- {&iversion.Range{2, 3}, &iversion.Range{4, 5}, &iversion.Range{4, 5}, true, false},
- {&iversion.Range{2, 3}, &iversion.Range{4, 5}, unknown, true, false},
-
- // VIF error because the client asks for authentication, but the server
- // doesn't understand it.
- {&iversion.Range{6, 6}, &iversion.Range{2, 5}, unknown, true, true},
+ // VIF error since there are no versions in common.
+ {&iversion.Range{2, 3}, &iversion.Range{4, 5}, &iversion.Range{4, 5}, true, true},
+ {&iversion.Range{2, 3}, &iversion.Range{4, 5}, unknown, true, true},
}
for _, tc := range tests {
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 63be60b..157ed01 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -26,7 +26,7 @@
// change that's not both forward and backward compatible.
// Min should be incremented whenever we want to remove
// support for old protocol versions.
- SupportedRange = &Range{Min: version.RPCVersion5, Max: version.RPCVersion9}
+ SupportedRange = &Range{Min: version.RPCVersion6, Max: version.RPCVersion9}
// Export the methods on supportedRange.
Endpoint = SupportedRange.Endpoint