Merge "services/device/util_darwin_test.go: devmgr multiuser tests on darwin"
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index b5ea1e9..f31bd9a 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -35,12 +35,17 @@
return nil, nil, func() {}, err
}
ctx = context.WithValue(ctx, principalKey, p)
+ ctx = context.WithLogger(ctx, logger.Global())
return &Runtime{ns: tnaming.NewSimpleNamespace()}, ctx, func() {}, nil
}
func (r *Runtime) Init(ctx *context.T) error {
// nologcall
- return logger.Manager(ctx).ConfigureFromFlags()
+ err := logger.Manager(ctx).ConfigureFromFlags()
+ if err != nil && !logger.IsAlreadyConfiguredError(err) {
+ return err
+ }
+ return nil
}
func (r *Runtime) WithPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 16ece68..14457eb 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -5,11 +5,30 @@
package conn
import (
+ "sync"
+
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/flow/flowcontrol"
+)
+
+// flowID is a number assigned to identify a flow.
+// Each flow on a given conn will have a unique number.
+type flowID uint64
+
+const mtu = 1 << 16
+const defaultBufferSize = 1 << 20
+const reservedFlows = 10
+
+const (
+ expressPriority = iota
+ flowPriority
+ tearDownPriority
)
// FlowHandlers process accepted flows.
@@ -20,6 +39,18 @@
// Conns are a multiplexing encrypted channels that can host Flows.
type Conn struct {
+ fc *flowcontrol.FlowController
+ mp *messagePipe
+ handler FlowHandler
+ versions version.RPCVersionRange
+ acceptorBlessings security.Blessings
+ dialerPublicKey security.PublicKey
+ local, remote naming.Endpoint
+ closed chan struct{}
+
+ mu sync.Mutex
+ nextFid flowID
+ flows map[flowID]*flw
}
// Ensure that *Conn implements flow.Conn
@@ -29,47 +60,155 @@
func NewDialed(
ctx *context.T,
conn flow.MsgReadWriter,
- principal security.Principal,
local, remote naming.Endpoint,
versions version.RPCVersionRange,
handler FlowHandler,
fn flow.BlessingsForPeer) (*Conn, error) {
- return nil, nil
+ principal := v23.GetPrincipal(ctx)
+ c := &Conn{
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ versions: versions,
+ dialerPublicKey: principal.PublicKey(),
+ local: local,
+ remote: remote,
+ nextFid: reservedFlows,
+ flows: map[flowID]*flw{},
+ }
+ go c.readLoop(ctx)
+ return c, nil
}
// NewAccepted accepts a new Conn on the given conn.
func NewAccepted(
ctx *context.T,
conn flow.MsgReadWriter,
- principal security.Principal,
local naming.Endpoint,
lBlessings security.Blessings,
versions version.RPCVersionRange,
- handler FlowHandler,
- skipDischarges bool) (*Conn, error) {
- return nil, nil
+ handler FlowHandler) (*Conn, error) {
+ c := &Conn{
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ versions: versions,
+ acceptorBlessings: lBlessings,
+ local: local,
+ nextFid: reservedFlows + 1,
+ flows: map[flowID]*flw{},
+ }
+ go c.readLoop(ctx)
+ return c, nil
}
// Dial dials a new flow on the Conn.
-func (c *Conn) Dial() (flow.Flow, error) { return nil, nil }
+func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+
+ id := c.nextFid
+ c.nextFid++
+
+ return c.newFlowLocked(ctx, id), nil
+}
// Closed returns a channel that will be closed after the Conn is shutdown.
// After this channel is closed it is guaranteed that all Dial calls will fail
// with an error and no more flows will be sent to the FlowHandler.
-func (c *Conn) Closed() <-chan struct{} { return nil }
+func (c *Conn) Closed() <-chan struct{} { return c.closed }
// LocalEndpoint returns the local vanadium Endpoint
-func (c *Conn) LocalEndpoint() naming.Endpoint { return nil }
+func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
// RemoteEndpoint returns the remote vanadium Endpoint
-func (c *Conn) RemoteEndpoint() naming.Endpoint { return nil }
+func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
// DialerPublicKey returns the public key presented by the dialer during authentication.
-func (c *Conn) DialerPublicKey() security.PublicKey { return nil }
+func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
// AcceptorBlessings returns the blessings presented by the acceptor during authentication.
-func (c *Conn) AcceptorBlessings() security.Blessings { return security.Blessings{} }
+func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
// AcceptorDischarges returns the discharges presented by the acceptor during authentication.
// Discharges are organized in a map keyed by the discharge-identifier.
func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
+
+func (c *Conn) readLoop(ctx *context.T) {
+ for {
+ x, err := c.mp.readMsg(ctx)
+ if err != nil {
+ ctx.Errorf("Error reading from connection to %s: %v", c.remote, err)
+ // TODO(mattr): tear down the conn.
+ }
+
+ switch msg := x.(type) {
+ case *tearDown:
+ // TODO(mattr): tear down the conn.
+
+ case *openFlow:
+ c.mu.Lock()
+ f := c.newFlowLocked(ctx, msg.id)
+ c.mu.Unlock()
+
+ c.handler.HandleFlow(f)
+ err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ err := c.mp.writeMsg(ctx, &addRecieveBuffers{
+ counters: map[flowID]uint64{msg.id: defaultBufferSize},
+ })
+ return 0, true, err
+ })
+ if err != nil {
+ // TODO(mattr): Maybe in this case we should close the conn.
+ ctx.Errorf("Error sending counters on connection to %s: %v", c.remote, err)
+ }
+
+ case *addRecieveBuffers:
+ release := make([]flowcontrol.Release, 0, len(msg.counters))
+ c.mu.Lock()
+ for fid, val := range msg.counters {
+ if f := c.flows[fid]; f != nil {
+ release = append(release, flowcontrol.Release{
+ Worker: f.worker,
+ Tokens: int(val),
+ })
+ }
+ }
+ c.mu.Unlock()
+ if err := c.fc.Release(release); err != nil {
+ ctx.Errorf("Error releasing counters from connection to %s: %v", c.remote, err)
+ }
+
+ case *data:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Errorf("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
+ continue
+ }
+ if err := f.q.Put(msg.payload); err != nil {
+ ctx.Errorf("Ignoring data message for closed flow on connection to %s: %d", c.remote, msg.id)
+ }
+ // TODO(mattr): perhaps close the flow.
+ // TODO(mattr): check if the q is full.
+
+ case *unencryptedData:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Errorf("Ignoring data message for unknown flow: %d", msg.id)
+ continue
+ }
+ if err := f.q.Put(msg.payload); err != nil {
+ ctx.Errorf("Ignoring data message for closed flow: %d", msg.id)
+ }
+ // TODO(mattr): perhaps close the flow.
+ // TODO(mattr): check if the q is full.
+
+ default:
+ // TODO(mattr): tearDown the conn.
+ }
+ }
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
new file mode 100644
index 0000000..4fb158b
--- /dev/null
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/rpc/version"
+ "v.io/v23/security"
+ _ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test"
+)
+
+func init() {
+ test.Init()
+}
+
+func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn) {
+ dmrw, amrw := newMRWPair(ctx)
+ versions := version.RPCVersionRange{Min: 3, Max: 5}
+ d, err := NewDialed(ctx, dmrw, nil, nil, versions, fh(dflows), nil)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ a, err := NewAccepted(ctx, amrw, nil, security.Blessings{}, versions, fh(aflows))
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ return d, a
+}
+
+func testWrite(t *testing.T, ctx *context.T, dialer *Conn, flows <-chan flow.Flow) {
+ df, err := dialer.Dial(ctx)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ want := "hello world"
+ df.WriteMsgAndClose([]byte(want[:5]), []byte(want[5:]))
+ af := <-flows
+ msg, err := af.ReadMsg()
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ if got := string(msg); got != want {
+ t.Errorf("Got: %s want %s", got, want)
+ }
+}
+
+func TestDailerDialsFlow(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ aflows := make(chan flow.Flow, 1)
+ d, _ := setupConns(t, ctx, nil, aflows)
+ testWrite(t, ctx, d, aflows)
+}
+
+func TestAcceptorDialsFlow(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ dflows := make(chan flow.Flow, 1)
+ _, a := setupConns(t, ctx, dflows, nil)
+ testWrite(t, ctx, a, dflows)
+}
+
+// TODO(mattr): List of tests to write
+// 1. multiple writes
+// 2. interleave writemsg and write
+// 3. interleave read and readmsg
+// 4. multiple reads
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
new file mode 100644
index 0000000..253ec86
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -0,0 +1,19 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+ InvalidMsg(typ byte, size, field int64) {
+ "en":"message of type{:typ} and size{:size} failed decoding at field{:field}."}
+ InvalidControlMsg(cmd byte, size, field int64) {
+ "en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
+ UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
+ UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
new file mode 100644
index 0000000..15fb4cd
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -0,0 +1,49 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package conn
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/verror"
+)
+
+var (
+ ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+ ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidMsg.ID), "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+}
+
+// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
+func NewErrInvalidMsg(ctx *context.T, typ byte, size int64, field int64) error {
+ return verror.New(ErrInvalidMsg, ctx, typ, size, field)
+}
+
+// NewErrInvalidControlMsg returns an error with the ErrInvalidControlMsg ID.
+func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size int64, field int64) error {
+ return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field)
+}
+
+// NewErrUnknownMsg returns an error with the ErrUnknownMsg ID.
+func NewErrUnknownMsg(ctx *context.T, typ byte) error {
+ return verror.New(ErrUnknownMsg, ctx, typ)
+}
+
+// NewErrUnknownControlMsg returns an error with the ErrUnknownControlMsg ID.
+func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
+ return verror.New(ErrUnknownControlMsg, ctx, cmd)
+}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
new file mode 100644
index 0000000..6ea0246
--- /dev/null
+++ b/runtime/internal/flow/conn/flow.go
@@ -0,0 +1,227 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/flow/flowcontrol"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
+type flw struct {
+ id flowID
+ ctx *context.T
+ conn *Conn
+ closed chan struct{}
+ worker *flowcontrol.Worker
+ opened bool
+ q *upcqueue.T
+ readBufs [][]byte
+ dialerBlessings security.Blessings
+ dialerDischarges map[string]security.Discharge
+}
+
+var _ flow.Flow = &flw{}
+
+func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
+ f := &flw{
+ id: id,
+ ctx: ctx,
+ conn: c,
+ closed: make(chan struct{}),
+ worker: c.fc.NewWorker(flowPriority),
+ q: upcqueue.New(),
+ }
+ c.flows[id] = f
+ return f
+}
+
+func (f *flw) dialed() bool {
+ return f.id%2 == 0
+}
+
+// Implement io.Reader.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) Read(p []byte) (n int, err error) {
+ for {
+ for len(f.readBufs) > 0 && len(f.readBufs[0]) == 0 {
+ f.readBufs = f.readBufs[1:]
+ }
+ if len(f.readBufs) > 0 {
+ break
+ }
+ var msg interface{}
+ msg, err = f.q.Get(f.ctx.Done())
+ f.readBufs = msg.([][]byte)
+ }
+ n = copy(p, f.readBufs[0])
+ f.readBufs[0] = f.readBufs[0][n:]
+ return
+}
+
+// ReadMsg is like read, but it reads bytes in chunks. Depending on the
+// implementation the batch boundaries might or might not be significant.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) ReadMsg() (buf []byte, err error) {
+ for {
+ for len(f.readBufs) > 0 {
+ buf, f.readBufs = f.readBufs[0], f.readBufs[1:]
+ if len(buf) > 0 {
+ return buf, nil
+ }
+ }
+ bufs, err := f.q.Get(f.ctx.Done())
+ if err != nil {
+ return nil, err
+ }
+ f.readBufs = bufs.([][]byte)
+ }
+}
+
+// Implement io.Writer.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) Write(p []byte) (n int, err error) {
+ return f.WriteMsg(p)
+}
+
+func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+ sent := 0
+ var left []byte
+
+ f.ctx.VI(3).Infof("trying to write: %d.", f.id)
+ err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
+ f.ctx.VI(3).Infof("writing: %d.", f.id)
+ if !f.opened {
+ // TODO(mattr): we should be able to send multiple messages
+ // in a single writeMsg call.
+ err := f.conn.mp.writeMsg(f.ctx, &openFlow{
+ id: f.id,
+ initialCounters: defaultBufferSize,
+ })
+ if err != nil {
+ return 0, false, err
+ }
+ f.opened = true
+ }
+ size := 0
+ var bufs [][]byte
+ if len(left) > 0 {
+ size += len(left)
+ bufs = append(bufs, left)
+ }
+ for size <= tokens && len(parts) > 0 {
+ bufs = append(bufs, parts[0])
+ size += len(parts[0])
+ parts = parts[1:]
+ }
+ if size > tokens {
+ lidx := len(bufs) - 1
+ last := bufs[lidx]
+ take := len(last) - (size - tokens)
+ bufs[lidx] = last[:take]
+ left = last[take:]
+ }
+ d := &data{
+ id: f.id,
+ payload: bufs,
+ }
+ done := len(left) == 0 && len(parts) == 0
+ if alsoClose && done {
+ d.flags |= closeFlag
+ }
+ sent += size
+ return size, done, f.conn.mp.writeMsg(f.ctx, d)
+ })
+ return sent, err
+}
+
+// WriteMsg is like Write, but allows writing more than one buffer at a time.
+// The data in each buffer is written sequentially onto the flow. Returns the
+// number of bytes written. WriteMsg must return a non-nil error if it writes
+// less than the total number of bytes from all buffers.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
+ return f.writeMsg(false, parts...)
+}
+
+// WriteMsgAndClose performs WriteMsg and then closes the flow.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
+ return f.writeMsg(true, parts...)
+}
+
+// SetContext sets the context associated with the flow. Typically this is
+// used to set state that is only available after the flow is connected, such
+// as a more restricted flow timeout, or the language of the request.
+//
+// The flow.Manager associated with ctx must be the same flow.Manager that the
+// flow was dialed or accepted from, otherwise an error is returned.
+// TODO(mattr): enforce this restriction.
+//
+// TODO(mattr): update v23/flow documentation.
+// SetContext may not be called concurrently with other methods.
+func (f *flw) SetContext(ctx *context.T) error {
+ f.ctx = ctx
+ return nil
+}
+
+// LocalBlessings returns the blessings presented by the local end of the flow
+// during authentication.
+func (f *flw) LocalBlessings() security.Blessings {
+ if f.dialed() {
+ return f.dialerBlessings
+ }
+ return f.conn.AcceptorBlessings()
+}
+
+// RemoteBlessings returns the blessings presented by the remote end of the
+// flow during authentication.
+func (f *flw) RemoteBlessings() security.Blessings {
+ if f.dialed() {
+ return f.conn.AcceptorBlessings()
+ }
+ return f.dialerBlessings
+}
+
+// LocalDischarges returns the discharges presented by the local end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) LocalDischarges() map[string]security.Discharge {
+ if f.dialed() {
+ return f.dialerDischarges
+ }
+ return f.conn.AcceptorDischarges()
+}
+
+// RemoteDischarges returns the discharges presented by the remote end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) RemoteDischarges() map[string]security.Discharge {
+ if f.dialed() {
+ return f.conn.AcceptorDischarges()
+ }
+ return f.dialerDischarges
+}
+
+// Conn returns the connection the flow is multiplexed on.
+func (f *flw) Conn() flow.Conn {
+ return f.conn
+}
+
+// Closed returns a channel that remains open until the flow has been closed or
+// the ctx to the Dial or Accept call used to create the flow has been cancelled.
+func (f *flw) Closed() <-chan struct{} {
+ return f.closed
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
new file mode 100644
index 0000000..c4f40eb
--- /dev/null
+++ b/runtime/internal/flow/conn/message.go
@@ -0,0 +1,333 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "errors"
+
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc/version"
+)
+
+// TODO(mattr): Link to protocol doc.
+
+type message interface {
+ write(ctx *context.T, p *messagePipe) error
+ read(ctx *context.T, data []byte) error
+}
+
+// message types.
+const (
+ invalidType = iota
+ controlType
+ dataType
+ unencryptedDataType
+)
+
+// control commands.
+const (
+ invalidCmd = iota
+ setupCmd
+ tearDownCmd
+ openFlowCmd
+ addRecieveBuffersCmd
+)
+
+// setup options.
+const (
+ invalidOption = iota
+ peerNaClPublicKeyOption
+ peerRemoteEndpointOption
+ peerLocalEndpointOption
+)
+
+// data flags.
+const (
+ closeFlag = 1 << iota
+ metadataFlag
+)
+
+// random consts.
+const (
+ maxVarUint64Size = 9
+)
+
+// setup is the first message over the wire. It negotiates protocol version
+// and encryption options for connection.
+type setup struct {
+ versions version.RPCVersionRange
+ PeerNaClPublicKey *[32]byte
+ PeerRemoteEndpoint naming.Endpoint
+ PeerLocalEndpoint naming.Endpoint
+}
+
+func (m *setup) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
+ return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
+}
+func (m *setup) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
+ }
+ m.versions.Min = version.RPCVersion(v)
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
+ }
+ m.versions.Max = version.RPCVersion(v)
+ return nil
+}
+
+// tearDown is sent over the wire before a connection is closed.
+type tearDown struct {
+ Err error
+}
+
+func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
+ return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Err.Error())})
+}
+func (m *tearDown) read(ctx *context.T, data []byte) error {
+ m.Err = errors.New(string(data))
+ return nil
+}
+
+// openFlow is sent at the beginning of every new flow.
+type openFlow struct {
+ id flowID
+ initialCounters uint64
+}
+
+func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
+ return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
+}
+func (m *openFlow) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
+ }
+ m.id = flowID(v)
+ if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
+ }
+ return nil
+}
+
+// addRecieveBuffers is sent as flows are read from locally. The counters
+// inform remote writers that there is local buffer space available.
+type addRecieveBuffers struct {
+ counters map[flowID]uint64
+}
+
+func (m *addRecieveBuffers) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = p.controlBuf[:0]
+ for fid, val := range m.counters {
+ p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
+ p.controlBuf = writeVarUint64(val, p.controlBuf)
+ }
+ return p.write([][]byte{{controlType}}, [][]byte{{addRecieveBuffersCmd}, p.controlBuf})
+}
+func (m *addRecieveBuffers) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ fid, val uint64
+ n int64
+ )
+ m.counters = map[flowID]uint64{}
+ for len(data) > 0 {
+ if fid, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n)
+ }
+ if val, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n+1)
+ }
+ m.counters[flowID(fid)] = val
+ n += 2
+ }
+ return nil
+}
+
+// data carries encrypted data for a specific flow.
+type data struct {
+ id flowID
+ flags uint64
+ payload [][]byte
+}
+
+func (m *data) write(ctx *context.T, p *messagePipe) error {
+ p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+ p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+ encrypted := append([][]byte{p.dataBuf}, m.payload...)
+ return p.write([][]byte{{dataType}}, encrypted)
+}
+func (m *data) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
+ }
+ m.id = flowID(v)
+ if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
+ }
+ m.payload = [][]byte{data}
+ return nil
+}
+
+// unencryptedData carries unencrypted data for a specific flow.
+type unencryptedData struct {
+ id flowID
+ flags uint64
+ payload [][]byte
+}
+
+func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
+ p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+ p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+ // re-use the controlBuf for the data size.
+ size := uint64(0)
+ for _, b := range m.payload {
+ size += uint64(len(b))
+ }
+ p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
+ unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
+ return p.write(unencrypted, [][]byte{p.dataBuf})
+}
+func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
+ }
+ plen := int(v)
+ if plen > len(data) {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
+ }
+ m.payload, data = [][]byte{data[:plen]}, data[plen:]
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
+ }
+ m.id = flowID(v)
+ if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
+ }
+ return nil
+}
+
+type messagePipe struct {
+ rw flow.MsgReadWriter
+ controlBuf []byte
+ dataBuf []byte
+ outBuf [][]byte
+}
+
+func newMessagePipe(rw flow.MsgReadWriter) *messagePipe {
+ return &messagePipe{
+ rw: rw,
+ controlBuf: make([]byte, 256),
+ dataBuf: make([]byte, 2*maxVarUint64Size),
+ outBuf: make([][]byte, 5),
+ }
+}
+
+func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
+ p.outBuf = append(p.outBuf[:0], unencrypted...)
+ p.outBuf = append(p.outBuf, encrypted...)
+ _, err := p.rw.WriteMsg(p.outBuf...)
+ return err
+}
+
+func (p *messagePipe) writeMsg(ctx *context.T, m message) error {
+ return m.write(ctx, p)
+}
+
+func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
+ msg, err := p.rw.ReadMsg()
+ if len(msg) == 0 {
+ return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
+ }
+ if err != nil {
+ return nil, err
+ }
+ var m message
+ switch msg[0] {
+ case controlType:
+ if len(msg) == 1 {
+ return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
+ }
+ msg = msg[1:]
+ switch msg[0] {
+ case setupCmd:
+ m = &setup{}
+ case tearDownCmd:
+ m = &tearDown{}
+ case openFlowCmd:
+ m = &openFlow{}
+ case addRecieveBuffersCmd:
+ m = &addRecieveBuffers{}
+ default:
+ return nil, NewErrUnknownControlMsg(ctx, msg[0])
+ }
+ case dataType:
+ m = &data{}
+ case unencryptedDataType:
+ m = &unencryptedData{}
+ default:
+ return nil, NewErrUnknownMsg(ctx, msg[0])
+ }
+ return m, m.read(ctx, msg[1:])
+}
+
+func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
+ if len(data) == 0 {
+ return 0, data, false
+ }
+ l := data[0]
+ if l <= 0x7f {
+ return uint64(l), data[1:], true
+ }
+ l = 0xff - l + 1
+ if l > 8 || len(data)-1 < int(l) {
+ return 0, data, false
+ }
+ var out uint64
+ for i := 1; i < int(l+1); i++ {
+ out = out<<8 | uint64(data[i])
+ }
+ return out, data[l+1:], true
+}
+
+func writeVarUint64(u uint64, buf []byte) []byte {
+ if u <= 0x7f {
+ return append(buf, byte(u))
+ }
+ shift, l := 56, byte(7)
+ for ; shift >= 0 && (u>>uint(shift))&0xff == 0; shift, l = shift-8, l-1 {
+ }
+ buf = append(buf, 0xff-l)
+ for ; shift >= 0; shift -= 8 {
+ buf = append(buf, byte(u>>uint(shift))&0xff)
+ }
+ return buf
+}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
new file mode 100644
index 0000000..caf0d7e
--- /dev/null
+++ b/runtime/internal/flow/conn/message_test.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "errors"
+ "reflect"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc/version"
+ _ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test"
+)
+
+func init() {
+ test.Init()
+}
+
+func TestVarInt(t *testing.T) {
+ cases := []uint64{
+ 0x00, 0x01,
+ 0x7f, 0x80,
+ 0xff, 0x100,
+ 0xffff, 0x10000,
+ 0xffffff, 0x1000000,
+ 0xffffffff, 0x100000000,
+ 0xffffffffff, 0x10000000000,
+ 0xffffffffffff, 0x1000000000000,
+ 0xffffffffffffff, 0x100000000000000,
+ 0xffffffffffffffff,
+ }
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ for _, want := range cases {
+ got, b, valid := readVarUint64(ctx, writeVarUint64(want, []byte{}))
+ if !valid {
+ t.Fatalf("error reading %x", want)
+ }
+ if len(b) != 0 {
+ t.Errorf("unexpected buffer remaining for %x: %v", want, b)
+ }
+ if got != want {
+ t.Errorf("got: %d want: %d", got, want)
+ }
+ }
+}
+
+func testMessages(t *testing.T, cases []message) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ w, r := newMRWPair(ctx)
+ wp, rp := newMessagePipe(w), newMessagePipe(r)
+ for _, want := range cases {
+ ch := make(chan struct{})
+ go func() {
+ if err := wp.writeMsg(ctx, want); err != nil {
+ t.Errorf("unexpected error for %#v: %v", want, err)
+ }
+ close(ch)
+ }()
+ got, err := rp.readMsg(ctx)
+ if err != nil {
+ t.Errorf("unexpected error reading %#v: %v", want, err)
+ }
+ <-ch
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %#v, want %#v", got, want)
+ }
+ }
+}
+
+func TestSetup(t *testing.T) {
+ testMessages(t, []message{
+ &setup{versions: version.RPCVersionRange{Min: 3, Max: 5}},
+ })
+}
+
+func TestTearDown(t *testing.T) {
+ testMessages(t, []message{
+ &tearDown{Err: errors.New("foobar")},
+ })
+}
+
+func TestOpenFlow(t *testing.T) {
+ testMessages(t, []message{
+ &openFlow{id: 23, initialCounters: 1 << 20},
+ })
+}
+
+func TestAddReceiveBuffers(t *testing.T) {
+ testMessages(t, []message{
+ &addRecieveBuffers{counters: map[flowID]uint64{}},
+ &addRecieveBuffers{counters: map[flowID]uint64{
+ 4: 233,
+ 9: 423242,
+ }},
+ })
+}
+
+func TestData(t *testing.T) {
+ testMessages(t, []message{
+ &data{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
+ })
+}
+
+func TestUnencryptedData(t *testing.T) {
+ testMessages(t, []message{
+ &unencryptedData{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
+ })
+}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
new file mode 100644
index 0000000..5123e3e
--- /dev/null
+++ b/runtime/internal/flow/conn/util_test.go
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/flow"
+)
+
+type mRW struct {
+ recieve <-chan []byte
+ send chan<- []byte
+ ctx *context.T
+}
+
+func newMRWPair(ctx *context.T) (flow.MsgReadWriter, flow.MsgReadWriter) {
+ ac, bc := make(chan []byte), make(chan []byte)
+ a := &mRW{recieve: ac, send: bc, ctx: ctx}
+ b := &mRW{recieve: bc, send: ac, ctx: ctx}
+ return a, b
+}
+
+func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
+ buf := []byte{}
+ for _, d := range data {
+ buf = append(buf, d...)
+ }
+ f.send <- buf
+ f.ctx.VI(5).Infof("Wrote: %v", buf)
+ return len(buf), nil
+}
+func (f *mRW) ReadMsg() (buf []byte, err error) {
+ buf = <-f.recieve
+ f.ctx.VI(5).Infof("Read: %v", buf)
+ return buf, nil
+}
+
+type fh chan<- flow.Flow
+
+func (fh fh) HandleFlow(f flow.Flow) error {
+ if fh == nil {
+ panic("writing to nil flow handler")
+ }
+ fh <- f
+ return nil
+}
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
error (
LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+ UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+ ManagerClosed() {"en": "manager is already closed"}
+ AcceptFailed(err error) {"en": "accept failed{:err}"}
)
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
var (
ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+ ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+ ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+ ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
}
// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
func NewErrLargerThan3ByteUInt(ctx *context.T) error {
return verror.New(ErrLargerThan3ByteUInt, ctx)
}
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+ return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+ return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+ return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "net"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+
+ "v.io/x/ref/runtime/internal/flow/conn"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+ rid naming.RoutingID
+ closed <-chan struct{}
+ q *upcqueue.T
+
+ mu *sync.Mutex
+ listenEndpoints []naming.Endpoint
+}
+
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+ m := &manager{
+ rid: rid,
+ closed: ctx.Done(),
+ mu: &sync.Mutex{},
+ q: upcqueue.New(),
+ }
+ return m
+}
+
+// Listen causes the Manager to accept flows from the provided protocol and address.
+// Listen may be called muliple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+ netLn, err := listen(ctx, protocol, address)
+ if err != nil {
+ return flow.NewErrNetwork(ctx, err)
+ }
+ local := &inaming.Endpoint{
+ Protocol: protocol,
+ Address: netLn.Addr().String(),
+ RID: m.rid,
+ }
+ m.mu.Lock()
+ m.listenEndpoints = append(m.listenEndpoints, local)
+ m.mu.Unlock()
+ go m.netLnAcceptLoop(ctx, netLn, local)
+ return nil
+}
+
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+ const killConnectionsRetryDelay = 5 * time.Millisecond
+ for {
+ netConn, err := netLn.Accept()
+ for tokill := 1; isTemporaryError(err); tokill *= 2 {
+ if isTooManyOpenFiles(err) {
+ // TODO(suharshs): Find a way to kill connections here. We will need
+ // caching to be able to delete the connections.
+ } else {
+ tokill = 1
+ }
+ time.Sleep(killConnectionsRetryDelay)
+ netConn, err = netLn.Accept()
+ }
+ if err != nil {
+ ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+ }
+ // TODO(suharshs): This conn needs to be cached instead of ignored.
+ _, err = conn.NewAccepted(
+ ctx,
+ &framer{ReadWriter: netConn},
+ local,
+ v23.GetPrincipal(ctx).BlessingStore().Default(),
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ )
+ if err != nil {
+ netConn.Close()
+ ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+ }
+ }
+}
+
+type flowHandler struct {
+ q *upcqueue.T
+ closed <-chan struct{}
+}
+
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+ select {
+ case <-h.closed:
+ // This will make the Put call below return a upcqueue.ErrQueueIsClosed.
+ h.q.Close()
+ default:
+ }
+ return h.q.Put(f)
+}
+
+// ListeningEndpoints returns the endpoints that the Manager has explicitly
+// listened on. The Manager will accept new flows on these endpoints.
+// Returned endpoints all have a RoutingID unique to the Acceptor.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+ m.mu.Lock()
+ ret := make([]naming.Endpoint, len(m.listenEndpoints))
+ copy(ret, m.listenEndpoints)
+ m.mu.Unlock()
+ return ret
+}
+
+// Accept blocks until a new Flow has been initiated by a remote process.
+// Flows are accepted from addresses that the Manager is listening on,
+// including outgoing dialed connections.
+//
+// For example:
+// err := m.Listen(ctx, "tcp", ":0")
+// for {
+// flow, err := m.Accept(ctx)
+// // process flow
+// }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+ // TODO(suharshs): Ensure that m is attached to ctx.
+ item, err := m.q.Get(m.closed)
+ switch {
+ case err == upcqueue.ErrQueueIsClosed:
+ return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+ case err != nil:
+ return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+ default:
+ return item.(flow.Flow), nil
+ }
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+ // TODO(suharshs): Add caching of connections.
+ addr := remote.Addr()
+ d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+ netConn, err := dial(ctx, d, addr.Network(), addr.String())
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+
+ c, err := conn.NewDialed(
+ ctx,
+ &framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+ localEndpoint(netConn, m.rid),
+ remote,
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ fn,
+ )
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object. Once the channel is closed any operations on the Manager will
+// necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+ return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+ if d != nil {
+ var timeout time.Duration
+ if dl, ok := ctx.Deadline(); ok {
+ timeout = dl.Sub(time.Now())
+ }
+ return d(protocol, address, timeout)
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+ if r != nil {
+ net, addr, err := r(protocol, address)
+ if err != nil {
+ return "", "", err
+ }
+ return net, addr, nil
+ }
+ return "", "", NewErrUnknownProtocol(ctx, protocol)
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+ if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
+ ln, err := l(protocol, address)
+ if err != nil {
+ return nil, err
+ }
+ return ln, nil
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+ localAddr := conn.LocalAddr()
+ ep := &inaming.Endpoint{
+ Protocol: localAddr.Network(),
+ Address: localAddr.String(),
+ RID: rid,
+ }
+ return ep
+}
+
+func isTemporaryError(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && oErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/runtime/internal/flow/manager"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestDirectConnection(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ p := testutil.NewPrincipal("test")
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ rid := naming.FixedRoutingID(0x5555)
+ m := manager.New(ctx, rid)
+ want := "read this please"
+
+ if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+
+ bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+ eps := m.ListeningEndpoints()
+ if len(eps) == 0 {
+ t.Fatalf("no endpoints listened on")
+ }
+ flow, err := m.Dial(ctx, eps[0], bFn)
+ if err != nil {
+ t.Error(err)
+ }
+ writeLine(flow, want)
+
+ flow, err = m.Accept(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := readLine(flow)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+func readLine(f flow.Flow) (string, error) {
+ s, err := bufio.NewReader(f).ReadString('\n')
+ return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+ data += "\n"
+ _, err := f.Write([]byte(data))
+ return err
+}
diff --git a/runtime/internal/mojo_util.go b/runtime/internal/mojo_util.go
new file mode 100644
index 0000000..e434d68
--- /dev/null
+++ b/runtime/internal/mojo_util.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package internal
+
+import (
+ "flag"
+ "os"
+ "strings"
+
+ "v.io/x/ref/lib/flags"
+)
+
+// TODO(sadovsky): Terrible, terrible hack.
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+ // We expect that command-line flags have not been parsed. v23_util performs
+ // command-line parsing at this point. For Mojo, we instead parse command-line
+ // flags from the V23_MOJO_FLAGS env var.
+ // TODO(sadovsky): Maybe move this check to util.go, or drop it?
+ if flag.CommandLine.Parsed() {
+ panic("flag.CommandLine.Parse() has been called")
+ }
+ // TODO(sadovsky): Support argument quoting. More generally, parse this env
+ // var similar to how bash parses arguments.
+ return f.Parse(strings.Split(os.Getenv("V23_MOJO_FLAGS"), " "), config)
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
// Min is incremented whenever we want to remove support for old protocol
// versions.
var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
func init() {
metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))
diff --git a/runtime/internal/util.go b/runtime/internal/util.go
index 4df3db4..f5f6f6a 100644
--- a/runtime/internal/util.go
+++ b/runtime/internal/util.go
@@ -7,7 +7,6 @@
import (
"fmt"
"net"
- "os"
"strings"
"v.io/x/lib/netstate"
@@ -40,7 +39,7 @@
if handle != nil {
config = handle.Config.Dump()
}
- return f.Parse(os.Args[1:], config)
+ return parseFlagsInternal(f, config)
}
// IPAddressChooser returns the preferred IP address, which is,
diff --git a/runtime/internal/v23_util.go b/runtime/internal/v23_util.go
new file mode 100644
index 0000000..5375851
--- /dev/null
+++ b/runtime/internal/v23_util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !mojo
+
+package internal
+
+import (
+ "os"
+
+ "v.io/x/ref/lib/flags"
+)
+
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+ return f.Parse(os.Args[1:], config)
+}
diff --git a/services/device/deviced/internal/impl/globsuid/signature_match_test.go b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
index 8f7c3c5..8dbd380 100644
--- a/services/device/deviced/internal/impl/globsuid/signature_match_test.go
+++ b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
@@ -28,9 +28,9 @@
)
func TestDownloadSignatureMatch(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
sh, deferFn := servicetest.CreateShellAndMountTable(t, ctx, nil)
defer deferFn()
@@ -39,7 +39,7 @@
pkgVON := naming.Join(binaryVON, "testpkg")
defer utiltest.StartRealBinaryRepository(t, ctx, binaryVON)()
- up := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+ up := rg.RandomBytes(rg.RandomIntn(5 << 20))
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
sig, err := binarylib.Upload(ctx, naming.Join(binaryVON, "testbinary"), up, mediaInfo)
if err != nil {
@@ -52,7 +52,7 @@
t.Fatalf("ioutil.TempDir failed: %v", err)
}
defer os.RemoveAll(tmpdir)
- pkgContents := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+ pkgContents := rg.RandomBytes(rg.RandomIntn(5 << 20))
if err := ioutil.WriteFile(filepath.Join(tmpdir, "pkg.txt"), pkgContents, 0600); err != nil {
t.Fatalf("ioutil.WriteFile failed: %v", err)
}
diff --git a/services/groups/internal/store/leveldb/store.go b/services/groups/internal/store/leveldb/store.go
index 16af2de..964883c 100644
--- a/services/groups/internal/store/leveldb/store.go
+++ b/services/groups/internal/store/leveldb/store.go
@@ -52,40 +52,40 @@
}
func (st *T) Insert(key string, value interface{}) error {
- return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
- if _, err := get(db, key); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+ if _, err := get(tx, key); verror.ErrorID(err) != store.ErrUnknownKey.ID {
if err != nil {
return err
}
return verror.New(store.ErrKeyExists, nil, key)
}
- return put(db, key, &entry{Value: value})
+ return put(tx, key, &entry{Value: value})
})
}
func (st *T) Update(key string, value interface{}, version string) error {
- return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
- e, err := get(db, key)
+ return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+ e, err := get(tx, key)
if err != nil {
return err
}
if err := e.checkVersion(version); err != nil {
return err
}
- return put(db, key, &entry{Value: value, Version: e.Version + 1})
+ return put(tx, key, &entry{Value: value, Version: e.Version + 1})
})
}
func (st *T) Delete(key string, version string) error {
- return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
- e, err := get(db, key)
+ return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+ e, err := get(tx, key)
if err != nil {
return err
}
if err := e.checkVersion(version); err != nil {
return err
}
- return delete(db, key)
+ return delete(tx, key)
})
}
@@ -93,8 +93,8 @@
return convertError(st.db.Close())
}
-func get(db istore.StoreReadWriter, key string) (*entry, error) {
- bytes, _ := db.Get([]byte(key), nil)
+func get(st istore.StoreReader, key string) (*entry, error) {
+ bytes, _ := st.Get([]byte(key), nil)
if bytes == nil {
return nil, verror.New(store.ErrUnknownKey, nil, key)
}
@@ -105,19 +105,19 @@
return e, nil
}
-func put(db istore.StoreReadWriter, key string, e *entry) error {
+func put(stw istore.StoreWriter, key string, e *entry) error {
bytes, err := vom.Encode(e)
if err != nil {
return convertError(err)
}
- if err := db.Put([]byte(key), bytes); err != nil {
+ if err := stw.Put([]byte(key), bytes); err != nil {
return convertError(err)
}
return nil
}
-func delete(db istore.StoreReadWriter, key string) error {
- if err := db.Delete([]byte(key)); err != nil {
+func delete(stw istore.StoreWriter, key string) error {
+ if err := stw.Delete([]byte(key)); err != nil {
return convertError(err)
}
return nil
diff --git a/services/identity/internal/rest_signer_test/main.go b/services/identity/internal/rest_signer_test/main.go
index 03995d6..15e55ff 100644
--- a/services/identity/internal/rest_signer_test/main.go
+++ b/services/identity/internal/rest_signer_test/main.go
@@ -4,7 +4,9 @@
package main
import (
+ "encoding/base64"
"fmt"
+ "math/big"
"v.io/x/ref/services/identity/internal/server"
)
@@ -15,11 +17,19 @@
fmt.Printf("NewRestSigner error: %v\n", err)
return
}
+ der, err := signer.PublicKey().MarshalBinary()
+ if err != nil {
+ fmt.Printf("Failed to marshal public key: %v\n", err)
+ return
+ }
sig, err := signer.Sign([]byte("purpose"), []byte("message"))
if err != nil {
fmt.Printf("Sign error: %v\n", err)
return
}
ok := sig.Verify(signer.PublicKey(), []byte("message"))
+ fmt.Printf("PublicKey: %v\n", base64.URLEncoding.EncodeToString(der))
+ fmt.Printf("R: %v\n", big.NewInt(0).SetBytes(sig.R))
+ fmt.Printf("S: %v\n", big.NewInt(0).SetBytes(sig.S))
fmt.Printf("Verified: %v\n", ok)
}
diff --git a/services/identity/internal/server/rest_signer_test.go b/services/identity/internal/server/rest_signer_test.go
index ef4cd11..20252a8 100644
--- a/services/identity/internal/server/rest_signer_test.go
+++ b/services/identity/internal/server/rest_signer_test.go
@@ -14,8 +14,9 @@
)
func TestDecode(t *testing.T) {
- encodedKey := &signer.PublicKey{Base64: "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPqDbuT2B9Bb3JMcOGd2mm4bQuKSREeSKRt8_oofo0jRYiKFQ2ZVuCqssA-IUvFArT5KfXc6B9BNesgS10rPKrg=="}
- encodedSig := &signer.VSignature{R: "0x42bca58e435f906c874536789cfc31656dd8f9ffbd3b7be84181611cc04eaf74", S: "0xa6f57e858a9f36b559e9cd9f13854b90fad49e0c5591ed66033fd286682b2078"}
+ // To generate encodedKey and encodedSig run the binary in v.io/x/ref/services/identity/internal/rest_signer_test
+ encodedKey := &signer.PublicKey{Base64: "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEx8xywmgl2_UmDUoJxrh2N9pAij7jg1kIqruKpnT6SNtcNubCG_PgdpWqiVLp3zBWlw1T3F2ecy4iGpi5N4Yj-A=="}
+ encodedSig := &signer.VSignature{R: "90128808689861327833210881969781001621382090117447023854233028840694123302875", S: "102696248968928040866906648206566376772954871370602978407028885005693672370943"}
key, err := server.DecodePublicKey(encodedKey)
if err != nil {
diff --git a/services/internal/binarylib/client_test.go b/services/internal/binarylib/client_test.go
index 000e8ca..7c83ca2 100644
--- a/services/internal/binarylib/client_test.go
+++ b/services/internal/binarylib/client_test.go
@@ -73,13 +73,13 @@
// TestBufferAPI tests the binary repository client-side library
// interface using buffers.
func TestBufferAPI(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
von, cleanup := setupRepository(t, ctx)
defer cleanup()
- data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+ data := rg.RandomBytes(rg.RandomIntn(10 << 20))
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
sig, err := Upload(ctx, von, data, mediaInfo)
if err != nil {
@@ -125,14 +125,14 @@
// TestFileAPI tests the binary repository client-side library
// interface using files.
func TestFileAPI(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
von, cleanup := setupRepository(t, ctx)
defer cleanup()
// Create up to 10MB of random bytes.
- data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+ data := rg.RandomBytes(rg.RandomIntn(10 << 20))
dir, prefix := "", ""
src, err := ioutil.TempFile(dir, prefix)
if err != nil {
diff --git a/services/internal/binarylib/http_test.go b/services/internal/binarylib/http_test.go
index 73b253f..198cb75 100644
--- a/services/internal/binarylib/http_test.go
+++ b/services/internal/binarylib/http_test.go
@@ -21,9 +21,9 @@
// TestHTTP checks that HTTP download works.
func TestHTTP(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
// TODO(caprita): This is based on TestMultiPart (impl_test.go). Share
// the code where possible.
@@ -34,8 +34,8 @@
data := make([][]byte, length)
for i := 0; i < length; i++ {
// Random size, but at least 1 (avoid empty parts).
- size := testutil.RandomIntn(1000*binarylib.BufferLength) + 1
- data[i] = testutil.RandomBytes(size)
+ size := rg.RandomIntn(1000*binarylib.BufferLength) + 1
+ data[i] = rg.RandomBytes(size)
}
mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
if err := binary.Create(ctx, int32(length), mediaInfo); err != nil {
diff --git a/services/internal/binarylib/impl_test.go b/services/internal/binarylib/impl_test.go
index a614217..1ffd9a6 100644
--- a/services/internal/binarylib/impl_test.go
+++ b/services/internal/binarylib/impl_test.go
@@ -79,11 +79,12 @@
func TestHierarchy(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for i := 0; i < md5.Size; i++ {
binary, ep, _, cleanup := startServer(t, ctx, i)
defer cleanup()
- data := testData()
+ data := testData(rg)
// Test the binary repository interface.
if err := binary.Create(ctx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -131,6 +132,7 @@
func TestMultiPart(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for length := 2; length < 5; length++ {
binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -138,7 +140,7 @@
// Create <length> chunks of up to 4MB of random bytes.
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
}
// Test the binary repository interface.
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -181,9 +183,9 @@
// resumption ranging the number of parts the uploaded binary consists
// of.
func TestResumption(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
for length := 2; length < 5; length++ {
binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -191,7 +193,7 @@
// Create <length> chunks of up to 4MB of random bytes.
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
}
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed: %v", err)
@@ -211,7 +213,7 @@
break
}
for i := 0; i < length; i++ {
- fail := testutil.RandomIntn(2)
+ fail := rg.RandomIntn(2)
if parts[i] == binarylib.MissingPart && fail != 0 {
if streamErr, err := invokeUpload(t, ctx, binary, data[i], int32(i)); streamErr != nil || err != nil {
t.FailNow()
@@ -227,18 +229,18 @@
// TestErrors checks that the binary interface correctly reports errors.
func TestErrors(t *testing.T) {
- testutil.InitRandGenerator(t.Logf)
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
binary, _, _, cleanup := startServer(t, ctx, 2)
defer cleanup()
const length = 2
data := make([][]byte, length)
for i := 0; i < length; i++ {
- data[i] = testData()
+ data[i] = testData(rg)
for j := 0; j < len(data[i]); j++ {
- data[i][j] = byte(testutil.RandomInt())
+ data[i][j] = byte(rg.RandomInt())
}
}
if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -295,10 +297,11 @@
func TestGlob(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
_, ep, _, cleanup := startServer(t, ctx, 2)
defer cleanup()
- data := testData()
+ data := testData(rg)
objects := []string{"foo", "bar", "hello world", "a/b/c"}
for _, obj := range objects {
diff --git a/services/internal/binarylib/perms_test.go b/services/internal/binarylib/perms_test.go
index b4b684a..0729b69 100644
--- a/services/internal/binarylib/perms_test.go
+++ b/services/internal/binarylib/perms_test.go
@@ -76,6 +76,7 @@
func TestBinaryCreateAccessList(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
selfCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("self"))
if err != nil {
@@ -105,7 +106,7 @@
if err := b("bini/private").Create(childCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataPrivate := testData()
+ fakeDataPrivate := testData(rg)
if streamErr, err := invokeUpload(t, childCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -130,6 +131,7 @@
func TestBinaryRootAccessList(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
+ rg := testutil.NewRandGenerator(t.Logf)
selfPrincipal := testutil.NewPrincipal("self")
selfCtx, err := v23.WithPrincipal(ctx, selfPrincipal)
@@ -161,7 +163,7 @@
if err := b("bini/private").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataPrivate := testData()
+ fakeDataPrivate := testData(rg)
if streamErr, err := invokeUpload(t, selfCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -169,7 +171,7 @@
if err := b("bini/shared").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataShared := testData()
+ fakeDataShared := testData(rg)
if streamErr, err := invokeUpload(t, selfCtx, b("bini/shared"), fakeDataShared, 0); streamErr != nil || err != nil {
t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
}
@@ -296,7 +298,7 @@
if err := b("bini/otherbinary").Create(otherCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
t.Fatalf("Create() failed %v", err)
}
- fakeDataOther := testData()
+ fakeDataOther := testData(rg)
if streamErr, err := invokeUpload(t, otherCtx, b("bini/otherbinary"), fakeDataOther, 0); streamErr != nil || err != nil {
t.FailNow()
}
diff --git a/services/internal/binarylib/util_test.go b/services/internal/binarylib/util_test.go
index 30b414f..cd3d785 100644
--- a/services/internal/binarylib/util_test.go
+++ b/services/internal/binarylib/util_test.go
@@ -88,8 +88,8 @@
}
// testData creates up to 4MB of random bytes.
-func testData() []byte {
- size := testutil.RandomIntn(1000 * binarylib.BufferLength)
- data := testutil.RandomBytes(size)
+func testData(rg *testutil.Random) []byte {
+ size := rg.RandomIntn(1000 * binarylib.BufferLength)
+ data := rg.RandomBytes(size)
return data
}
diff --git a/test/testutil/rand.go b/test/testutil/rand.go
index 3cb1526..8e21640 100644
--- a/test/testutil/rand.go
+++ b/test/testutil/rand.go
@@ -28,7 +28,6 @@
// Random is a concurrent-access friendly source of randomness.
type Random struct {
mu sync.Mutex
- seed int64
rand *rand.Rand
}
@@ -72,9 +71,12 @@
return buffer
}
-// Create a new pseudo-random number generator, the seed may be supplied
-// by V23_RNG_SEED to allow for reproducing a previous sequence.
-func NewRandGenerator() *Random {
+type loggingFunc func(format string, args ...interface{})
+
+// NewRandGenerator creates a new pseudo-random number generator; the seed may
+// be supplied by V23_RNG_SEED to allow for reproducing a previous sequence, and
+// is printed using the supplied logging function.
+func NewRandGenerator(logger loggingFunc) *Random {
seed := time.Now().UnixNano()
seedString := os.Getenv(SeedEnv)
if seedString != "" {
@@ -85,17 +87,35 @@
panic(fmt.Sprintf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err))
}
}
- return &Random{seed: seed, rand: rand.New(rand.NewSource(seed))}
+ logger("Seeded pseudo-random number generator with %v", seed)
+ return &Random{rand: rand.New(rand.NewSource(seed))}
}
+// TODO(caprita): Consider deprecating InitRandGenerator in favor of using
+// NewRandGenerator directly. There are several drawbacks to using the global
+// singleton Random object:
+//
+// - tests that do not call InitRandGenerator themselves could depend on
+// InitRandGenerator having been called by other tests in the same package and
+// stop working when run standalone with test --run
+//
+// - conversely, a test case may call InitRandGenerator without actually
+// needing to; it's hard to figure out if some library called by a test
+// actually uses the Random object or not
+//
+// - when several test cases share the same Random object, there is
+// interference in the stream of random numbers generated for each test case
+// if run in parallel
+//
+// All these issues can be trivially addressed if the Random object is created
+// and plumbed through the call stack explicitly.
+
// InitRandGenerator creates an instance of Random in the public variable Rand
-// and returns a function intended to be defer'ed that prints out the
-// seed use when creating the number number generator using the supplied
-// logging function.
-func InitRandGenerator(loggingFunc func(format string, args ...interface{})) {
+// and prints out the seed use when creating the number number generator using
+// the supplied logging function.
+func InitRandGenerator(logger loggingFunc) {
once.Do(func() {
- Rand = NewRandGenerator()
- loggingFunc("Seeded pseudo-random number generator with %v", Rand.seed)
+ Rand = NewRandGenerator(logger)
})
}