Merge "services/device/util_darwin_test.go: devmgr multiuser tests on darwin"
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index b5ea1e9..f31bd9a 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -35,12 +35,17 @@
 		return nil, nil, func() {}, err
 	}
 	ctx = context.WithValue(ctx, principalKey, p)
+	ctx = context.WithLogger(ctx, logger.Global())
 	return &Runtime{ns: tnaming.NewSimpleNamespace()}, ctx, func() {}, nil
 }
 
 func (r *Runtime) Init(ctx *context.T) error {
 	// nologcall
-	return logger.Manager(ctx).ConfigureFromFlags()
+	err := logger.Manager(ctx).ConfigureFromFlags()
+	if err != nil && !logger.IsAlreadyConfiguredError(err) {
+		return err
+	}
+	return nil // errors matched by logger.IsAlreadyConfiguredError are swallowed.
 }
 
 func (r *Runtime) WithPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 16ece68..14457eb 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -5,11 +5,30 @@
 package conn
 
 import (
+	"sync"
+
+	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/flow/flowcontrol"
+)
+
+// flowID is a number assigned to identify a flow.
+// Each flow on a given conn will have a unique number.
+type flowID uint64
+
+const mtu = 1 << 16
+const defaultBufferSize = 1 << 20
+const reservedFlows = 10
+
+const (
+	expressPriority = iota
+	flowPriority
+	tearDownPriority
 )
 
 // FlowHandlers process accepted flows.
@@ -20,6 +39,18 @@
 
 // Conns are a multiplexing encrypted channels that can host Flows.
 type Conn struct {
+	fc                *flowcontrol.FlowController
+	mp                *messagePipe
+	handler           FlowHandler
+	versions          version.RPCVersionRange
+	acceptorBlessings security.Blessings
+	dialerPublicKey   security.PublicKey
+	local, remote     naming.Endpoint
+	closed            chan struct{}
+
+	mu      sync.Mutex
+	nextFid flowID
+	flows   map[flowID]*flw
 }
 
 // Ensure that *Conn implements flow.Conn
@@ -29,47 +60,155 @@
 func NewDialed(
 	ctx *context.T,
 	conn flow.MsgReadWriter,
-	principal security.Principal,
 	local, remote naming.Endpoint,
 	versions version.RPCVersionRange,
 	handler FlowHandler,
 	fn flow.BlessingsForPeer) (*Conn, error) {
-	return nil, nil
+	principal := v23.GetPrincipal(ctx)
+	c := &Conn{
+		fc:              flowcontrol.New(defaultBufferSize, mtu),
+		mp:              newMessagePipe(conn),
+		handler:         handler,
+		versions:        versions,
+		dialerPublicKey: principal.PublicKey(),
+		local:           local,
+		remote:          remote,
+		nextFid:         reservedFlows,
+		flows:           map[flowID]*flw{},
+	}
+	go c.readLoop(ctx)
+	return c, nil
 }
 
 // NewAccepted accepts a new Conn on the given conn.
 func NewAccepted(
 	ctx *context.T,
 	conn flow.MsgReadWriter,
-	principal security.Principal,
 	local naming.Endpoint,
 	lBlessings security.Blessings,
 	versions version.RPCVersionRange,
-	handler FlowHandler,
-	skipDischarges bool) (*Conn, error) {
-	return nil, nil
+	handler FlowHandler) (*Conn, error) {
+	c := &Conn{
+		fc:                flowcontrol.New(defaultBufferSize, mtu),
+		mp:                newMessagePipe(conn),
+		handler:           handler,
+		versions:          versions,
+		acceptorBlessings: lBlessings,
+		local:             local,
+		nextFid:           reservedFlows + 1,
+		flows:             map[flowID]*flw{},
+	}
+	go c.readLoop(ctx)
+	return c, nil
 }
 
 // Dial dials a new flow on the Conn.
-func (c *Conn) Dial() (flow.Flow, error) { return nil, nil }
+func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	// The two ends start at ids of different parity (reservedFlows vs.
+	// reservedFlows+1); stepping by 2 keeps their id spaces disjoint.
+	id := c.nextFid
+	c.nextFid += 2
+	return c.newFlowLocked(ctx, id), nil
+}
 
 // Closed returns a channel that will be closed after the Conn is shutdown.
 // After this channel is closed it is guaranteed that all Dial calls will fail
 // with an error and no more flows will be sent to the FlowHandler.
-func (c *Conn) Closed() <-chan struct{} { return nil }
+func (c *Conn) Closed() <-chan struct{} { return c.closed }
 
 // LocalEndpoint returns the local vanadium Endpoint
-func (c *Conn) LocalEndpoint() naming.Endpoint { return nil }
+func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
 
 // RemoteEndpoint returns the remote vanadium Endpoint
-func (c *Conn) RemoteEndpoint() naming.Endpoint { return nil }
+func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
 
 // DialerPublicKey returns the public key presented by the dialer during authentication.
-func (c *Conn) DialerPublicKey() security.PublicKey { return nil }
+func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
 
 // AcceptorBlessings returns the blessings presented by the acceptor during authentication.
-func (c *Conn) AcceptorBlessings() security.Blessings { return security.Blessings{} }
+func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
 
 // AcceptorDischarges returns the discharges presented by the acceptor during authentication.
 // Discharges are organized in a map keyed by the discharge-identifier.
 func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
