Merge "ref: Add ep version 6 that adds supports for Routes."
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index f31bd9a..1abdae7 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -12,8 +12,8 @@
"v.io/v23/security"
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/apilog"
- vsecurity "v.io/x/ref/lib/security"
tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
+ "v.io/x/ref/test/testutil"
)
type contextKey int
@@ -30,11 +30,7 @@
}
func new(ctx *context.T) (*Runtime, *context.T, v23.Shutdown, error) {
- p, err := vsecurity.NewPrincipal()
- if err != nil {
- return nil, nil, func() {}, err
- }
- ctx = context.WithValue(ctx, principalKey, p)
+ ctx = context.WithValue(ctx, principalKey, testutil.NewPrincipal("fake"))
ctx = context.WithLogger(ctx, logger.Global())
return &Runtime{ns: tnaming.NewSimpleNamespace()}, ctx, func() {}, nil
}
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
new file mode 100644
index 0000000..eff876d
--- /dev/null
+++ b/runtime/internal/flow/conn/auth.go
@@ -0,0 +1,277 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "crypto/rand"
+ "reflect"
+ "sync"
+
+ "golang.org/x/crypto/nacl/box"
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/rpc/version"
+ "v.io/v23/security"
+ "v.io/v23/vom"
+)
+
+func (c *Conn) dialHandshake(ctx *context.T, versions version.RPCVersionRange) error {
+ binding, err := c.setup(ctx, versions)
+ if err != nil {
+ return err
+ }
+ c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
+ if err = c.readRemoteAuth(ctx, binding); err != nil {
+ return err
+ }
+ if c.rBlessings.IsZero() {
+ return NewErrAcceptorBlessingsMissing(ctx)
+ }
+ signedBinding, err := v23.GetPrincipal(ctx).Sign(binding)
+ if err != nil {
+ return err
+ }
+ lAuth := &auth{
+ channelBinding: signedBinding,
+ }
+ // We only send our blessings if we are a server in addition to being a client.
+ // If we are a pure client, we only send our public key.
+ if c.handler != nil {
+ bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
+ if err != nil {
+ return err
+ }
+ lAuth.bkey, lAuth.dkey = bkey, dkey
+ } else {
+ lAuth.publicKey = c.lBlessings.PublicKey()
+ }
+ if err = c.mp.writeMsg(ctx, lAuth); err != nil {
+ return err
+ }
+ return err
+}
+
+func (c *Conn) acceptHandshake(ctx *context.T, versions version.RPCVersionRange) error {
+ binding, err := c.setup(ctx, versions)
+ if err != nil {
+ return err
+ }
+ c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), false)
+ signedBinding, err := v23.GetPrincipal(ctx).Sign(binding)
+ if err != nil {
+ return err
+ }
+ bkey, dkey, err := c.blessingsFlow.put(ctx, c.lBlessings, c.lDischarges)
+ if err != nil {
+ return err
+ }
+ err = c.mp.writeMsg(ctx, &auth{
+ bkey: bkey,
+ dkey: dkey,
+ channelBinding: signedBinding,
+ })
+ if err != nil {
+ return err
+ }
+ return c.readRemoteAuth(ctx, binding)
+}
+
+func (c *Conn) setup(ctx *context.T, versions version.RPCVersionRange) ([]byte, error) {
+ pk, sk, err := box.GenerateKey(rand.Reader)
+ if err != nil {
+ return nil, err
+ }
+ lSetup := &setup{
+ versions: versions,
+ peerLocalEndpoint: c.local,
+ peerNaClPublicKey: pk,
+ }
+ if c.remote != nil {
+ lSetup.peerRemoteEndpoint = c.remote
+ }
+ ch := make(chan error)
+ go func() {
+ ch <- c.mp.writeMsg(ctx, lSetup)
+ }()
+ msg, err := c.mp.readMsg(ctx)
+ if err != nil {
+ return nil, NewErrRecv(ctx, "unknown", err)
+ }
+ rSetup, valid := msg.(*setup)
+ if !valid {
+ return nil, NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String())
+ }
+ if err := <-ch; err != nil {
+ return nil, NewErrSend(ctx, "setup", c.remote.String(), err)
+ }
+ if c.version, err = version.CommonVersion(ctx, lSetup.versions, rSetup.versions); err != nil {
+ return nil, err
+ }
+ // TODO(mattr): Decide which endpoints to actually keep, the ones we know locally
+ // or what the remote side thinks.
+ if rSetup.peerRemoteEndpoint != nil {
+ c.local = rSetup.peerRemoteEndpoint
+ }
+ if rSetup.peerLocalEndpoint != nil {
+ c.remote = rSetup.peerLocalEndpoint
+ }
+ if rSetup.peerNaClPublicKey == nil {
+ return nil, NewErrMissingSetupOption(ctx, peerNaClPublicKeyOption)
+ }
+ return c.mp.setupEncryption(ctx, pk, sk, rSetup.peerNaClPublicKey), nil
+}
+
+func (c *Conn) readRemoteAuth(ctx *context.T, binding []byte) error {
+ var rauth *auth
+ for {
+ msg, err := c.mp.readMsg(ctx)
+ if err != nil {
+ return NewErrRecv(ctx, c.remote.String(), err)
+ }
+ if rauth, _ = msg.(*auth); rauth != nil {
+ break
+ }
+ if err = c.handleMessage(ctx, msg); err != nil {
+ return err
+ }
+ }
+ var rPublicKey security.PublicKey
+ if rauth.bkey != 0 {
+ var err error
+ // TODO(mattr): Make sure we cancel out of this at some point.
+ c.rBlessings, c.rDischarges, err = c.blessingsFlow.get(ctx, rauth.bkey, rauth.dkey)
+ if err != nil {
+ return err
+ }
+ rPublicKey = c.rBlessings.PublicKey()
+ } else {
+ rPublicKey = rauth.publicKey
+ }
+ if rPublicKey == nil {
+ return NewErrNoPublicKey(ctx)
+ }
+ if !rauth.channelBinding.Verify(rPublicKey, binding) {
+ return NewErrInvalidChannelBinding(ctx)
+ }
+ return nil
+}
+
+type blessingsFlow struct {
+ enc *vom.Encoder
+ dec *vom.Decoder
+
+ mu sync.Mutex
+ cond *sync.Cond
+ closed bool
+ nextKey uint64
+ byUID map[string]*Blessings
+ byBKey map[uint64]*Blessings
+}
+
+func newBlessingsFlow(ctx *context.T, f flow.Flow, dialed bool) *blessingsFlow {
+ b := &blessingsFlow{
+ enc: vom.NewEncoder(f),
+ dec: vom.NewDecoder(f),
+ nextKey: 1,
+ byUID: make(map[string]*Blessings),
+ byBKey: make(map[uint64]*Blessings),
+ }
+ b.cond = sync.NewCond(&b.mu)
+ if !dialed {
+ b.nextKey++
+ }
+ go b.readLoop(ctx)
+ return b
+}
+
+func (b *blessingsFlow) put(ctx *context.T, blessings security.Blessings, discharges map[string]security.Discharge) (bkey, dkey uint64, err error) {
+ defer b.mu.Unlock()
+ b.mu.Lock()
+ buid := string(blessings.UniqueID())
+ element, has := b.byUID[buid]
+ if has && equalDischarges(discharges, element.Discharges) {
+ return element.BKey, element.DKey, nil
+ }
+ defer b.cond.Broadcast()
+ if has {
+ element.Discharges = dischargeList(discharges)
+ element.DKey = b.nextKey
+ b.nextKey += 2
+ return element.BKey, element.DKey, b.enc.Encode(Blessings{
+ Discharges: element.Discharges,
+ DKey: element.DKey,
+ })
+ }
+ element = &Blessings{
+ Blessings: blessings,
+ Discharges: dischargeList(discharges),
+ BKey: b.nextKey,
+ }
+ b.nextKey += 2
+ if len(discharges) > 0 {
+ element.DKey = b.nextKey
+ b.nextKey += 2
+ }
+ b.byUID[buid] = element
+ b.byBKey[element.BKey] = element
+ return element.BKey, element.DKey, b.enc.Encode(element)
+}
+
+func (b *blessingsFlow) get(ctx *context.T, bkey, dkey uint64) (security.Blessings, map[string]security.Discharge, error) {
+ defer b.mu.Unlock()
+ b.mu.Lock()
+ for !b.closed {
+ element, has := b.byBKey[bkey]
+ if has && element.DKey == dkey {
+ return element.Blessings, dischargeMap(element.Discharges), nil
+ }
+ b.cond.Wait()
+ }
+ return security.Blessings{}, nil, NewErrBlessingsFlowClosed(ctx)
+}
+
+func (b *blessingsFlow) readLoop(ctx *context.T) {
+ for {
+ var received Blessings
+ err := b.dec.Decode(&received)
+ b.mu.Lock()
+ if err != nil {
+ b.closed = true
+ b.mu.Unlock()
+ return
+ }
+ b.byUID[string(received.Blessings.UniqueID())] = &received
+ b.byBKey[received.BKey] = &received
+ b.cond.Broadcast()
+ b.mu.Unlock()
+ }
+}
+
+func dischargeList(in map[string]security.Discharge) []security.Discharge {
+ out := make([]security.Discharge, 0, len(in))
+ for _, d := range in {
+ out = append(out, d)
+ }
+ return out
+}
+func dischargeMap(in []security.Discharge) map[string]security.Discharge {
+ out := make(map[string]security.Discharge, len(in))
+ for _, d := range in {
+ out[d.ID()] = d
+ }
+ return out
+}
+func equalDischarges(m map[string]security.Discharge, s []security.Discharge) bool {
+ if len(m) != len(s) {
+ return false
+ }
+ for _, d := range s {
+ if !d.Equivalent(m[d.ID()]) {
+ return false
+ }
+ }
+ return true
+}
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
new file mode 100644
index 0000000..df8fee6
--- /dev/null
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -0,0 +1,124 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test/testutil"
+)
+
+func checkBlessings(t *testing.T, df, af flow.Flow, db, ab security.Blessings) {
+ msg, err := af.ReadMsg()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if string(msg) != "hello" {
+ t.Fatalf("Got %s, wanted hello", string(msg))
+ }
+ if !af.LocalBlessings().Equivalent(ab) {
+ t.Errorf("got: %#v wanted %#v", af.LocalBlessings(), ab)
+ }
+ if !af.RemoteBlessings().Equivalent(db) {
+ t.Errorf("got: %#v wanted %#v", af.RemoteBlessings(), db)
+ }
+ if !df.LocalBlessings().Equivalent(db) {
+ t.Errorf("got: %#v wanted %#v", df.LocalBlessings(), db)
+ }
+ if !df.RemoteBlessings().Equivalent(ab) {
+ t.Errorf("got: %#v wanted %#v", df.RemoteBlessings(), ab)
+ }
+}
+func dialFlow(t *testing.T, ctx *context.T, dc *Conn, b security.Blessings) flow.Flow {
+ df, err := dc.Dial(ctx, makeBFP(b))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if _, err = df.WriteMsg([]byte("hello")); err != nil {
+ t.Fatal(err)
+ }
+ return df
+}
+
+func TestUnidirectional(t *testing.T) {
+ dctx, shutdown := v23.Init()
+ defer shutdown()
+ actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ aflows := make(chan flow.Flow, 2)
+ dc, ac, _ := setupConns(t, dctx, actx, nil, aflows)
+
+ df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
+ af1 := <-aflows
+ checkBlessings(t, df1, af1,
+ v23.GetPrincipal(dctx).BlessingStore().Default(),
+ v23.GetPrincipal(actx).BlessingStore().Default())
+
+ db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
+ if err != nil {
+ t.Fatal(err)
+ }
+ df2 := dialFlow(t, dctx, dc, db2)
+ af2 := <-aflows
+ checkBlessings(t, df2, af2, db2,
+ v23.GetPrincipal(actx).BlessingStore().Default())
+
+ // We should not be able to dial in the other direction, because that flow
+ // manager is not willing to accept flows.
+ _, err = ac.Dial(actx, testBFP)
+ if verror.ErrorID(err) != ErrDialingNonServer.ID {
+ t.Errorf("got %v, wanted ErrDialingNonServer", err)
+ }
+}
+
+func TestBidirectional(t *testing.T) {
+ dctx, shutdown := v23.Init()
+ defer shutdown()
+ actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ dflows := make(chan flow.Flow, 2)
+ aflows := make(chan flow.Flow, 2)
+ dc, ac, _ := setupConns(t, dctx, actx, dflows, aflows)
+
+ df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
+ af1 := <-aflows
+ checkBlessings(t, df1, af1,
+ v23.GetPrincipal(dctx).BlessingStore().Default(),
+ v23.GetPrincipal(actx).BlessingStore().Default())
+
+ db2, err := v23.GetPrincipal(dctx).BlessSelf("other")
+ if err != nil {
+ t.Fatal(err)
+ }
+ df2 := dialFlow(t, dctx, dc, db2)
+ af2 := <-aflows
+ checkBlessings(t, df2, af2, db2,
+ v23.GetPrincipal(actx).BlessingStore().Default())
+
+ af3 := dialFlow(t, actx, ac, v23.GetPrincipal(actx).BlessingStore().Default())
+ df3 := <-dflows
+ checkBlessings(t, af3, df3,
+ v23.GetPrincipal(actx).BlessingStore().Default(),
+ v23.GetPrincipal(dctx).BlessingStore().Default())
+
+ ab2, err := v23.GetPrincipal(actx).BlessSelf("aother")
+ if err != nil {
+ t.Fatal(err)
+ }
+ af4 := dialFlow(t, actx, ac, ab2)
+ df4 := <-dflows
+ checkBlessings(t, af4, df4, ab2,
+ v23.GetPrincipal(dctx).BlessingStore().Default())
+}
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index 6dc0987..fd73c50 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -55,10 +55,10 @@
d.Close(ctx, fmt.Errorf("Closing randomly."))
<-d.Closed()
<-a.Closed()
- if _, err := d.Dial(ctx); err == nil {
+ if _, err := d.Dial(ctx, testBFP); err == nil {
t.Errorf("Nil error dialing on dialer")
}
- if _, err := a.Dial(ctx); err == nil {
+ if _, err := a.Dial(ctx, testBFP); err == nil {
t.Errorf("Nil error dialing on acceptor")
}
}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 8cd4b6a..321abde 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -15,7 +15,6 @@
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
-
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
@@ -23,9 +22,14 @@
// Each flow on a given conn will have a unique number.
type flowID uint64
+const (
+ invalidFlowID = flowID(iota)
+ blessingsFlowID
+ reservedFlows = 10
+)
+
const mtu = 1 << 16
const defaultBufferSize = 1 << 20
-const reservedFlows = 10
const (
expressPriority = iota
@@ -45,22 +49,24 @@
}
// Conns are a multiplexing encrypted channels that can host Flows.
+// TODO(mattr): track and clean up all spawned goroutines.
type Conn struct {
- fc *flowcontrol.FlowController
- mp *messagePipe
- handler FlowHandler
- versions version.RPCVersionRange
- acceptorBlessings security.Blessings
- dialerPublicKey security.PublicKey
- local, remote naming.Endpoint
- closed chan struct{}
+ fc *flowcontrol.FlowController
+ mp *messagePipe
+ handler FlowHandler
+ version version.RPCVersion
+ lBlessings, rBlessings security.Blessings
+ rDischarges, lDischarges map[string]security.Discharge
+ local, remote naming.Endpoint
+ closed chan struct{}
+ blessingsFlow *blessingsFlow
mu sync.Mutex
nextFid flowID
flows map[flowID]*flw
}
-// Ensure that *Conn implements flow.Conn
+// Ensure that *Conn implements flow.Conn.
var _ flow.Conn = &Conn{}
// NewDialed dials a new Conn on the given conn.
@@ -69,20 +75,21 @@
conn MsgReadWriteCloser,
local, remote naming.Endpoint,
versions version.RPCVersionRange,
- handler FlowHandler,
- fn flow.BlessingsForPeer) (*Conn, error) {
- principal := v23.GetPrincipal(ctx)
+ handler FlowHandler) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
- mp: newMessagePipe(conn),
- handler: handler,
- versions: versions,
- dialerPublicKey: principal.PublicKey(),
- local: local,
- remote: remote,
- closed: make(chan struct{}),
- nextFid: reservedFlows,
- flows: map[flowID]*flw{},
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
+ local: local,
+ remote: remote,
+ closed: make(chan struct{}),
+ nextFid: reservedFlows,
+ flows: map[flowID]*flw{},
+ }
+ if err := c.dialHandshake(ctx, versions); err != nil {
+ c.Close(ctx, err)
+ return nil, err
}
go c.readLoop(ctx)
return c, nil
@@ -93,27 +100,39 @@
ctx *context.T,
conn MsgReadWriteCloser,
local naming.Endpoint,
- lBlessings security.Blessings,
versions version.RPCVersionRange,
handler FlowHandler) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
- mp: newMessagePipe(conn),
- handler: handler,
- versions: versions,
- acceptorBlessings: lBlessings,
- local: local,
- remote: local, // TODO(mattr): Get the real remote endpoint.
- closed: make(chan struct{}),
- nextFid: reservedFlows + 1,
- flows: map[flowID]*flw{},
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
+ local: local,
+ closed: make(chan struct{}),
+ nextFid: reservedFlows + 1,
+ flows: map[flowID]*flw{},
+ }
+ if err := c.acceptHandshake(ctx, versions); err != nil {
+ c.Close(ctx, err)
+ return nil, err
}
go c.readLoop(ctx)
return c, nil
}
// Dial dials a new flow on the Conn.
-func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
+func (c *Conn) Dial(ctx *context.T, fn flow.BlessingsForPeer) (flow.Flow, error) {
+ if c.rBlessings.IsZero() {
+ return nil, NewErrDialingNonServer(ctx)
+ }
+ blessings, err := fn(ctx, c.local, c.remote, c.rBlessings, c.rDischarges)
+ if err != nil {
+ return nil, err
+ }
+ bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, nil)
+ if err != nil {
+ return nil, err
+ }
defer c.mu.Unlock()
c.mu.Lock()
if c.flows == nil {
@@ -121,7 +140,7 @@
}
id := c.nextFid
c.nextFid++
- return c.newFlowLocked(ctx, id), nil
+ return c.newFlowLocked(ctx, id, bkey, dkey, true, false), nil
}
// LocalEndpoint returns the local vanadium Endpoint
@@ -130,16 +149,6 @@
// RemoteEndpoint returns the remote vanadium Endpoint
func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
-// DialerPublicKey returns the public key presented by the dialer during authentication.
-func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
-
-// AcceptorBlessings returns the blessings presented by the acceptor during authentication.
-func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
-
-// AcceptorDischarges returns the discharges presented by the acceptor during authentication.
-// Discharges are organized in a map keyed by the discharge-identifier.
-func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
-
// Closed returns a channel that will be closed after the Conn is shutdown.
// After this channel is closed it is guaranteed that all Dial calls will fail
// with an error and no more flows will be sent to the FlowHandler.
@@ -156,10 +165,7 @@
// We've already torn this conn down.
return
}
- ferr := err
- if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
- ferr = NewErrFlowClosedRemotely(ctx)
- } else {
+ if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
message := ""
if err != nil {
message = err.Error()
@@ -172,7 +178,7 @@
}
}
for _, f := range flows {
- f.close(ctx, ferr)
+ f.close(ctx, NewErrConnectionClosed(ctx))
}
if cerr := c.mp.close(); cerr != nil {
ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
@@ -206,77 +212,83 @@
}
}
-func (c *Conn) readLoop(ctx *context.T) {
- var terr error
- defer c.Close(ctx, terr)
+func (c *Conn) handleMessage(ctx *context.T, x message) error {
+ switch msg := x.(type) {
+ case *tearDown:
+ return NewErrConnClosedRemotely(ctx, msg.Message)
- for {
- x, err := c.mp.readMsg(ctx)
- if err != nil {
- c.Close(ctx, NewErrRecv(ctx, c.remote.String(), err))
- return
+ case *openFlow:
+ if c.handler == nil {
+ return NewErrUnexpectedMsg(ctx, "openFlow")
+ }
+ c.mu.Lock()
+ f := c.newFlowLocked(ctx, msg.id, msg.bkey, msg.dkey, false, false)
+ c.mu.Unlock()
+ c.handler.HandleFlow(f)
+
+ case *release:
+ release := make([]flowcontrol.Release, 0, len(msg.counters))
+ c.mu.Lock()
+ for fid, val := range msg.counters {
+ if f := c.flows[fid]; f != nil {
+ release = append(release, flowcontrol.Release{
+ Worker: f.worker,
+ Tokens: int(val),
+ })
+ }
+ }
+ c.mu.Unlock()
+ if err := c.fc.Release(ctx, release); err != nil {
+ return err
}
- switch msg := x.(type) {
- case *tearDown:
- terr = NewErrConnClosedRemotely(ctx, msg.Message)
- return
+ case *data:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
+ return nil
+ }
+ if err := f.q.put(ctx, msg.payload); err != nil {
+ return err
+ }
+ if msg.flags&closeFlag != 0 {
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+ }
- case *openFlow:
- c.mu.Lock()
- f := c.newFlowLocked(ctx, msg.id)
- c.mu.Unlock()
- c.handler.HandleFlow(f)
+ case *unencryptedData:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Infof("Ignoring data message for unknown flow: %d", msg.id)
+ return nil
+ }
+ if err := f.q.put(ctx, msg.payload); err != nil {
+ return err
+ }
+ if msg.flags&closeFlag != 0 {
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+ }
- case *release:
- release := make([]flowcontrol.Release, 0, len(msg.counters))
- c.mu.Lock()
- for fid, val := range msg.counters {
- if f := c.flows[fid]; f != nil {
- release = append(release, flowcontrol.Release{
- Worker: f.worker,
- Tokens: int(val),
- })
- }
- }
- c.mu.Unlock()
- if terr = c.fc.Release(ctx, release); terr != nil {
- return
- }
+ default:
+ return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String())
+ }
+ return nil
+}
- case *data:
- c.mu.Lock()
- f := c.flows[msg.id]
- c.mu.Unlock()
- if f == nil {
- ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
- continue
- }
- if terr = f.q.put(ctx, msg.payload); terr != nil {
- return
- }
- if msg.flags&closeFlag != 0 {
- f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
- }
-
- case *unencryptedData:
- c.mu.Lock()
- f := c.flows[msg.id]
- c.mu.Unlock()
- if f == nil {
- ctx.Infof("Ignoring data message for unknown flow: %d", msg.id)
- continue
- }
- if terr = f.q.put(ctx, msg.payload); terr != nil {
- return
- }
- if msg.flags&closeFlag != 0 {
- f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
- }
-
- default:
- terr = NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).Name())
- return
+func (c *Conn) readLoop(ctx *context.T) {
+ var err error
+ for {
+ msg, rerr := c.mp.readMsg(ctx)
+ if rerr != nil {
+ err = NewErrRecv(ctx, c.remote.String(), rerr)
+ break
+ }
+ if err = c.handleMessage(ctx, msg); err != nil {
+ break
}
}
+ c.Close(ctx, err)
}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 7fefba6..49bd59e 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -21,7 +21,6 @@
func init() {
test.Init()
-
randData = make([]byte, 2*defaultBufferSize)
if _, err := rand.Read(randData); err != nil {
panic("Could not read random data.")
@@ -68,15 +67,6 @@
<-af.Closed()
}
-func TestDial(t *testing.T) {
- ctx, shutdown := v23.Init()
- defer shutdown()
- for _, dialerDials := range []bool{true, false} {
- df, flows := setupFlow(t, ctx, ctx, dialerDials)
- testWrite(t, ctx, []byte("hello world"), df, flows)
- }
-}
-
func TestLargeWrite(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
diff --git a/runtime/internal/flow/conn/conncache_test.go b/runtime/internal/flow/conn/conncache_test.go
index a3c8b80..2686b0b 100644
--- a/runtime/internal/flow/conn/conncache_test.go
+++ b/runtime/internal/flow/conn/conncache_test.go
@@ -255,10 +255,23 @@
}
func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *Conn {
- d, _, _ := newMRWPair(ctx)
- c, err := NewDialed(ctx, d, ep, ep, version.RPCVersionRange{Min: 1, Max: 5}, nil, nil)
- if err != nil {
- t.Fatalf("Could not create conn: %v", err)
- }
- return c
+ dmrw, amrw, _ := newMRWPair(ctx)
+ dch := make(chan *Conn)
+ ach := make(chan *Conn)
+ go func() {
+ d, err := NewDialed(ctx, dmrw, ep, ep, version.RPCVersionRange{1, 5}, nil)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ dch <- d
+ }()
+ go func() {
+ a, err := NewAccepted(ctx, amrw, ep, version.RPCVersionRange{1, 5}, nil)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ ach <- a
+ }()
+ <-dch
+ return <-ach
}
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 0e0e821..40e5c0f 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -10,13 +10,15 @@
// since all of their errors are intended to be used as arguments to higher level errors.
// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
error (
- InvalidMsg(typ byte, size, field int64) {
- "en": "message of type{:typ} and size{:size} failed decoding at field{:field}."}
- InvalidControlMsg(cmd byte, size, field int64) {
- "en": "control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
+ InvalidMsg(typ byte, size, field uint64) {"en": "message of type{:typ} and size{:size} failed decoding at field{:field}."}
+ InvalidControlMsg(cmd byte, size, field uint64, err error) {"en": "control message of cmd {cmd} and size {size} failed decoding at field {field}{:err}."}
+ InvalidSetupOption(option, field uint64) {
+ "en": "setup option{:option} failed decoding at field{:field}."}
+ MissingSetupOption(option uint64) {
+ "en": "missing required setup option{:option}."}
+ UnknownSetupOption(option uint64) {"en": "unknown setup option{:option}."}
UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
UnknownControlMsg(cmd byte) {"en": "unknown control command{:cmd}."}
-
UnexpectedMsg(typ string) {"en": "unexpected message type{:typ}."}
ConnectionClosed() {"en": "connection closed."}
ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
@@ -26,4 +28,9 @@
Recv(src string, err error) {"en": "error reading from {src}{:err}"}
CacheClosed() {"en":"cache is closed"}
CounterOverflow() {"en": "A remote process has sent more data than allowed."}
+ BlessingsFlowClosed() {"en": "The blessings flow was closed."}
+ InvalidChannelBinding() {"en": "The channel binding was invalid."}
+ NoPublicKey() {"en": "No public key was received by the remote end."}
+ DialingNonServer() {"en": "You are attempting to dial on a connection with no remote server."}
+ AcceptorBlessingsMissing() {"en": "The acceptor did not send blessings."}
)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index 2db4372..ed75b03 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -16,7 +16,10 @@
var (
ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
- ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
+ ErrInvalidSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidSetupOption", verror.NoRetry, "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
+ ErrMissingSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.MissingSetupOption", verror.NoRetry, "{1:}{2:} missing required setup option{:3}.")
+ ErrUnknownSetupOption = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownSetupOption", verror.NoRetry, "{1:}{2:} unknown setup option{:3}.")
ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
ErrUnexpectedMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
@@ -28,11 +31,19 @@
ErrRecv = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
ErrCounterOverflow = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
+ ErrBlessingsFlowClosed = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsFlowClosed", verror.NoRetry, "{1:}{2:} The blessings flow was closed.")
+ ErrInvalidChannelBinding = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidChannelBinding", verror.NoRetry, "{1:}{2:} The channel binding was invalid.")
+ ErrNoPublicKey = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
+ ErrDialingNonServer = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
+ ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidMsg.ID), "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidSetupOption.ID), "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrMissingSetupOption.ID), "{1:}{2:} missing required setup option{:3}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownSetupOption.ID), "{1:}{2:} unknown setup option{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMsg.ID), "{1:}{2:} unexpected message type{:3}.")
@@ -44,16 +55,36 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrRecv.ID), "{1:}{2:} error reading from {3}{:4}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCounterOverflow.ID), "{1:}{2:} A remote process has sent more data than allowed.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlessingsFlowClosed.ID), "{1:}{2:} The blessings flow was closed.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidChannelBinding.ID), "{1:}{2:} The channel binding was invalid.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNoPublicKey.ID), "{1:}{2:} No public key was received by the remote end.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDialingNonServer.ID), "{1:}{2:} You are attempting to dial on a connection with no remote server.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptorBlessingsMissing.ID), "{1:}{2:} The acceptor did not send blessings.")
}
// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
-func NewErrInvalidMsg(ctx *context.T, typ byte, size int64, field int64) error {
+func NewErrInvalidMsg(ctx *context.T, typ byte, size uint64, field uint64) error {
return verror.New(ErrInvalidMsg, ctx, typ, size, field)
}
// NewErrInvalidControlMsg returns an error with the ErrInvalidControlMsg ID.
-func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size int64, field int64) error {
- return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field)
+func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size uint64, field uint64, err error) error {
+ return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field, err)
+}
+
+// NewErrInvalidSetupOption returns an error with the ErrInvalidSetupOption ID.
+func NewErrInvalidSetupOption(ctx *context.T, option uint64, field uint64) error {
+ return verror.New(ErrInvalidSetupOption, ctx, option, field)
+}
+
+// NewErrMissingSetupOption returns an error with the ErrMissingSetupOption ID.
+func NewErrMissingSetupOption(ctx *context.T, option uint64) error {
+ return verror.New(ErrMissingSetupOption, ctx, option)
+}
+
+// NewErrUnknownSetupOption returns an error with the ErrUnknownSetupOption ID.
+func NewErrUnknownSetupOption(ctx *context.T, option uint64) error {
+ return verror.New(ErrUnknownSetupOption, ctx, option)
}
// NewErrUnknownMsg returns an error with the ErrUnknownMsg ID.
@@ -110,3 +141,28 @@
func NewErrCounterOverflow(ctx *context.T) error {
return verror.New(ErrCounterOverflow, ctx)
}
+
+// NewErrBlessingsFlowClosed returns an error with the ErrBlessingsFlowClosed ID.
+func NewErrBlessingsFlowClosed(ctx *context.T) error {
+ return verror.New(ErrBlessingsFlowClosed, ctx)
+}
+
+// NewErrInvalidChannelBinding returns an error with the ErrInvalidChannelBinding ID.
+func NewErrInvalidChannelBinding(ctx *context.T) error {
+ return verror.New(ErrInvalidChannelBinding, ctx)
+}
+
+// NewErrNoPublicKey returns an error with the ErrNoPublicKey ID.
+func NewErrNoPublicKey(ctx *context.T) error {
+ return verror.New(ErrNoPublicKey, ctx)
+}
+
+// NewErrDialingNonServer returns an error with the ErrDialingNonServer ID.
+func NewErrDialingNonServer(ctx *context.T) error {
+ return verror.New(ErrDialingNonServer, ctx)
+}
+
+// NewErrAcceptorBlessingsMissing returns an error with the ErrAcceptorBlessingsMissing ID.
+func NewErrAcceptorBlessingsMissing(ctx *context.T) error {
+ return verror.New(ErrAcceptorBlessingsMissing, ctx)
+}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index f75e10f..a8f832f 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -13,35 +13,36 @@
)
type flw struct {
- id flowID
- ctx *context.T
- cancel context.CancelFunc
- conn *Conn
- worker *flowcontrol.Worker
- opened bool
- q *readq
- dialerBlessings security.Blessings
- dialerDischarges map[string]security.Discharge
+ id flowID
+ dialed bool
+ ctx *context.T
+ cancel context.CancelFunc
+ conn *Conn
+ worker *flowcontrol.Worker
+ opened bool
+ q *readq
+ bkey, dkey uint64
}
+// Ensure that *flw implements flow.Flow.
var _ flow.Flow = &flw{}
-func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
+func (c *Conn) newFlowLocked(ctx *context.T, id flowID, bkey, dkey uint64, dialed, preopen bool) *flw {
f := &flw{
id: id,
+ dialed: dialed,
conn: c,
worker: c.fc.NewWorker(flowPriority),
q: newReadQ(),
+ bkey: bkey,
+ dkey: dkey,
+ opened: preopen,
}
f.SetContext(ctx)
c.flows[id] = f
return f
}
-func (f *flw) dialed() bool {
- return f.id%2 == 0
-}
-
// Implement io.Reader.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
@@ -91,6 +92,8 @@
err := f.conn.mp.writeMsg(f.ctx, &openFlow{
id: f.id,
initialCounters: defaultBufferSize,
+ bkey: f.bkey,
+ dkey: f.dkey,
})
if err != nil {
return 0, false, err
@@ -173,19 +176,27 @@
// LocalBlessings returns the blessings presented by the local end of the flow
// during authentication.
func (f *flw) LocalBlessings() security.Blessings {
- if f.dialed() {
- return f.dialerBlessings
+ if f.dialed {
+ blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return blessings
}
- return f.conn.AcceptorBlessings()
+ return f.conn.lBlessings
}
// RemoteBlessings returns the blessings presented by the remote end of the
// flow during authentication.
func (f *flw) RemoteBlessings() security.Blessings {
- if f.dialed() {
- return f.conn.AcceptorBlessings()
+ if !f.dialed {
+ blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return blessings
}
- return f.dialerBlessings
+ return f.conn.rBlessings
}
// LocalDischarges returns the discharges presented by the local end of the
@@ -193,10 +204,14 @@
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) LocalDischarges() map[string]security.Discharge {
- if f.dialed() {
- return f.dialerDischarges
+ if f.dialed {
+ _, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return discharges
}
- return f.conn.AcceptorDischarges()
+ return f.conn.lDischarges
}
// RemoteDischarges returns the discharges presented by the remote end of the
@@ -204,10 +219,14 @@
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) RemoteDischarges() map[string]security.Discharge {
- if f.dialed() {
- return f.conn.AcceptorDischarges()
+ if !f.dialed {
+ _, discharges, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
+ if err != nil {
+ f.conn.Close(f.ctx, err)
+ }
+ return discharges
}
- return f.dialerDischarges
+ return f.conn.rDischarges
}
// Conn returns the connection the flow is multiplexed on.
@@ -229,7 +248,8 @@
func (f *flw) close(ctx *context.T, err error) {
f.q.close(ctx)
f.cancel()
- if verror.ErrorID(err) != ErrFlowClosedRemotely.ID {
+ if eid := verror.ErrorID(err); eid != ErrFlowClosedRemotely.ID &&
+ eid != ErrConnectionClosed.ID {
// We want to try to send this message even if ctx is already canceled.
ctx, cancel := context.WithRootCancel(ctx)
err := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 6cea426..7202223 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -5,9 +5,12 @@
package conn
import (
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/security"
+ "v.io/x/ref/runtime/internal/rpc/stream/crypto"
)
// TODO(mattr): Link to protocol doc.
@@ -30,6 +33,7 @@
invalidCmd = iota
setupCmd
tearDownCmd
+ authCmd
openFlowCmd
releaseCmd
)
@@ -45,7 +49,6 @@
// data flags.
const (
closeFlag = 1 << iota
- metadataFlag
)
// random consts.
@@ -57,15 +60,48 @@
// and encryption options for connection.
type setup struct {
versions version.RPCVersionRange
- PeerNaClPublicKey *[32]byte
- PeerRemoteEndpoint naming.Endpoint
- PeerLocalEndpoint naming.Endpoint
+ peerNaClPublicKey *[32]byte
+ peerRemoteEndpoint naming.Endpoint
+ peerLocalEndpoint naming.Endpoint
+}
+
+func writeSetupOption(option uint64, payload, buf []byte) []byte {
+ buf = writeVarUint64(option, buf)
+ buf = writeVarUint64(uint64(len(payload)), buf)
+ return append(buf, payload...)
+}
+func readSetupOption(ctx *context.T, orig []byte) (
+ option uint64, payload, data []byte, err error) {
+ var valid bool
+ if option, data, valid = readVarUint64(ctx, orig); !valid {
+ err = NewErrInvalidSetupOption(ctx, invalidOption, 0)
+ return
+ }
+ var size uint64
+ if size, data, valid = readVarUint64(ctx, data); !valid || uint64(len(data)) < size {
+ err = NewErrInvalidSetupOption(ctx, option, 1)
+ return
+ }
+ payload, data = data[:size], data[size:]
+ return
}
func (m *setup) write(ctx *context.T, p *messagePipe) error {
p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
- return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
+ if m.peerNaClPublicKey != nil {
+ p.controlBuf = writeSetupOption(peerNaClPublicKeyOption,
+ m.peerNaClPublicKey[:], p.controlBuf)
+ }
+ if m.peerRemoteEndpoint != nil {
+ p.controlBuf = writeSetupOption(peerRemoteEndpointOption,
+ []byte(m.peerRemoteEndpoint.String()), p.controlBuf)
+ }
+ if m.peerLocalEndpoint != nil {
+ p.controlBuf = writeSetupOption(peerLocalEndpointOption,
+ []byte(m.peerLocalEndpoint.String()), p.controlBuf)
+ }
+ return p.write(ctx, [][]byte{{controlType, setupCmd}, p.controlBuf})
}
func (m *setup) read(ctx *context.T, orig []byte) error {
var (
@@ -74,13 +110,37 @@
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
+ return NewErrInvalidControlMsg(ctx, setupCmd, uint64(len(orig)), 0, nil)
}
m.versions.Min = version.RPCVersion(v)
if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
+ return NewErrInvalidControlMsg(ctx, setupCmd, uint64(len(orig)), 1, nil)
}
m.versions.Max = version.RPCVersion(v)
+ for field := uint64(2); len(data) > 0; field++ {
+ var (
+ payload []byte
+ option uint64
+ err error
+ )
+ if option, payload, data, err = readSetupOption(ctx, data); err != nil {
+ return NewErrInvalidControlMsg(ctx, setupCmd, uint64(len(orig)), field, err)
+ }
+ switch option {
+ case peerNaClPublicKeyOption:
+ m.peerNaClPublicKey = new([32]byte)
+ copy(m.peerNaClPublicKey[:], payload)
+ case peerRemoteEndpointOption:
+ m.peerRemoteEndpoint, err = v23.NewEndpoint(string(payload))
+ case peerLocalEndpointOption:
+ m.peerLocalEndpoint, err = v23.NewEndpoint(string(payload))
+ default:
+ err = NewErrUnknownSetupOption(ctx, option)
+ }
+ if err != nil {
+ return NewErrInvalidControlMsg(ctx, setupCmd, uint64(len(orig)), field, err)
+ }
+ }
return nil
}
@@ -90,7 +150,7 @@
}
func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
- return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Message)})
+ return p.write(ctx, [][]byte{{controlType, tearDownCmd}, []byte(m.Message)})
}
func (m *tearDown) read(ctx *context.T, data []byte) error {
if len(data) > 0 {
@@ -99,16 +159,89 @@
return nil
}
+// auth is used to complete the auth handshake.
+type auth struct {
+ bkey, dkey uint64
+ channelBinding security.Signature
+ publicKey security.PublicKey
+}
+
+func (m *auth) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = writeVarUint64(m.bkey, p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(m.dkey, p.controlBuf)
+ s := m.channelBinding
+ p.controlBuf = writeVarUint64(uint64(len(s.Purpose)), p.controlBuf)
+ p.controlBuf = append(p.controlBuf, s.Purpose...)
+ p.controlBuf = writeVarUint64(uint64(len(s.Hash)), p.controlBuf)
+ p.controlBuf = append(p.controlBuf, []byte(s.Hash)...)
+ p.controlBuf = writeVarUint64(uint64(len(s.R)), p.controlBuf)
+ p.controlBuf = append(p.controlBuf, s.R...)
+ p.controlBuf = writeVarUint64(uint64(len(s.S)), p.controlBuf)
+ p.controlBuf = append(p.controlBuf, s.S...)
+ if m.publicKey != nil {
+ pk, err := m.publicKey.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ p.controlBuf = append(p.controlBuf, pk...)
+ }
+ return p.write(ctx, [][]byte{{controlType, authCmd}, p.controlBuf})
+}
+func (m *auth) read(ctx *context.T, orig []byte) error {
+ var data []byte
+ var valid bool
+ var l uint64
+ if m.bkey, data, valid = readVarUint64(ctx, orig); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 0, nil)
+ }
+ if m.dkey, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 1, nil)
+ }
+ if l, data, valid = readVarUint64(ctx, data); !valid || uint64(len(data)) < l {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 2, nil)
+ }
+ if l > 0 {
+ m.channelBinding.Purpose, data = data[:l], data[l:]
+ }
+ if l, data, valid = readVarUint64(ctx, data); !valid || uint64(len(data)) < l {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 3, nil)
+ }
+ if l > 0 {
+ m.channelBinding.Hash, data = security.Hash(data[:l]), data[l:]
+ }
+ if l, data, valid = readVarUint64(ctx, data); !valid || uint64(len(data)) < l {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 4, nil)
+ }
+ if l > 0 {
+ m.channelBinding.R, data = data[:l], data[l:]
+ }
+ if l, data, valid = readVarUint64(ctx, data); !valid || uint64(len(data)) < l {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 5, nil)
+ }
+ if l > 0 {
+ m.channelBinding.S, data = data[:l], data[l:]
+ }
+ if len(data) > 0 {
+ var err error
+ m.publicKey, err = security.UnmarshalPublicKey(data)
+ return err
+ }
+ return nil
+}
+
// openFlow is sent at the beginning of every new flow.
type openFlow struct {
id flowID
initialCounters uint64
+ bkey, dkey uint64
}
func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
- return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
+ p.controlBuf = writeVarUint64(m.bkey, p.controlBuf)
+ p.controlBuf = writeVarUint64(m.dkey, p.controlBuf)
+ return p.write(ctx, [][]byte{{controlType, openFlowCmd}, p.controlBuf})
}
func (m *openFlow) read(ctx *context.T, orig []byte) error {
var (
@@ -117,11 +250,17 @@
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 0, nil)
}
m.id = flowID(v)
if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 1, nil)
+ }
+ if m.bkey, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 2, nil)
+ }
+ if m.dkey, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, uint64(len(orig)), 3, nil)
}
return nil
}
@@ -138,14 +277,14 @@
p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
p.controlBuf = writeVarUint64(val, p.controlBuf)
}
- return p.write([][]byte{{controlType}}, [][]byte{{releaseCmd}, p.controlBuf})
+ return p.write(ctx, [][]byte{{controlType, releaseCmd}, p.controlBuf})
}
func (m *release) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
fid, val uint64
- n int64
+ n uint64
)
if len(data) == 0 {
return nil
@@ -153,10 +292,10 @@
m.counters = map[flowID]uint64{}
for len(data) > 0 {
if fid, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, releaseCmd, int64(len(orig)), n)
+ return NewErrInvalidControlMsg(ctx, releaseCmd, uint64(len(orig)), n, nil)
}
if val, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidControlMsg(ctx, releaseCmd, int64(len(orig)), n+1)
+ return NewErrInvalidControlMsg(ctx, releaseCmd, uint64(len(orig)), n+1, nil)
}
m.counters[flowID(fid)] = val
n += 2
@@ -172,10 +311,9 @@
}
func (m *data) write(ctx *context.T, p *messagePipe) error {
- p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
- p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
- encrypted := append([][]byte{p.dataBuf}, m.payload...)
- return p.write([][]byte{{dataType}}, encrypted)
+ p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(m.flags, p.controlBuf)
+ return p.write(ctx, append([][]byte{{dataType}, p.controlBuf}, m.payload...))
}
func (m *data) read(ctx *context.T, orig []byte) error {
var (
@@ -184,11 +322,11 @@
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
+ return NewErrInvalidMsg(ctx, dataType, uint64(len(orig)), 0)
}
m.id = flowID(v)
if m.flags, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
+ return NewErrInvalidMsg(ctx, dataType, uint64(len(orig)), 1)
}
if len(data) > 0 {
m.payload = [][]byte{data}
@@ -204,16 +342,13 @@
}
func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
- p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
- p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
- // re-use the controlBuf for the data size.
- size := uint64(0)
- for _, b := range m.payload {
- size += uint64(len(b))
+ p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(m.flags, p.controlBuf)
+ if err := p.write(ctx, [][]byte{{unencryptedDataType}, p.controlBuf}); err != nil {
+ return err
}
- p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
- unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
- return p.write(unencrypted, [][]byte{p.dataBuf})
+ _, err := p.rw.WriteMsg(m.payload...)
+ return err
}
func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
var (
@@ -222,49 +357,67 @@
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
- }
- plen := int(v)
- if plen > len(data) {
- return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
- }
- if plen > 0 {
- m.payload, data = [][]byte{data[:plen]}, data[plen:]
- }
- if v, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
+ return NewErrInvalidMsg(ctx, unencryptedDataType, uint64(len(orig)), 2)
}
m.id = flowID(v)
if m.flags, data, valid = readVarUint64(ctx, data); !valid {
- return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
+ return NewErrInvalidMsg(ctx, unencryptedDataType, uint64(len(orig)), 3)
}
return nil
}
+// TODO(mattr): Consider cleaning up the ControlCipher library to
+// eliminate extraneous functionality and reduce copying.
type messagePipe struct {
rw MsgReadWriteCloser
+ cipher crypto.ControlCipher
controlBuf []byte
- dataBuf []byte
- outBuf [][]byte
+ encBuf []byte
}
func newMessagePipe(rw MsgReadWriteCloser) *messagePipe {
return &messagePipe{
rw: rw,
controlBuf: make([]byte, 256),
- dataBuf: make([]byte, 2*maxVarUint64Size),
- outBuf: make([][]byte, 5),
+ encBuf: make([]byte, mtu),
+ cipher: &crypto.NullControlCipher{},
}
}
+func (p *messagePipe) setupEncryption(ctx *context.T, pk, sk, opk *[32]byte) []byte {
+ p.cipher = crypto.NewControlCipherRPC11(
+ (*crypto.BoxKey)(pk),
+ (*crypto.BoxKey)(sk),
+ (*crypto.BoxKey)(opk))
+ return p.cipher.ChannelBinding()
+}
+
func (p *messagePipe) close() error {
return p.rw.Close()
}
-func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
- p.outBuf = append(p.outBuf[:0], unencrypted...)
- p.outBuf = append(p.outBuf, encrypted...)
- _, err := p.rw.WriteMsg(p.outBuf...)
+func (p *messagePipe) write(ctx *context.T, encrypted [][]byte) error {
+ // TODO(mattr): Because of the API of the underlying crypto library,
+ // an enormous amount of copying happens here.
+ // TODO(mattr): We allocate many buffers here to hold potentially
+ // many copies of the data. The maximum memory usage per Conn is probably
+ // quite high. We should try to reduce it.
+ needed := p.cipher.MACSize()
+ for _, b := range encrypted {
+ needed += len(b)
+ }
+ if cap(p.encBuf) < needed {
+ p.encBuf = make([]byte, needed)
+ }
+ p.encBuf = p.encBuf[:0]
+ for _, b := range encrypted {
+ p.encBuf = append(p.encBuf, b...)
+ }
+ p.encBuf = p.encBuf[:needed]
+ if err := p.cipher.Seal(p.encBuf); err != nil {
+ return err
+ }
+ _, err := p.rw.WriteMsg(p.encBuf)
return err
}
@@ -274,39 +427,56 @@
func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
msg, err := p.rw.ReadMsg()
- if len(msg) == 0 {
- return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
- }
if err != nil {
return nil, err
}
+ minSize := 2 + p.cipher.MACSize()
+ if len(msg) < minSize || !p.cipher.Open(msg) {
+ return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
+ }
+ logmsg := msg
+ if len(msg) > 128 {
+ logmsg = logmsg[:128]
+ }
+ msgType, msg := msg[0], msg[1:len(msg)-p.cipher.MACSize()]
var m message
- switch msg[0] {
+ switch msgType {
case controlType:
- if len(msg) == 1 {
- return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
- }
- msg = msg[1:]
- switch msg[0] {
+ var msgCmd byte
+ msgCmd, msg = msg[0], msg[1:]
+ switch msgCmd {
case setupCmd:
m = &setup{}
case tearDownCmd:
m = &tearDown{}
+ case authCmd:
+ m = &auth{}
case openFlowCmd:
m = &openFlow{}
case releaseCmd:
m = &release{}
default:
- return nil, NewErrUnknownControlMsg(ctx, msg[0])
+ return nil, NewErrUnknownControlMsg(ctx, msgCmd)
}
case dataType:
m = &data{}
case unencryptedDataType:
- m = &unencryptedData{}
+ ud := &unencryptedData{}
+ payload, err := p.rw.ReadMsg()
+ if err != nil {
+ return nil, err
+ }
+ if len(payload) > 0 {
+ ud.payload = [][]byte{payload}
+ }
+ m = ud
default:
- return nil, NewErrUnknownMsg(ctx, msg[0])
+ return nil, NewErrUnknownMsg(ctx, msgType)
}
- return m, m.read(ctx, msg[1:])
+ if err = m.read(ctx, msg); err == nil {
+ ctx.VI(2).Infof("Read low-level message: %#v", m)
+ }
+ return m, err
}
func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index 2414c1c..81ceb64 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -48,9 +48,7 @@
}
}
-func testMessages(t *testing.T, cases []message) {
- ctx, shutdown := v23.Init()
- defer shutdown()
+func testMessages(t *testing.T, ctx *context.T, cases []message) {
w, r, _ := newMRWPair(ctx)
wp, rp := newMessagePipe(w), newMessagePipe(r)
for _, want := range cases {
@@ -73,28 +71,69 @@
}
func TestSetup(t *testing.T) {
- testMessages(t, []message{
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ ep1, err := v23.NewEndpoint(
+ "@5@tcp@foo.com:1234@00112233445566778899aabbccddeeff@m@v.io/foo")
+ if err != nil {
+ t.Fatal(err)
+ }
+ ep2, err := v23.NewEndpoint(
+ "@5@tcp@bar.com:1234@00112233445566778899aabbccddeeff@m@v.io/bar")
+ if err != nil {
+ t.Fatal(err)
+ }
+ testMessages(t, ctx, []message{
&setup{versions: version.RPCVersionRange{Min: 3, Max: 5}},
+ &setup{
+ versions: version.RPCVersionRange{Min: 3, Max: 5},
+ peerNaClPublicKey: &[32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
+ 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31},
+ peerRemoteEndpoint: ep1,
+ peerLocalEndpoint: ep2,
+ },
&setup{},
})
}
func TestTearDown(t *testing.T) {
- testMessages(t, []message{
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ testMessages(t, ctx, []message{
&tearDown{Message: "foobar"},
&tearDown{},
})
}
+func TestAuth(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ p := v23.GetPrincipal(ctx)
+ sig, err := p.Sign([]byte("message"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ testMessages(t, ctx, []message{
+ &auth{bkey: 1, dkey: 5, channelBinding: sig, publicKey: p.PublicKey()},
+ &auth{bkey: 1, dkey: 5, channelBinding: sig},
+ &auth{channelBinding: sig, publicKey: p.PublicKey()},
+ &auth{},
+ })
+}
+
func TestOpenFlow(t *testing.T) {
- testMessages(t, []message{
- &openFlow{id: 23, initialCounters: 1 << 20},
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ testMessages(t, ctx, []message{
+ &openFlow{id: 23, initialCounters: 1 << 20, bkey: 42, dkey: 55},
&openFlow{},
})
}
func TestAddReceiveBuffers(t *testing.T) {
- testMessages(t, []message{
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ testMessages(t, ctx, []message{
&release{},
&release{counters: map[flowID]uint64{
4: 233,
@@ -104,14 +143,18 @@
}
func TestData(t *testing.T) {
- testMessages(t, []message{
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ testMessages(t, ctx, []message{
&data{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
&data{},
})
}
func TestUnencryptedData(t *testing.T) {
- testMessages(t, []message{
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ testMessages(t, ctx, []message{
&unencryptedData{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
&unencryptedData{},
})
diff --git a/runtime/internal/flow/conn/types.vdl b/runtime/internal/flow/conn/types.vdl
new file mode 100644
index 0000000..de776fa
--- /dev/null
+++ b/runtime/internal/flow/conn/types.vdl
@@ -0,0 +1,18 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import "v.io/v23/security"
+
+// Blessings is used to transport blessings and their discharges
+// between the two ends of a Conn. Since these objects can be large
+// we try not to send them more than once, therefore whenever we send
+// new blessings or discharges we associate them with an integer
+// key (BKey and DKey). Thereafter we refer to them by their key.
+type Blessings struct {
+ Blessings security.WireBlessings
+ Discharges []security.WireDischarge
+ BKey, DKey uint64
+}
\ No newline at end of file
diff --git a/runtime/internal/flow/conn/types.vdl.go b/runtime/internal/flow/conn/types.vdl.go
new file mode 100644
index 0000000..dc4bcd9
--- /dev/null
+++ b/runtime/internal/flow/conn/types.vdl.go
@@ -0,0 +1,37 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package conn
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/security"
+)
+
+// Blessings is used to transport blessings and their discharges
+// between the two ends of a Conn. Since these objects can be large
+// we try not to send them more than once, therefore whenever we send
+// new blessings or discharges we associate them with an integer
+// key (BKey and DKey). Thereafter we refer to them by their key.
+type Blessings struct {
+ Blessings security.Blessings
+ Discharges []security.Discharge
+ BKey uint64
+ DKey uint64
+}
+
+func (Blessings) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/runtime/internal/flow/conn.Blessings"`
+}) {
+}
+
+func init() {
+ vdl.Register((*Blessings)(nil))
+}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 5694830..140515a 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -12,8 +12,10 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
+ "v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/x/ref/internal/logger"
)
type wire struct {
@@ -56,6 +58,11 @@
for _, d := range data {
buf = append(buf, d...)
}
+ logbuf := buf
+ if len(buf) > 128 {
+ logbuf = buf[:128]
+ }
+ logger.Global().VI(2).Infof("Writing %d bytes to the wire: %#v", len(buf), logbuf)
defer f.wire.mu.Unlock()
f.wire.mu.Lock()
for f.peer.in != nil && !f.wire.closed {
@@ -79,6 +86,11 @@
}
buf, f.in = f.in, nil
f.wire.c.Broadcast()
+ logbuf := buf
+ if len(buf) > 128 {
+ logbuf = buf[:128]
+ }
+ logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
return buf, nil
}
func (f *mRW) Close() error {
@@ -103,15 +115,31 @@
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
- d, err := NewDialed(dctx, dmrw, ep, ep, versions, fh(dflows), nil)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- a, err := NewAccepted(actx, amrw, ep, security.Blessings{}, versions, fh(aflows))
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- return d, a, w
+ dch := make(chan *Conn)
+ ach := make(chan *Conn)
+ go func() {
+ var handler FlowHandler
+ if dflows != nil {
+ handler = fh(dflows)
+ }
+ d, err := NewDialed(dctx, dmrw, ep, ep, versions, handler)
+ if err != nil {
+ panic(err)
+ }
+ dch <- d
+ }()
+ go func() {
+ var handler FlowHandler
+ if aflows != nil {
+ handler = fh(aflows)
+ }
+ a, err := NewAccepted(actx, amrw, ep, versions, handler)
+ if err != nil {
+ panic(err)
+ }
+ ach <- a
+ }()
+ return <-dch, <-ach, w
}
func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
@@ -129,9 +157,29 @@
dialed = make([]flow.Flow, n)
for i := 0; i < n; i++ {
var err error
- if dialed[i], err = d.Dial(dctx); err != nil {
+ if dialed[i], err = d.Dial(dctx, testBFP); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
return dialed, aflows
}
+
+func testBFP(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+) (security.Blessings, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil
+}
+
+func makeBFP(in security.Blessings) flow.BlessingsForPeer {
+ return func(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+ ) (security.Blessings, error) {
+ return in, nil
+ }
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 4bdca655..ac26aa1 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -11,7 +11,6 @@
"syscall"
"time"
- "v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
@@ -90,7 +89,6 @@
ctx,
&framer{ReadWriteCloser: netConn},
local,
- v23.GetPrincipal(ctx).BlessingStore().Default(),
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
)
@@ -194,6 +192,9 @@
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
+ // TODO(mattr): We should only pass a flowHandler to NewDialed if there
+ // is a server attached to this flow manager. Perhaps we can signal
+ // "serving flow manager" by passing a 0 RID to non-serving flow managers?
c, err = conn.NewDialed(
ctx,
&framer{ReadWriteCloser: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
@@ -201,7 +202,6 @@
remote,
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
- fn,
)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
@@ -210,7 +210,7 @@
return nil, flow.NewErrBadState(ctx, err)
}
}
- f, err := c.Dial(ctx)
+ f, err := c.Dial(ctx, fn)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 9fe2d50..77d8c57 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -36,7 +36,14 @@
t.Fatal(err)
}
- bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+ bFn := func(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+ ) (security.Blessings, error) {
+ return p.BlessingStore().Default(), nil
+ }
eps := m.ListeningEndpoints()
if len(eps) == 0 {
t.Fatalf("no endpoints listened on")
@@ -70,7 +77,14 @@
t.Fatal(err)
}
- bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+ bFn := func(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+ ) (security.Blessings, error) {
+ return p.BlessingStore().Default(), nil
+ }
eps := am.ListeningEndpoints()
if len(eps) == 0 {
t.Fatalf("no endpoints listened on")
diff --git a/services/device/device/glob.go b/services/device/device/glob.go
index 9f4ee8b..7b9912a 100644
--- a/services/device/device/glob.go
+++ b/services/device/device/glob.go
@@ -159,6 +159,9 @@
results := glob(ctx, env, args)
sort.Sort(byTypeAndName(results))
results = filterResults(results, s)
+ if len(results) == 0 {
+ return fmt.Errorf("no objects found")
+ }
stdouts, stderrs := make([]bytes.Buffer, len(results)), make([]bytes.Buffer, len(results))
var errorCounter uint32 = 0
perResult := func(r *GlobResult, index int) {
diff --git a/services/device/device/glob_test.go b/services/device/device/glob_test.go
index 37eb7b2..e0def3d 100644
--- a/services/device/device/glob_test.go
+++ b/services/device/device/glob_test.go
@@ -418,7 +418,7 @@
allGlobArgs,
"",
"",
- "",
+ "no objects found",
},
// No glob arguments.
{
@@ -429,7 +429,7 @@
[]string{},
"",
"",
- "",
+ "no objects found",
},
// No glob results.
{
@@ -440,7 +440,7 @@
allGlobArgs,
"",
"",
- "",
+ "no objects found",
},
// Error in glob.
{