ref: Move to NaclBox for channel encryption and move versions out of endpoints.
This change does several things:
1) Create a new endpoint version (version 5) this will eventually
replace all previous endpoint version and the only valid format.
The difference from v4 is the removal of versioning information.
2) Switch to NaclBox for channel encryption.
3) Change the low level protocol to use SetupVC instead of OpenVC for
new messages. This does version and encryption negotiation all
at once as the first thing on a new VC..
There are several reasons for this change:
1) We save round-trips vs TLS.
2) Versioning is reliable since it's negotiated. This is compared
to now where the root mounttable must be updated before new
clients are deployed. This has proven an unrealistic requirement.
Old benchmarks:
Benchmark_dial_VC_TLS 100 36758152 ns/op
--- Histogram (unit: ms)
Count: 100 Min: 22 Max: 49 Avg: 36.21
------------------------------------------------------------
[ 22, 23) 1 1.0% 1.0%
[ 23, 24) 0 0.0% 1.0%
[ 24, 25) 0 0.0% 1.0%
[ 25, 26) 0 0.0% 1.0%
[ 26, 28) 0 0.0% 1.0%
[ 28, 31) 5 5.0% 6.0% #
[ 31, 34) 38 38.0% 44.0% ####
[ 34, 38) 14 14.0% 58.0% #
[ 38, 43) 28 28.0% 86.0% ###
[ 43, 50) 14 14.0% 100.0% #
[ 50, 59) 0 0.0% 100.0%
[ 59, 70) 0 0.0% 100.0%
[ 70, 83) 0 0.0% 100.0%
[ 83, 100) 0 0.0% 100.0%
[100, 121) 0 0.0% 100.0%
[121, 148) 0 0.0% 100.0%
[148, inf) 0 0.0% 100.0%
Benchmark_throughput_Flow_1VIF_1VC_1Flow 50000 36354 ns/op 1408.36 MB/s
Benchmark_throughput_Flow_1VIF_1VC_2Flow 50000 36849 ns/op 1389.43 MB/s
Benchmark_throughput_Flow_1VIF_1VC_8Flow 30000 44898 ns/op 1140.35 MB/s
Benchmark_throughput_Flow_1VIF_2VC_2Flow 50000 37180 ns/op 1377.06 MB/s
Benchmark_throughput_Flow_1VIF_2VC_8Flow 30000 44599 ns/op 1147.99 MB/s
Benchmark_throughput_Flow_2VIF_4VC_8Flow 30000 44945 ns/op 1139.15 MB/s
Benchmark_throughput_Flow_1VIF_1VC_1FlowTLS 3000 543051 ns/op 94.28 MB/s
Benchmark_throughput_Flow_1VIF_1VC_2FlowTLS 3000 534945 ns/op 95.71 MB/s
Benchmark_throughput_Flow_1VIF_1VC_8FlowTLS 3000 552158 ns/op 92.73 MB/s
Benchmark_throughput_Flow_1VIF_2VC_2FlowTLS 3000 760363 ns/op 67.34 MB/s
Benchmark_throughput_Flow_1VIF_2VC_8FlowTLS 3000 462552 ns/op 110.69 MB/s
Benchmark_throughput_Flow_2VIF_4VC_8FlowTLS 3000 463113 ns/op 110.56 MB/s
ok v.io/x/ref/profiles/internal/rpc/stream/benchmark 37.809s
New benchmarks:
--- Histogram (unit: ms)
Count: 100 Min: 14 Max: 39 Avg: 30.63
------------------------------------------------------------
[ 14, 15) 1 1.0% 1.0%
[ 15, 16) 0 0.0% 1.0%
[ 16, 17) 0 0.0% 1.0%
[ 17, 18) 0 0.0% 1.0%
[ 18, 20) 0 0.0% 1.0%
[ 20, 22) 0 0.0% 1.0%
[ 22, 25) 0 0.0% 1.0%
[ 25, 29) 0 0.0% 1.0%
[ 29, 34) 97 97.0% 98.0% ##########
[ 34, 40) 2 2.0% 100.0%
[ 40, 48) 0 0.0% 100.0%
[ 48, 58) 0 0.0% 100.0%
[ 58, 71) 0 0.0% 100.0%
[ 71, 87) 0 0.0% 100.0%
[ 87, 107) 0 0.0% 100.0%
[107, 131) 0 0.0% 100.0%
[131, inf) 0 0.0% 100.0%
Benchmark_throughput_Flow_1VIF_1VC_1Flow 50000 32806 ns/op 1560.65 MB/s
Benchmark_throughput_Flow_1VIF_1VC_2Flow 50000 31752 ns/op 1612.47 MB/s
Benchmark_throughput_Flow_1VIF_1VC_8Flow 50000 39765 ns/op 1287.54 MB/s
Benchmark_throughput_Flow_1VIF_2VC_2Flow 50000 31967 ns/op 1601.62 MB/s
Benchmark_throughput_Flow_1VIF_2VC_8Flow 50000 39513 ns/op 1295.74 MB/s
Benchmark_throughput_Flow_2VIF_4VC_8Flow 30000 40676 ns/op 1258.72 MB/s
Benchmark_throughput_Flow_1VIF_1VC_1FlowTLS 10000 237259 ns/op 215.80 MB/s
Benchmark_throughput_Flow_1VIF_1VC_2FlowTLS 10000 233769 ns/op 219.02 MB/s
Benchmark_throughput_Flow_1VIF_1VC_8FlowTLS 10000 244584 ns/op 209.33 MB/s
Benchmark_throughput_Flow_1VIF_2VC_2FlowTLS 10000 235281 ns/op 217.61 MB/s
Benchmark_throughput_Flow_1VIF_2VC_8FlowTLS 10000 238344 ns/op 214.81 MB/s
Benchmark_throughput_Flow_2VIF_4VC_8FlowTLS 10000 239573 ns/op 213.71 MB/s
ok v.io/x/ref/profiles/internal/rpc/stream/benchmark 39.893s
MultiPart: 2/2
Change-Id: Ia2397c445116d12d0b037ad65f686ddd9846f33b
diff --git a/profiles/internal/rpc/stream/vif/vif.go b/profiles/internal/rpc/stream/vif/vif.go
index dcd37b4..e59a153 100644
--- a/profiles/internal/rpc/stream/vif/vif.go
+++ b/profiles/internal/rpc/stream/vif/vif.go
@@ -20,6 +20,7 @@
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vtrace"
@@ -36,7 +37,7 @@
"v.io/x/ref/profiles/internal/rpc/stream/id"
"v.io/x/ref/profiles/internal/rpc/stream/message"
"v.io/x/ref/profiles/internal/rpc/stream/vc"
- "v.io/x/ref/profiles/internal/rpc/version"
+ iversion "v.io/x/ref/profiles/internal/rpc/version"
)
const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif"
@@ -115,7 +116,7 @@
// non-nil only in testing. nil is equivalent to using the versions
// actually supported by this RPC implementation (which is always
// what you want outside of tests).
- versions *version.Range
+ versions *iversion.Range
isClosedMu sync.Mutex
isClosed bool // GUARDED_BY(isClosedMu)
@@ -164,7 +165,7 @@
// As the name suggests, this method is intended for use only within packages
// placed inside v.io/x/ref/profiles/internal. Code outside the
// v.io/x/ref/profiles/internal/* packages should never call this method.
-func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, versions *version.Range, onClose func(*VIF), opts ...stream.VCOpt) (*VIF, error) {
+func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, versions *iversion.Range, onClose func(*VIF), opts ...stream.VCOpt) (*VIF, error) {
ctx := getDialContext(opts)
if ctx != nil {
var span vtrace.Span
@@ -210,7 +211,7 @@
// As the name suggests, this method is intended for use only within packages
// placed inside v.io/x/ref/profiles/internal. Code outside the
// v.io/x/ref/profiles/internal/* packages should never call this method.
-func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *version.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
+func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
pool := iobuf.NewPool(0)
reader := iobuf.NewReader(pool, conn)
var startTimeout time.Duration
@@ -223,7 +224,7 @@
return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, &crypto.NullControlCipher{})
}
-func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *version.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
+func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
var (
// Choose IDs that will not conflict with any other (VC, Flow)
// pairs. VCI 0 is never used by the application (it is
@@ -253,6 +254,10 @@
}
stopQ.Release(-1) // Disable flow control
+ if versions == nil {
+ versions = iversion.SupportedRange
+ }
+
vif := &VIF{
conn: conn,
pool: pool,
@@ -304,27 +309,52 @@
}
counters := message.NewCounters()
counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow)
- // TODO(ashankar,mattr): If remoteEP/localEP version ranges allow, then
- // use message.SetupVC instead of message.OpenVC.
- // Rough outline:
- // (1) Switch to NaclBox for VC encryption (thus the VC handshake will
- // no longer require the TLS flow and roundtrips for that).
- // (2) Send an appropriate SetupVC message in response to a received
- // SetupVC message.
- // (3) Use the SetupVC received from the remote end to establish the
- // exact protocol version to use.
- err = vif.sendOnExpressQ(&message.OpenVC{
- VCI: vc.VCI(),
- DstEndpoint: remoteEP,
- SrcEndpoint: vif.localEP,
- Counters: counters})
- if err != nil {
- vif.deleteVC(vc.VCI())
- err = verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
- vc.Close(err)
- return nil, err
+
+ ver, err := vif.versions.CommonVersion(vif.localEP, remoteEP)
+
+ switch {
+ case verror.ErrorID(err) == iversion.ErrDeprecatedVersion.ID || ver >= version.RPCVersion9:
+ sendPublicKey := func(pubKey *crypto.BoxKey) error {
+ var options []message.SetupOption
+ if pubKey != nil {
+ options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
+ }
+ err := vif.sendOnExpressQ(&message.SetupVC{
+ VCI: vc.VCI(),
+ RemoteEndpoint: remoteEP,
+ LocalEndpoint: vif.localEP,
+ Counters: counters,
+ Setup: message.Setup{
+ Versions: *vif.versions,
+ Options: options,
+ },
+ })
+ if err != nil {
+ err = verror.New(stream.ErrNetwork, nil,
+ verror.New(errSendOnExpressQFailed, nil, err))
+ }
+ return err
+ }
+ if err = vc.HandshakeDialedVC(principal, ver, sendPublicKey, opts...); err != nil {
+ break
+ }
+ case err != nil:
+ break
+ default:
+ err = vif.sendOnExpressQ(&message.OpenVC{
+ VCI: vc.VCI(),
+ DstEndpoint: remoteEP,
+ SrcEndpoint: vif.localEP,
+ Counters: counters})
+ if err != nil {
+ err = verror.New(stream.ErrNetwork, nil, verror.New(errSendOnExpressQFailed, nil, err))
+ break
+ }
+ if err = vc.HandshakeDialedVC(principal, ver, nil, opts...); err != nil {
+ break
+ }
}
- if err := vc.HandshakeDialedVC(principal, opts...); err != nil {
+ if err != nil {
vif.deleteVC(vc.VCI())
vc.Close(err)
return nil, err
@@ -530,16 +560,103 @@
})
return nil
}
- go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(vif.principal, vif.blessings, lopts...))
+ vers, err := vif.versions.CommonVersion(m.DstEndpoint, m.SrcEndpoint)
+ if err != nil {
+ vif.sendOnExpressQ(&message.CloseVC{
+ VCI: m.VCI,
+ Error: err.Error(),
+ })
+ return nil
+ }
+ go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(vers, vif.principal, vif.blessings, nil, lopts...))
case *message.SetupVC:
- // TODO(ashankar,mattr): Handle this! See comment about SetupVC
- // in vif.Dial
+ // First, find the public key we need out of the message.
+ var theirPK *crypto.BoxKey
+ box := m.Setup.NaclBox()
+ if box != nil {
+ theirPK = &box.PublicKey
+ }
+
+ // If we dialed this VC, then this is a response and we should finish
+ // the vc handshake. Otherwise, this message is opening a new VC.
+ if vif.dialedVCI(m.VCI) {
+ vif.distributeCounters(m.Counters)
+ if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
+ intersection, err := vif.versions.Intersect(&m.Setup.Versions)
+ if err != nil {
+ vif.closeVCAndSendMsg(vc, false, err)
+ } else if err := vc.FinishHandshakeDialedVC(intersection.Max, theirPK); err != nil {
+ vif.closeVCAndSendMsg(vc, false, err)
+ }
+ return nil
+ }
+ vlog.VI(2).Infof("Ignoring SetupVC message %+v for unknown dialed VC", m)
+ return nil
+ }
+
+ // This is an accepted VC.
+ intersection, err := vif.versions.Intersect(&m.Setup.Versions)
+ if err != nil {
+ vlog.VI(2).Infof("SetupVC message %+v to VIF %s did not present compatible versions: %v", m, vif, err)
+ vif.sendOnExpressQ(&message.CloseVC{
+ VCI: m.VCI,
+ Error: err.Error(),
+ })
+ return nil
+ }
+ vif.muListen.Lock()
+ closed := vif.acceptor == nil || vif.acceptor.IsClosed()
+ lopts := vif.listenerOpts
+ vif.muListen.Unlock()
+ if closed {
+ vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif)
+ vif.sendOnExpressQ(&message.CloseVC{
+ VCI: m.VCI,
+ Error: "VCs not accepted",
+ })
+ return nil
+ }
+ var idleTimeout time.Duration
+ for _, o := range lopts {
+ switch v := o.(type) {
+ case vc.IdleTimeout:
+ idleTimeout = v.Duration
+ }
+ }
+ vc, err := vif.newVC(m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false)
+ if err != nil {
+ vif.sendOnExpressQ(&message.CloseVC{
+ VCI: m.VCI,
+ Error: err.Error(),
+ })
+ return nil
+ }
vif.distributeCounters(m.Counters)
- vif.sendOnExpressQ(&message.CloseVC{
- VCI: m.VCI,
- Error: "SetupVC handling not implemented yet",
- })
- vlog.VI(2).Infof("Received SetupVC message, but handling not yet implemented")
+ keyExchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
+ var options []message.SetupOption
+ if pubKey != nil {
+ options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
+ }
+ err = vif.sendOnExpressQ(&message.SetupVC{
+ VCI: m.VCI,
+ Setup: message.Setup{
+ // Note that servers send clients not their actual supported versions,
+ // but the intersected range of the server and client ranges. This
+ // is important because proxies may have adjusted the version ranges
+ // along the way, and we should negotiate a version that is compatible
+ // with all intermediate hops.
+ Versions: *intersection,
+ Options: options,
+ },
+ RemoteEndpoint: m.LocalEndpoint,
+ LocalEndpoint: vif.localEP,
+ // TODO(mattr): Consider adding counters. See associated comment
+ // in vc.go:VC.HandshakeAcceptedVC for more details.
+ })
+ return theirPK, err
+ }
+ go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(intersection.Max, vif.principal, vif.blessings, keyExchanger, lopts...))
+
case *message.CloseVC:
if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
vif.deleteVC(vc.VCI())
@@ -567,7 +684,7 @@
return nil
}
vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
- case *message.HopSetup:
+ case *message.Setup:
// Configure the VIF. This takes over the conn during negotiation.
if vif.isSetup {
return verror.New(stream.ErrNetwork, nil, verror.New(errVIFAlreadySetup, nil))
@@ -601,7 +718,7 @@
}
m := qm.(*message.Data)
if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
- vlog.VI(2).Infof("Ignoring data message %v for on VIF %v: %v", m, vif, err)
+ vlog.VI(2).Infof("Ignoring data message %v for on VIF %s: %v", m, vif, err)
}
if m.Close() {
vif.shutdownFlow(vc, m.Flow)
@@ -852,6 +969,10 @@
}
}
+func (vif *VIF) dialedVCI(VCI id.VC) bool {
+ return vif.nextVCI%2 == VCI%2
+}
+
func (vif *VIF) allocVCI() id.VC {
vif.muNextVCI.Lock()
ret := vif.nextVCI
@@ -861,13 +982,6 @@
}
func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, dialed bool) (*vc.VC, error) {
- version, err := version.CommonVersion(localEP, remoteEP)
- if vif.versions != nil {
- version, err = vif.versions.CommonVersion(localEP, remoteEP)
- }
- if err != nil {
- return nil, err
- }
vif.muStartTimer.Lock()
if vif.startTimer != nil {
vif.startTimer.Stop()
@@ -889,7 +1003,6 @@
Pool: vif.pool,
ReserveBytes: uint(message.HeaderSizeBytes + macSize),
Helper: vcHelper{vif},
- Version: version,
})
added, rq, wq := vif.vcMap.Insert(vc)
if added {
@@ -1061,9 +1174,9 @@
// The addition of the endpoints is left as an excercise to higher layers of
// the stack, where the desire to share or hide blessings from the endpoint is
// clearer.
-func localEP(conn net.Conn, rid naming.RoutingID, versions *version.Range) naming.Endpoint {
+func localEP(conn net.Conn, rid naming.RoutingID, versions *iversion.Range) naming.Endpoint {
localAddr := conn.LocalAddr()
- ep := version.Endpoint(localAddr.Network(), localAddr.String(), rid)
+ ep := iversion.Endpoint(localAddr.Network(), localAddr.String(), rid)
if versions != nil {
ep = versions.Endpoint(localAddr.Network(), localAddr.String(), rid)
}