+
+// readLoop reads messages from the message pipe and dispatches on their type
+// until a read fails.  It is started in its own goroutine by the constructors.
+func (c *Conn) readLoop(ctx *context.T) {
+	for {
+		x, err := c.mp.readMsg(ctx)
+		if err != nil {
+			ctx.Errorf("Error reading from connection to %s: %v", c.remote, err)
+			// TODO(mattr): tear down the conn.
+			// Stop: x is nil or only partially decoded, and the error may persist.
+			return
+		}
+		switch msg := x.(type) {
+		case *tearDown:
+			// TODO(mattr): tear down the conn.
+		case *openFlow:
+			c.mu.Lock()
+			f := c.newFlowLocked(ctx, msg.id)
+			c.mu.Unlock()
+
+			c.handler.HandleFlow(f)
+			// Tell the remote writer how much receive buffer the new flow has.
+			err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+				err := c.mp.writeMsg(ctx, &addRecieveBuffers{
+					counters: map[flowID]uint64{msg.id: defaultBufferSize},
+				})
+				return 0, true, err
+			})
+			if err != nil {
+				// TODO(mattr): Maybe in this case we should close the conn.
+				ctx.Errorf("Error sending counters on connection to %s: %v", c.remote, err)
+			}
+		case *addRecieveBuffers:
+			// Counters for flows that are not in c.flows are silently dropped.
+			release := make([]flowcontrol.Release, 0, len(msg.counters))
+			c.mu.Lock()
+			for fid, val := range msg.counters {
+				if f := c.flows[fid]; f != nil {
+					release = append(release, flowcontrol.Release{
+						Worker: f.worker,
+						Tokens: int(val),
+					})
+				}
+			}
+			c.mu.Unlock()
+			if err := c.fc.Release(release); err != nil {
+				ctx.Errorf("Error releasing counters from connection to %s: %v", c.remote, err)
+			}
+		case *data:
+			c.mu.Lock()
+			f := c.flows[msg.id]
+			c.mu.Unlock()
+			if f == nil {
+				ctx.Errorf("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
+				continue
+			}
+			if err := f.q.Put(msg.payload); err != nil {
+				ctx.Errorf("Ignoring data message for closed flow on connection to %s: %d", c.remote, msg.id)
+			}
+			// TODO(mattr): perhaps close the flow.
+			// TODO(mattr): check if the q is full.
+		case *unencryptedData:
+			c.mu.Lock()
+			f := c.flows[msg.id]
+			c.mu.Unlock()
+			if f == nil {
+				ctx.Errorf("Ignoring data message for unknown flow: %d", msg.id)
+				continue
+			}
+			if err := f.q.Put(msg.payload); err != nil {
+				ctx.Errorf("Ignoring data message for closed flow: %d", msg.id)
+			}
+			// TODO(mattr): perhaps close the flow.
+			// TODO(mattr): check if the q is full.
+		default:
+			// TODO(mattr): tearDown the conn.
+		}
+	}
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
new file mode 100644
index 0000000..4fb158b
--- /dev/null
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/rpc/version"
+	"v.io/v23/security"
+	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test"
+)
+
+func init() {
+	test.Init()
+}
+
+func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn) {
+	dmrw, amrw := newMRWPair(ctx)
+	versions := version.RPCVersionRange{Min: 3, Max: 5}
+	d, err := NewDialed(ctx, dmrw, nil, nil, versions, fh(dflows), nil)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	a, err := NewAccepted(ctx, amrw, nil, security.Blessings{}, versions, fh(aflows))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	return d, a
+}
+
+func testWrite(t *testing.T, ctx *context.T, dialer *Conn, flows <-chan flow.Flow) {
+	df, err := dialer.Dial(ctx)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	want := "hello world"
+	if _, err := df.WriteMsgAndClose([]byte(want[:5]), []byte(want[5:])); err != nil {
+		t.Fatalf("Unexpected error: %v", err) // otherwise <-flows below may never return.
+	}
+	af := <-flows
+	if msg, err := af.ReadMsg(); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	} else if got := string(msg); got != want {
+		t.Errorf("Got: %s want %s", got, want)
+	}
+}
+
+func TestDailerDialsFlow(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	aflows := make(chan flow.Flow, 1)
+	d, _ := setupConns(t, ctx, nil, aflows)
+	testWrite(t, ctx, d, aflows)
+}
+
+func TestAcceptorDialsFlow(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dflows := make(chan flow.Flow, 1)
+	_, a := setupConns(t, ctx, dflows, nil)
+	testWrite(t, ctx, a, dflows)
+}
+
+// TODO(mattr): List of tests to write
+// 1. multiple writes
+// 2. interleave writemsg and write
+// 3. interleave read and readmsg
+// 4. multiple reads
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
new file mode 100644
index 0000000..253ec86
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -0,0 +1,19 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+  InvalidMsg(typ byte, size, field int64) {
+  "en":"message of type{:typ} and size{:size} failed decoding at field{:field}."}
+  InvalidControlMsg(cmd byte, size, field int64) {
+  "en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
+  UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
+  UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
new file mode 100644
index 0000000..15fb4cd
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -0,0 +1,49 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package conn
+
+import (
+	// VDL system imports
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/verror"
+)
+
+var (
+	ErrInvalidMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+	ErrUnknownMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+	ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+)
+
+func init() {
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidMsg.ID), "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+}
+
+// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
+func NewErrInvalidMsg(ctx *context.T, typ byte, size int64, field int64) error {
+	return verror.New(ErrInvalidMsg, ctx, typ, size, field)
+}
+
+// NewErrInvalidControlMsg returns an error with the ErrInvalidControlMsg ID.
+func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size int64, field int64) error {
+	return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field)
+}
+
+// NewErrUnknownMsg returns an error with the ErrUnknownMsg ID.
+func NewErrUnknownMsg(ctx *context.T, typ byte) error {
+	return verror.New(ErrUnknownMsg, ctx, typ)
+}
+
+// NewErrUnknownControlMsg returns an error with the ErrUnknownControlMsg ID.
+func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
+	return verror.New(ErrUnknownControlMsg, ctx, cmd)
+}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
new file mode 100644
index 0000000..6ea0246
--- /dev/null
+++ b/runtime/internal/flow/conn/flow.go
@@ -0,0 +1,227 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/flow/flowcontrol"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
+type flw struct {
+	id               flowID
+	ctx              *context.T
+	conn             *Conn
+	closed           chan struct{}
+	worker           *flowcontrol.Worker
+	opened           bool
+	q                *upcqueue.T
+	readBufs         [][]byte
+	dialerBlessings  security.Blessings
+	dialerDischarges map[string]security.Discharge
+}
+
+var _ flow.Flow = &flw{}
+
+func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
+	f := &flw{
+		id:     id,
+		ctx:    ctx,
+		conn:   c,
+		closed: make(chan struct{}),
+		worker: c.fc.NewWorker(flowPriority),
+		q:      upcqueue.New(),
+	}
+	c.flows[id] = f
+	return f
+}
+
+func (f *flw) dialed() bool {
+	return f.id%2 == 0
+}
+
+// Read implements io.Reader.  When nothing is buffered it waits on the flow's
+// queue and returns any error the queue reports.  Read and ReadMsg should not
+// be called concurrently with themselves or each other.
+func (f *flw) Read(p []byte) (n int, err error) {
+	for len(f.readBufs) == 0 || len(f.readBufs[0]) == 0 {
+		if len(f.readBufs) > 0 {
+			f.readBufs = f.readBufs[1:] // drop an exhausted buffer.
+			continue
+		}
+		msg, qerr := f.q.Get(f.ctx.Done())
+		if qerr != nil {
+			return 0, qerr // as in ReadMsg; msg must not be type-asserted here.
+		}
+		f.readBufs = msg.([][]byte)
+	}
+	n = copy(p, f.readBufs[0])
+	f.readBufs[0] = f.readBufs[0][n:]
+	return n, nil
+}
+
+// ReadMsg is like read, but it reads bytes in chunks.  Depending on the
+// implementation the batch boundaries might or might not be significant.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) ReadMsg() (buf []byte, err error) {
+	for {
+		for len(f.readBufs) > 0 {
+			buf, f.readBufs = f.readBufs[0], f.readBufs[1:]
+			if len(buf) > 0 {
+				return buf, nil
+			}
+		}
+		bufs, err := f.q.Get(f.ctx.Done())
+		if err != nil {
+			return nil, err
+		}
+		f.readBufs = bufs.([][]byte)
+	}
+}
+
+// Implement io.Writer.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) Write(p []byte) (n int, err error) {
+	return f.WriteMsg(p)
+}
+
+// writeMsg sends parts as data messages sized by the tokens flow control
+// grants (opening the flow first if needed); alsoClose flags the last one.
+func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+	sent := 0
+	var left []byte
+	f.ctx.VI(3).Infof("trying to write: %d.", f.id)
+	err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
+		f.ctx.VI(3).Infof("writing: %d.", f.id)
+		if !f.opened {
+			// TODO(mattr): we should be able to send multiple messages
+			// in a single writeMsg call.
+			err := f.conn.mp.writeMsg(f.ctx, &openFlow{
+				id:              f.id,
+				initialCounters: defaultBufferSize,
+			})
+			if err != nil {
+				return 0, false, err
+			}
+			f.opened = true
+		}
+		// left is consumed by this round; it is set again only on a new split.
+		size := len(left)
+		var bufs [][]byte
+		if size > 0 {
+			bufs = append(bufs, left)
+			left = nil
+		}
+		for size <= tokens && len(parts) > 0 {
+			bufs = append(bufs, parts[0])
+			size += len(parts[0])
+			parts = parts[1:]
+		}
+		if size > tokens {
+			lidx := len(bufs) - 1
+			last := bufs[lidx]
+			take := len(last) - (size - tokens)
+			bufs[lidx] = last[:take]
+			left = last[take:]
+			size = tokens // only tokens bytes are actually sent this round.
+		}
+		d := &data{id: f.id, payload: bufs}
+		done := len(left) == 0 && len(parts) == 0
+		if alsoClose && done {
+			d.flags |= closeFlag
+		}
+		sent += size
+		return size, done, f.conn.mp.writeMsg(f.ctx, d)
+	})
+	return sent, err
+}
+
+// WriteMsg is like Write, but allows writing more than one buffer at a time.
+// The data in each buffer is written sequentially onto the flow.  Returns the
+// number of bytes written.  WriteMsg must return a non-nil error if it writes
+// less than the total number of bytes from all buffers.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
+	return f.writeMsg(false, parts...)
+}
+
+// WriteMsgAndClose performs WriteMsg and then closes the flow.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
+	return f.writeMsg(true, parts...)
+}
+
+// SetContext sets the context associated with the flow.  Typically this is
+// used to set state that is only available after the flow is connected, such
+// as a more restricted flow timeout, or the language of the request.
+//
+// The flow.Manager associated with ctx must be the same flow.Manager that the
+// flow was dialed or accepted from, otherwise an error is returned.
+// TODO(mattr): enforce this restriction.
+//
+// TODO(mattr): update v23/flow documentation.
+// SetContext may not be called concurrently with other methods.
+func (f *flw) SetContext(ctx *context.T) error {
+	f.ctx = ctx
+	return nil
+}
+
+// LocalBlessings returns the blessings presented by the local end of the flow
+// during authentication.
+func (f *flw) LocalBlessings() security.Blessings {
+	if f.dialed() {
+		return f.dialerBlessings
+	}
+	return f.conn.AcceptorBlessings()
+}
+
+// RemoteBlessings returns the blessings presented by the remote end of the
+// flow during authentication.
+func (f *flw) RemoteBlessings() security.Blessings {
+	if f.dialed() {
+		return f.conn.AcceptorBlessings()
+	}
+	return f.dialerBlessings
+}
+
+// LocalDischarges returns the discharges presented by the local end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) LocalDischarges() map[string]security.Discharge {
+	if f.dialed() {
+		return f.dialerDischarges
+	}
+	return f.conn.AcceptorDischarges()
+}
+
+// RemoteDischarges returns the discharges presented by the remote end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) RemoteDischarges() map[string]security.Discharge {
+	if f.dialed() {
+		return f.conn.AcceptorDischarges()
+	}
+	return f.dialerDischarges
+}
+
+// Conn returns the connection the flow is multiplexed on.
+func (f *flw) Conn() flow.Conn {
+	return f.conn
+}
+
+// Closed returns a channel that remains open until the flow has been closed or
+// the ctx to the Dial or Accept call used to create the flow has been cancelled.
+func (f *flw) Closed() <-chan struct{} {
+	return f.closed
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
new file mode 100644
index 0000000..c4f40eb
--- /dev/null
+++ b/runtime/internal/flow/conn/message.go
@@ -0,0 +1,333 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"errors"
+
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc/version"
+)
+
+// TODO(mattr): Link to protocol doc.
+
+type message interface {
+	write(ctx *context.T, p *messagePipe) error
+	read(ctx *context.T, data []byte) error
+}
+
+// message types.
+const (
+	invalidType = iota
+	controlType
+	dataType
+	unencryptedDataType
+)
+
+// control commands.
+const (
+	invalidCmd = iota
+	setupCmd
+	tearDownCmd
+	openFlowCmd
+	addRecieveBuffersCmd
+)
+
+// setup options.
+const (
+	invalidOption = iota
+	peerNaClPublicKeyOption
+	peerRemoteEndpointOption
+	peerLocalEndpointOption
+)
+
+// data flags.
+const (
+	closeFlag = 1 << iota
+	metadataFlag
+)
+
+// random consts.
+const (
+	maxVarUint64Size = 9
+)
+
+// setup is the first message over the wire.  It negotiates protocol version
+// and encryption options for connection.
+type setup struct {
+	versions           version.RPCVersionRange
+	PeerNaClPublicKey  *[32]byte
+	PeerRemoteEndpoint naming.Endpoint
+	PeerLocalEndpoint  naming.Endpoint
+}
+
+func (m *setup) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
+	p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
+	return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
+}
+func (m *setup) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
+	}
+	m.versions.Min = version.RPCVersion(v)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
+	}
+	m.versions.Max = version.RPCVersion(v)
+	return nil
+}
+
+// tearDown is sent over the wire before a connection is closed.
+type tearDown struct {
+	Err error
+}
+
+func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
+	return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Err.Error())})
+}
+func (m *tearDown) read(ctx *context.T, data []byte) error {
+	m.Err = errors.New(string(data))
+	return nil
+}
+
+// openFlow is sent at the beginning of every new flow.
+type openFlow struct {
+	id              flowID
+	initialCounters uint64
+}
+
+func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+	p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
+	return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
+}
+func (m *openFlow) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
+	}
+	m.id = flowID(v)
+	if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
+	}
+	return nil
+}
+
+// addRecieveBuffers is sent as flows are read from locally.  The counters
+// inform remote writers that there is local buffer space available.
+type addRecieveBuffers struct {
+	counters map[flowID]uint64
+}
+
+func (m *addRecieveBuffers) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = p.controlBuf[:0]
+	for fid, val := range m.counters {
+		p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
+		p.controlBuf = writeVarUint64(val, p.controlBuf)
+	}
+	return p.write([][]byte{{controlType}}, [][]byte{{addRecieveBuffersCmd}, p.controlBuf})
+}
+func (m *addRecieveBuffers) read(ctx *context.T, orig []byte) error {
+	var (
+		data     = orig
+		valid    bool
+		fid, val uint64
+		n        int64
+	)
+	m.counters = map[flowID]uint64{}
+	for len(data) > 0 {
+		if fid, data, valid = readVarUint64(ctx, data); !valid {
+			return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n)
+		}
+		if val, data, valid = readVarUint64(ctx, data); !valid {
+			return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n+1)
+		}
+		m.counters[flowID(fid)] = val
+		n += 2
+	}
+	return nil
+}
+
+// data carries encrypted data for a specific flow.
+type data struct {
+	id      flowID
+	flags   uint64
+	payload [][]byte
+}
+
+func (m *data) write(ctx *context.T, p *messagePipe) error {
+	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+	encrypted := append([][]byte{p.dataBuf}, m.payload...)
+	return p.write([][]byte{{dataType}}, encrypted)
+}
+func (m *data) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
+	}
+	m.id = flowID(v)
+	if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
+	}
+	m.payload = [][]byte{data}
+	return nil
+}
+
+// unencryptedData carries unencrypted data for a specific flow.
+type unencryptedData struct {
+	id      flowID
+	flags   uint64
+	payload [][]byte
+}
+
+func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
+	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+	// re-use the controlBuf for the data size.
+	size := uint64(0)
+	for _, b := range m.payload {
+		size += uint64(len(b))
+	}
+	p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
+	unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
+	return p.write(unencrypted, [][]byte{p.dataBuf})
+}
+// read decodes the payload length, payload, flow id and flags, in that order.
+func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
+	}
+	if v > uint64(len(data)) { // compare unsigned: int(v) could be negative.
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
+	}
+	m.payload, data = [][]byte{data[:v]}, data[v:]
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
+	}
+	m.id = flowID(v)
+	if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
+	}
+	return nil
+}
+
+type messagePipe struct {
+	rw         flow.MsgReadWriter
+	controlBuf []byte
+	dataBuf    []byte
+	outBuf     [][]byte
+}
+
+func newMessagePipe(rw flow.MsgReadWriter) *messagePipe {
+	return &messagePipe{
+		rw:         rw,
+		controlBuf: make([]byte, 256),
+		dataBuf:    make([]byte, 2*maxVarUint64Size),
+		outBuf:     make([][]byte, 5),
+	}
+}
+
+func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
+	p.outBuf = append(p.outBuf[:0], unencrypted...)
+	p.outBuf = append(p.outBuf, encrypted...)
+	_, err := p.rw.WriteMsg(p.outBuf...)
+	return err
+}
+
+func (p *messagePipe) writeMsg(ctx *context.T, m message) error {
+	return m.write(ctx, p)
+}
+
+func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
+	msg, err := p.rw.ReadMsg()
+	if err != nil { // a transport error must not be masked as an empty message.
+		return nil, err
+	}
+	if len(msg) == 0 {
+		return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
+	}
+	var m message
+	switch msg[0] { // the first byte is the message type.
+	case controlType:
+		if len(msg) == 1 {
+			return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
+		}
+		msg = msg[1:] // msg[0] is now the control command.
+		switch msg[0] {
+		case setupCmd:
+			m = &setup{}
+		case tearDownCmd:
+			m = &tearDown{}
+		case openFlowCmd:
+			m = &openFlow{}
+		case addRecieveBuffersCmd:
+			m = &addRecieveBuffers{}
+		default:
+			return nil, NewErrUnknownControlMsg(ctx, msg[0])
+		}
+	case dataType:
+		m = &data{}
+	case unencryptedDataType:
+		m = &unencryptedData{}
+	default:
+		return nil, NewErrUnknownMsg(ctx, msg[0])
+	}
+	return m, m.read(ctx, msg[1:]) // the body follows the type/command byte.
+}
+
+func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
+	if len(data) == 0 {
+		return 0, data, false
+	}
+	l := data[0]
+	if l <= 0x7f {
+		return uint64(l), data[1:], true
+	}
+	l = 0xff - l + 1
+	if l > 8 || len(data)-1 < int(l) {
+		return 0, data, false
+	}
+	var out uint64
+	for i := 1; i < int(l+1); i++ {
+		out = out<<8 | uint64(data[i])
+	}
+	return out, data[l+1:], true
+}
+
+func writeVarUint64(u uint64, buf []byte) []byte {
+	if u <= 0x7f {
+		return append(buf, byte(u))
+	}
+	shift, l := 56, byte(7)
+	for ; shift >= 0 && (u>>uint(shift))&0xff == 0; shift, l = shift-8, l-1 {
+	}
+	buf = append(buf, 0xff-l)
+	for ; shift >= 0; shift -= 8 {
+		buf = append(buf, byte(u>>uint(shift))&0xff)
+	}
+	return buf
+}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
new file mode 100644
index 0000000..caf0d7e
--- /dev/null
+++ b/runtime/internal/flow/conn/message_test.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"errors"
+	"reflect"
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/rpc/version"
+	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test"
+)
+
+func init() {
+	test.Init()
+}
+
+func TestVarInt(t *testing.T) {
+	cases := []uint64{
+		0x00, 0x01,
+		0x7f, 0x80,
+		0xff, 0x100,
+		0xffff, 0x10000,
+		0xffffff, 0x1000000,
+		0xffffffff, 0x100000000,
+		0xffffffffff, 0x10000000000,
+		0xffffffffffff, 0x1000000000000,
+		0xffffffffffffff, 0x100000000000000,
+		0xffffffffffffffff,
+	}
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	for _, want := range cases {
+		got, b, valid := readVarUint64(ctx, writeVarUint64(want, []byte{}))
+		if !valid {
+			t.Fatalf("error reading %x", want)
+		}
+		if len(b) != 0 {
+			t.Errorf("unexpected buffer remaining for %x: %v", want, b)
+		}
+		if got != want {
+			t.Errorf("got: %d want: %d", got, want)
+		}
+	}
+}
+
+func testMessages(t *testing.T, cases []message) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	w, r := newMRWPair(ctx)
+	wp, rp := newMessagePipe(w), newMessagePipe(r)
+	for _, want := range cases {
+		ch := make(chan struct{})
+		go func() {
+			if err := wp.writeMsg(ctx, want); err != nil {
+				t.Errorf("unexpected error for %#v: %v", want, err)
+			}
+			close(ch)
+		}()
+		got, err := rp.readMsg(ctx)
+		if err != nil {
+			t.Errorf("unexpected error reading %#v: %v", want, err)
+		}
+		<-ch
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("got: %#v, want %#v", got, want)
+		}
+	}
+}
+
+func TestSetup(t *testing.T) {
+	testMessages(t, []message{
+		&setup{versions: version.RPCVersionRange{Min: 3, Max: 5}},
+	})
+}
+
+func TestTearDown(t *testing.T) {
+	testMessages(t, []message{
+		&tearDown{Err: errors.New("foobar")},
+	})
+}
+
+func TestOpenFlow(t *testing.T) {
+	testMessages(t, []message{
+		&openFlow{id: 23, initialCounters: 1 << 20},
+	})
+}
+
+func TestAddReceiveBuffers(t *testing.T) {
+	testMessages(t, []message{
+		&addRecieveBuffers{counters: map[flowID]uint64{}},
+		&addRecieveBuffers{counters: map[flowID]uint64{
+			4: 233,
+			9: 423242,
+		}},
+	})
+}
+
+func TestData(t *testing.T) {
+	testMessages(t, []message{
+		&data{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
+	})
+}
+
+func TestUnencryptedData(t *testing.T) {
+	testMessages(t, []message{
+		&unencryptedData{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
+	})
+}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
new file mode 100644
index 0000000..5123e3e
--- /dev/null
+++ b/runtime/internal/flow/conn/util_test.go
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/flow"
+)
+
+type mRW struct {
+	recieve <-chan []byte
+	send    chan<- []byte
+	ctx     *context.T
+}
+
+// newMRWPair returns two MsgReadWriters joined by unbuffered channels, so
+// that a message written to one is the next message read from the other.
+func newMRWPair(ctx *context.T) (flow.MsgReadWriter, flow.MsgReadWriter) {
+	ab, ba := make(chan []byte), make(chan []byte)
+	return &mRW{recieve: ba, send: ab, ctx: ctx}, &mRW{recieve: ab, send: ba, ctx: ctx}
+}
+
+func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
+	buf := []byte{}
+	for _, d := range data {
+		buf = append(buf, d...)
+	}
+	f.send <- buf
+	f.ctx.VI(5).Infof("Wrote: %v", buf)
+	return len(buf), nil
+}
+func (f *mRW) ReadMsg() (buf []byte, err error) {
+	buf = <-f.recieve
+	f.ctx.VI(5).Infof("Read: %v", buf)
+	return buf, nil
+}
+
+type fh chan<- flow.Flow
+
+func (fh fh) HandleFlow(f flow.Flow) error {
+	if fh == nil {
+		panic("writing to nil flow handler")
+	}
+	fh <- f
+	return nil
+}
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
 // TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
 error (
   LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+  UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+  ManagerClosed() {"en": "manager is already closed"}
+  AcceptFailed(err error) {"en": "accept failed{:err}"}
 )
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
 
 var (
 	ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+	ErrUnknownProtocol     = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+	ErrManagerClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+	ErrAcceptFailed        = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
 }
 
 // NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
 func NewErrLargerThan3ByteUInt(ctx *context.T) error {
 	return verror.New(ErrLargerThan3ByteUInt, ctx)
 }
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+	return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+	return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+	return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"net"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+
+	"v.io/x/ref/runtime/internal/flow/conn"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+	rid    naming.RoutingID // routing ID stamped on the endpoints this manager builds
+	closed <-chan struct{}  // ctx.Done() of the context passed to New
+	q      *upcqueue.T      // flows queued by flowHandler and handed out by Accept
+
+	mu              *sync.Mutex       // guards listenEndpoints
+	listenEndpoints []naming.Endpoint // one entry per successful Listen call
+}
+
+// New returns a flow.Manager for rid whose Closed channel is ctx.Done().
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+	return &manager{
+		rid:    rid,
+		closed: ctx.Done(),
+		mu:     &sync.Mutex{},
+		q:      upcqueue.New(),
+	}
+}
+
+// Listen makes the Manager accept flows on the given protocol and address.
+// Listen may be called multiple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+	ln, err := listen(ctx, protocol, address)
+	if err != nil {
+		return flow.NewErrNetwork(ctx, err)
+	}
+	// Record the address the listener reports, which may differ from the
+	// requested one (e.g. when the caller asked for port 0).
+	ep := &inaming.Endpoint{Protocol: protocol, Address: ln.Addr().String(), RID: m.rid}
+	m.mu.Lock()
+	m.listenEndpoints = append(m.listenEndpoints, ep)
+	m.mu.Unlock()
+	// Accept connections in a separate goroutine so that Listen returns
+	// immediately.
+	go m.netLnAcceptLoop(ctx, ln, ep)
+	return nil
+}
+
+// netLnAcceptLoop accepts connections on netLn until Accept fails with a non-temporary error.
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+	const killConnectionsRetryDelay = 5 * time.Millisecond
+	for {
+		netConn, err := netLn.Accept()
+		for tokill := 1; isTemporaryError(err); tokill *= 2 {
+			// TODO(suharshs): On too many open files, kill connections here; that needs connection caching.
+			if !isTooManyOpenFiles(err) {
+				tokill = 1
+			}
+			time.Sleep(killConnectionsRetryDelay)
+			netConn, err = netLn.Accept()
+		}
+		if err != nil {
+			ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+			return // netConn is unusable after a failed Accept, and retrying a permanent error would spin forever.
+		}
+		// TODO(suharshs): This conn needs to be cached instead of ignored.
+		_, err = conn.NewAccepted(
+			ctx,
+			&framer{ReadWriter: netConn},
+			local,
+			v23.GetPrincipal(ctx).BlessingStore().Default(),
+			version.Supported,
+			&flowHandler{q: m.q, closed: m.closed},
+		)
+		if err != nil {
+			netConn.Close()
+			ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+		}
+	}
+}
+
+type flowHandler struct {
+	q      *upcqueue.T
+	closed <-chan struct{}
+}
+
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+	select {
+	case <-h.closed:
+		// This will make the Put call below return a upcqueue.ErrQueueIsClosed.
+		h.q.Close()
+	default:
+	}
+	return h.q.Put(f)
+}
+
+// ListeningEndpoints returns a copy of the endpoints the Manager has
+// explicitly listened on; the Manager accepts new flows on these endpoints.
+// Every returned endpoint carries the Manager's RoutingID.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	eps := make([]naming.Endpoint, len(m.listenEndpoints))
+	copy(eps, m.listenEndpoints)
+	return eps
+}
+
+// Accept blocks until a remote process initiates a new Flow, then returns it.
+// Flows arrive on connections accepted from the addresses the Manager is
+// listening on as well as on connections the Manager itself dialed.
+//
+// For example:
+//   err := m.Listen(ctx, "tcp", ":0")
+//   for {
+//     flow, err := m.Accept(ctx)
+//     // process flow
+//   }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+	// TODO(suharshs): Ensure that m is attached to ctx.
+	item, err := m.q.Get(m.closed)
+	// A closed queue is reported as ManagerClosed; anything else as AcceptFailed.
+	if err == upcqueue.ErrQueueIsClosed {
+		return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+	}
+	if err != nil {
+		return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+	}
+	return item.(flow.Flow), nil
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+	// TODO(suharshs): Add caching of connections.
+	addr := remote.Addr()
+	d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+	netConn, err := dial(ctx, d, addr.Network(), addr.String())
+	if err != nil {
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+	c, err := conn.NewDialed(
+		ctx,
+		&framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+		localEndpoint(netConn, m.rid),
+		remote,
+		version.Supported,
+		&flowHandler{q: m.q, closed: m.closed},
+		fn,
+	)
+	if err != nil {
+		netConn.Close() // NewDialed failed and no flow.Conn was returned to own netConn; don't leak it.
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+	return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object. Once the channel is closed any operations on the Manager will
+// necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+	return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+	if d == nil { // no dialer was supplied for this protocol
+		return nil, NewErrUnknownProtocol(ctx, protocol)
+	}
+	var timeout time.Duration // stays zero when ctx carries no deadline
+	if deadline, ok := ctx.Deadline(); ok {
+		timeout = deadline.Sub(time.Now())
+	}
+	return d(protocol, address, timeout)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+	if r == nil { // no resolver was supplied for this protocol
+		return "", "", NewErrUnknownProtocol(ctx, protocol)
+	}
+	network, addr, err := r(protocol, address)
+	if err != nil {
+		return "", "", err // discard any partial results from the resolver
+	}
+	return network, addr, nil
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+	if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
+		ln, err := l(protocol, address)
+		if err != nil {
+			return nil, err
+		}
+		return ln, nil
+	}
+	return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+// localEndpoint builds an endpoint from conn's local address and rid.
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+	addr := conn.LocalAddr()
+	return &inaming.Endpoint{
+		Protocol: addr.Network(),
+		Address:  addr.String(),
+		RID:      rid,
+	}
+}
+
+func isTemporaryError(err error) bool {
+	oErr, ok := err.(*net.OpError)
+	return ok && oErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool {
+	oErr, ok := err.(*net.OpError)
+	return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+	"bufio"
+	"strings"
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/runtime/internal/flow/manager"
+	"v.io/x/ref/test/testutil"
+)
+
+// TestDirectConnection checks that a line written on a dialed flow arrives on the accepted flow.
+func TestDirectConnection(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	p := testutil.NewPrincipal("test")
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rid := naming.FixedRoutingID(0x5555)
+	m := manager.New(ctx, rid)
+	want := "read this please"
+	if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+	eps := m.ListeningEndpoints()
+	if len(eps) == 0 {
+		t.Fatalf("no endpoints listened on")
+	}
+	flow, err := m.Dial(ctx, eps[0], bFn)
+	if err != nil {
+		t.Fatal(err) // flow is unusable after a failed Dial; stop before writing to it.
+	}
+	if err := writeLine(flow, want); err != nil {
+		t.Fatal(err) // nothing was sent, so the Accept below would block forever.
+	}
+
+	flow, err = m.Accept(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := readLine(flow)
+	if err != nil {
+		t.Error(err)
+	}
+	if got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func readLine(f flow.Flow) (string, error) {
+	s, err := bufio.NewReader(f).ReadString('\n')
+	return strings.TrimRight(s, "\n"), err
+}
+
+// writeLine writes data to f followed by a newline.
+func writeLine(f flow.Flow, data string) error {
+	_, err := f.Write([]byte(data + "\n"))
+	return err
+}
diff --git a/runtime/internal/mojo_util.go b/runtime/internal/mojo_util.go
new file mode 100644
index 0000000..e434d68
--- /dev/null
+++ b/runtime/internal/mojo_util.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build mojo
+
+package internal
+
+import (
+	"flag"
+	"os"
+	"strings"
+
+	"v.io/x/ref/lib/flags"
+)
+
+// parseFlagsInternal parses flags from V23_MOJO_FLAGS. TODO(sadovsky): Terrible, terrible hack.
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+	// We expect that command-line flags have not been parsed. v23_util performs
+	// command-line parsing at this point. For Mojo, we instead parse command-line
+	// flags from the V23_MOJO_FLAGS env var.
+	// TODO(sadovsky): Maybe move this check to util.go, or drop it?
+	if flag.CommandLine.Parsed() {
+		panic("flag.CommandLine.Parse() has been called")
+	}
+	// Fields (not Split) so an unset var or doubled spaces yield no empty args.
+	// TODO(sadovsky): Support argument quoting, similar to how bash parses arguments.
+	return f.Parse(strings.Fields(os.Getenv("V23_MOJO_FLAGS")), config)
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
 // Min is incremented whenever we want to remove support for old protocol
 // versions.
 var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
 
 func init() {
 	metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))
diff --git a/runtime/internal/util.go b/runtime/internal/util.go
index 4df3db4..f5f6f6a 100644
--- a/runtime/internal/util.go
+++ b/runtime/internal/util.go
@@ -7,7 +7,6 @@
 import (
 	"fmt"
 	"net"
-	"os"
 	"strings"
 
 	"v.io/x/lib/netstate"
@@ -40,7 +39,7 @@
 	if handle != nil {
 		config = handle.Config.Dump()
 	}
-	return f.Parse(os.Args[1:], config)
+	return parseFlagsInternal(f, config)
 }
 
 // IPAddressChooser returns the preferred IP address, which is,
diff --git a/runtime/internal/v23_util.go b/runtime/internal/v23_util.go
new file mode 100644
index 0000000..5375851
--- /dev/null
+++ b/runtime/internal/v23_util.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !mojo
+
+package internal
+
+import (
+	"os"
+
+	"v.io/x/ref/lib/flags"
+)
+
+func parseFlagsInternal(f *flags.Flags, config map[string]string) error {
+	return f.Parse(os.Args[1:], config)
+}
diff --git a/services/device/deviced/internal/impl/globsuid/signature_match_test.go b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
index 8f7c3c5..8dbd380 100644
--- a/services/device/deviced/internal/impl/globsuid/signature_match_test.go
+++ b/services/device/deviced/internal/impl/globsuid/signature_match_test.go
@@ -28,9 +28,9 @@
 )
 
 func TestDownloadSignatureMatch(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	sh, deferFn := servicetest.CreateShellAndMountTable(t, ctx, nil)
 	defer deferFn()
@@ -39,7 +39,7 @@
 	pkgVON := naming.Join(binaryVON, "testpkg")
 	defer utiltest.StartRealBinaryRepository(t, ctx, binaryVON)()
 
-	up := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+	up := rg.RandomBytes(rg.RandomIntn(5 << 20))
 	mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 	sig, err := binarylib.Upload(ctx, naming.Join(binaryVON, "testbinary"), up, mediaInfo)
 	if err != nil {
@@ -52,7 +52,7 @@
 		t.Fatalf("ioutil.TempDir failed: %v", err)
 	}
 	defer os.RemoveAll(tmpdir)
-	pkgContents := testutil.RandomBytes(testutil.RandomIntn(5 << 20))
+	pkgContents := rg.RandomBytes(rg.RandomIntn(5 << 20))
 	if err := ioutil.WriteFile(filepath.Join(tmpdir, "pkg.txt"), pkgContents, 0600); err != nil {
 		t.Fatalf("ioutil.WriteFile failed: %v", err)
 	}
diff --git a/services/groups/internal/store/leveldb/store.go b/services/groups/internal/store/leveldb/store.go
index 16af2de..964883c 100644
--- a/services/groups/internal/store/leveldb/store.go
+++ b/services/groups/internal/store/leveldb/store.go
@@ -52,40 +52,40 @@
 }
 
 func (st *T) Insert(key string, value interface{}) error {
-	return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
-		if _, err := get(db, key); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+	return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+		if _, err := get(tx, key); verror.ErrorID(err) != store.ErrUnknownKey.ID {
 			if err != nil {
 				return err
 			}
 			return verror.New(store.ErrKeyExists, nil, key)
 		}
-		return put(db, key, &entry{Value: value})
+		return put(tx, key, &entry{Value: value})
 	})
 }
 
 func (st *T) Update(key string, value interface{}, version string) error {
-	return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
-		e, err := get(db, key)
+	return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+		e, err := get(tx, key)
 		if err != nil {
 			return err
 		}
 		if err := e.checkVersion(version); err != nil {
 			return err
 		}
-		return put(db, key, &entry{Value: value, Version: e.Version + 1})
+		return put(tx, key, &entry{Value: value, Version: e.Version + 1})
 	})
 }
 
 func (st *T) Delete(key string, version string) error {
-	return istore.RunInTransaction(st.db, func(db istore.StoreReadWriter) error {
-		e, err := get(db, key)
+	return istore.RunInTransaction(st.db, func(tx istore.Transaction) error {
+		e, err := get(tx, key)
 		if err != nil {
 			return err
 		}
 		if err := e.checkVersion(version); err != nil {
 			return err
 		}
-		return delete(db, key)
+		return delete(tx, key)
 	})
 }
 
@@ -93,8 +93,8 @@
 	return convertError(st.db.Close())
 }
 
-func get(db istore.StoreReadWriter, key string) (*entry, error) {
-	bytes, _ := db.Get([]byte(key), nil)
+func get(st istore.StoreReader, key string) (*entry, error) {
+	bytes, _ := st.Get([]byte(key), nil)
 	if bytes == nil {
 		return nil, verror.New(store.ErrUnknownKey, nil, key)
 	}
@@ -105,19 +105,19 @@
 	return e, nil
 }
 
-func put(db istore.StoreReadWriter, key string, e *entry) error {
+func put(stw istore.StoreWriter, key string, e *entry) error {
 	bytes, err := vom.Encode(e)
 	if err != nil {
 		return convertError(err)
 	}
-	if err := db.Put([]byte(key), bytes); err != nil {
+	if err := stw.Put([]byte(key), bytes); err != nil {
 		return convertError(err)
 	}
 	return nil
 }
 
-func delete(db istore.StoreReadWriter, key string) error {
-	if err := db.Delete([]byte(key)); err != nil {
+func delete(stw istore.StoreWriter, key string) error {
+	if err := stw.Delete([]byte(key)); err != nil {
 		return convertError(err)
 	}
 	return nil
diff --git a/services/identity/internal/rest_signer_test/main.go b/services/identity/internal/rest_signer_test/main.go
index 03995d6..15e55ff 100644
--- a/services/identity/internal/rest_signer_test/main.go
+++ b/services/identity/internal/rest_signer_test/main.go
@@ -4,7 +4,9 @@
 package main
 
 import (
+	"encoding/base64"
 	"fmt"
+	"math/big"
 
 	"v.io/x/ref/services/identity/internal/server"
 )
@@ -15,11 +17,19 @@
 		fmt.Printf("NewRestSigner error: %v\n", err)
 		return
 	}
+	der, err := signer.PublicKey().MarshalBinary()
+	if err != nil {
+		fmt.Printf("Failed to marshal public key: %v\n", err)
+		return
+	}
 	sig, err := signer.Sign([]byte("purpose"), []byte("message"))
 	if err != nil {
 		fmt.Printf("Sign error: %v\n", err)
 		return
 	}
 	ok := sig.Verify(signer.PublicKey(), []byte("message"))
+	fmt.Printf("PublicKey: %v\n", base64.URLEncoding.EncodeToString(der))
+	fmt.Printf("R: %v\n", big.NewInt(0).SetBytes(sig.R))
+	fmt.Printf("S: %v\n", big.NewInt(0).SetBytes(sig.S))
 	fmt.Printf("Verified: %v\n", ok)
 }
diff --git a/services/identity/internal/server/rest_signer_test.go b/services/identity/internal/server/rest_signer_test.go
index ef4cd11..20252a8 100644
--- a/services/identity/internal/server/rest_signer_test.go
+++ b/services/identity/internal/server/rest_signer_test.go
@@ -14,8 +14,9 @@
 )
 
 func TestDecode(t *testing.T) {
-	encodedKey := &signer.PublicKey{Base64: "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPqDbuT2B9Bb3JMcOGd2mm4bQuKSREeSKRt8_oofo0jRYiKFQ2ZVuCqssA-IUvFArT5KfXc6B9BNesgS10rPKrg=="}
-	encodedSig := &signer.VSignature{R: "0x42bca58e435f906c874536789cfc31656dd8f9ffbd3b7be84181611cc04eaf74", S: "0xa6f57e858a9f36b559e9cd9f13854b90fad49e0c5591ed66033fd286682b2078"}
+	// To generate encodedKey and encodedSig run the binary in v.io/x/ref/services/identity/internal/rest_signer_test
+	encodedKey := &signer.PublicKey{Base64: "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEx8xywmgl2_UmDUoJxrh2N9pAij7jg1kIqruKpnT6SNtcNubCG_PgdpWqiVLp3zBWlw1T3F2ecy4iGpi5N4Yj-A=="}
+	encodedSig := &signer.VSignature{R: "90128808689861327833210881969781001621382090117447023854233028840694123302875", S: "102696248968928040866906648206566376772954871370602978407028885005693672370943"}
 
 	key, err := server.DecodePublicKey(encodedKey)
 	if err != nil {
diff --git a/services/internal/binarylib/client_test.go b/services/internal/binarylib/client_test.go
index 000e8ca..7c83ca2 100644
--- a/services/internal/binarylib/client_test.go
+++ b/services/internal/binarylib/client_test.go
@@ -73,13 +73,13 @@
 // TestBufferAPI tests the binary repository client-side library
 // interface using buffers.
 func TestBufferAPI(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	von, cleanup := setupRepository(t, ctx)
 	defer cleanup()
-	data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+	data := rg.RandomBytes(rg.RandomIntn(10 << 20))
 	mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 	sig, err := Upload(ctx, von, data, mediaInfo)
 	if err != nil {
@@ -125,14 +125,14 @@
 // TestFileAPI tests the binary repository client-side library
 // interface using files.
 func TestFileAPI(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	von, cleanup := setupRepository(t, ctx)
 	defer cleanup()
 	// Create up to 10MB of random bytes.
-	data := testutil.RandomBytes(testutil.RandomIntn(10 << 20))
+	data := rg.RandomBytes(rg.RandomIntn(10 << 20))
 	dir, prefix := "", ""
 	src, err := ioutil.TempFile(dir, prefix)
 	if err != nil {
diff --git a/services/internal/binarylib/http_test.go b/services/internal/binarylib/http_test.go
index 73b253f..198cb75 100644
--- a/services/internal/binarylib/http_test.go
+++ b/services/internal/binarylib/http_test.go
@@ -21,9 +21,9 @@
 
 // TestHTTP checks that HTTP download works.
 func TestHTTP(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	// TODO(caprita): This is based on TestMultiPart (impl_test.go).  Share
 	// the code where possible.
@@ -34,8 +34,8 @@
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
 			// Random size, but at least 1 (avoid empty parts).
-			size := testutil.RandomIntn(1000*binarylib.BufferLength) + 1
-			data[i] = testutil.RandomBytes(size)
+			size := rg.RandomIntn(1000*binarylib.BufferLength) + 1
+			data[i] = rg.RandomBytes(size)
 		}
 		mediaInfo := repository.MediaInfo{Type: "application/octet-stream"}
 		if err := binary.Create(ctx, int32(length), mediaInfo); err != nil {
diff --git a/services/internal/binarylib/impl_test.go b/services/internal/binarylib/impl_test.go
index a614217..1ffd9a6 100644
--- a/services/internal/binarylib/impl_test.go
+++ b/services/internal/binarylib/impl_test.go
@@ -79,11 +79,12 @@
 func TestHierarchy(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for i := 0; i < md5.Size; i++ {
 		binary, ep, _, cleanup := startServer(t, ctx, i)
 		defer cleanup()
-		data := testData()
+		data := testData(rg)
 
 		// Test the binary repository interface.
 		if err := binary.Create(ctx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -131,6 +132,7 @@
 func TestMultiPart(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for length := 2; length < 5; length++ {
 		binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -138,7 +140,7 @@
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
-			data[i] = testData()
+			data[i] = testData(rg)
 		}
 		// Test the binary repository interface.
 		if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -181,9 +183,9 @@
 // resumption ranging the number of parts the uploaded binary consists
 // of.
 func TestResumption(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	for length := 2; length < 5; length++ {
 		binary, _, _, cleanup := startServer(t, ctx, 2)
@@ -191,7 +193,7 @@
 		// Create <length> chunks of up to 4MB of random bytes.
 		data := make([][]byte, length)
 		for i := 0; i < length; i++ {
-			data[i] = testData()
+			data[i] = testData(rg)
 		}
 		if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 			t.Fatalf("Create() failed: %v", err)
@@ -211,7 +213,7 @@
 				break
 			}
 			for i := 0; i < length; i++ {
-				fail := testutil.RandomIntn(2)
+				fail := rg.RandomIntn(2)
 				if parts[i] == binarylib.MissingPart && fail != 0 {
 					if streamErr, err := invokeUpload(t, ctx, binary, data[i], int32(i)); streamErr != nil || err != nil {
 						t.FailNow()
@@ -227,18 +229,18 @@
 
 // TestErrors checks that the binary interface correctly reports errors.
 func TestErrors(t *testing.T) {
-	testutil.InitRandGenerator(t.Logf)
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	binary, _, _, cleanup := startServer(t, ctx, 2)
 	defer cleanup()
 	const length = 2
 	data := make([][]byte, length)
 	for i := 0; i < length; i++ {
-		data[i] = testData()
+		data[i] = testData(rg)
 		for j := 0; j < len(data[i]); j++ {
-			data[i][j] = byte(testutil.RandomInt())
+			data[i][j] = byte(rg.RandomInt())
 		}
 	}
 	if err := binary.Create(ctx, int32(length), repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
@@ -295,10 +297,11 @@
 func TestGlob(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	_, ep, _, cleanup := startServer(t, ctx, 2)
 	defer cleanup()
-	data := testData()
+	data := testData(rg)
 
 	objects := []string{"foo", "bar", "hello world", "a/b/c"}
 	for _, obj := range objects {
diff --git a/services/internal/binarylib/perms_test.go b/services/internal/binarylib/perms_test.go
index b4b684a..0729b69 100644
--- a/services/internal/binarylib/perms_test.go
+++ b/services/internal/binarylib/perms_test.go
@@ -76,6 +76,7 @@
 func TestBinaryCreateAccessList(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	selfCtx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("self"))
 	if err != nil {
@@ -105,7 +106,7 @@
 	if err := b("bini/private").Create(childCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataPrivate := testData()
+	fakeDataPrivate := testData(rg)
 	if streamErr, err := invokeUpload(t, childCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -130,6 +131,7 @@
 func TestBinaryRootAccessList(t *testing.T) {
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
+	rg := testutil.NewRandGenerator(t.Logf)
 
 	selfPrincipal := testutil.NewPrincipal("self")
 	selfCtx, err := v23.WithPrincipal(ctx, selfPrincipal)
@@ -161,7 +163,7 @@
 	if err := b("bini/private").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataPrivate := testData()
+	fakeDataPrivate := testData(rg)
 	if streamErr, err := invokeUpload(t, selfCtx, b("bini/private"), fakeDataPrivate, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -169,7 +171,7 @@
 	if err := b("bini/shared").Create(selfCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataShared := testData()
+	fakeDataShared := testData(rg)
 	if streamErr, err := invokeUpload(t, selfCtx, b("bini/shared"), fakeDataShared, 0); streamErr != nil || err != nil {
 		t.Fatalf("invokeUpload() failed %v, %v", err, streamErr)
 	}
@@ -296,7 +298,7 @@
 	if err := b("bini/otherbinary").Create(otherCtx, 1, repository.MediaInfo{Type: "application/octet-stream"}); err != nil {
 		t.Fatalf("Create() failed %v", err)
 	}
-	fakeDataOther := testData()
+	fakeDataOther := testData(rg)
 	if streamErr, err := invokeUpload(t, otherCtx, b("bini/otherbinary"), fakeDataOther, 0); streamErr != nil || err != nil {
 		t.FailNow()
 	}
diff --git a/services/internal/binarylib/util_test.go b/services/internal/binarylib/util_test.go
index 30b414f..cd3d785 100644
--- a/services/internal/binarylib/util_test.go
+++ b/services/internal/binarylib/util_test.go
@@ -88,8 +88,8 @@
 }
 
 // testData creates up to 4MB of random bytes.
-func testData() []byte {
-	size := testutil.RandomIntn(1000 * binarylib.BufferLength)
-	data := testutil.RandomBytes(size)
+func testData(rg *testutil.Random) []byte {
+	size := rg.RandomIntn(1000 * binarylib.BufferLength)
+	data := rg.RandomBytes(size)
 	return data
 }
diff --git a/test/testutil/rand.go b/test/testutil/rand.go
index 3cb1526..8e21640 100644
--- a/test/testutil/rand.go
+++ b/test/testutil/rand.go
@@ -28,7 +28,6 @@
 // Random is a concurrent-access friendly source of randomness.
 type Random struct {
 	mu   sync.Mutex
-	seed int64
 	rand *rand.Rand
 }
 
@@ -72,9 +71,12 @@
 	return buffer
 }
 
-// Create a new pseudo-random number generator, the seed may be supplied
-// by V23_RNG_SEED to allow for reproducing a previous sequence.
-func NewRandGenerator() *Random {
+type loggingFunc func(format string, args ...interface{})
+
+// NewRandGenerator creates a new pseudo-random number generator; the seed may
+// be supplied by V23_RNG_SEED to allow for reproducing a previous sequence, and
+// is printed using the supplied logging function.
+func NewRandGenerator(logger loggingFunc) *Random {
 	seed := time.Now().UnixNano()
 	seedString := os.Getenv(SeedEnv)
 	if seedString != "" {
@@ -85,17 +87,35 @@
 			panic(fmt.Sprintf("ParseInt(%v, %v, %v) failed: %v", seedString, base, bitSize, err))
 		}
 	}
-	return &Random{seed: seed, rand: rand.New(rand.NewSource(seed))}
+	logger("Seeded pseudo-random number generator with %v", seed)
+	return &Random{rand: rand.New(rand.NewSource(seed))}
 }
 
+// TODO(caprita): Consider deprecating InitRandGenerator in favor of using
+// NewRandGenerator directly.  There are several drawbacks to using the global
+// singleton Random object:
+//
+//   - tests that do not call InitRandGenerator themselves could depend on
+//   InitRandGenerator having been called by other tests in the same package and
+//   stop working when run standalone with test --run
+//
+//   - conversely, a test case may call InitRandGenerator without actually
+//   needing to; it's hard to figure out if some library called by a test
+//   actually uses the Random object or not
+//
+//   - when several test cases share the same Random object, there is
+//   interference in the stream of random numbers generated for each test case
+//   if run in parallel
+//
+// All these issues can be trivially addressed if the Random object is created
+// and plumbed through the call stack explicitly.
+
 // InitRandGenerator creates an instance of Random in the public variable Rand
-// and returns a function intended to be defer'ed that prints out the
-// seed use when creating the number number generator using the supplied
-// logging function.
-func InitRandGenerator(loggingFunc func(format string, args ...interface{})) {
+// and prints out the seed used when creating the random number generator using
+// the supplied logging function.
+func InitRandGenerator(logger loggingFunc) {
 	once.Do(func() {
-		Rand = NewRandGenerator()
-		loggingFunc("Seeded pseudo-random number generator with %v", Rand.seed)
+		Rand = NewRandGenerator(logger)
 	})
 }