runtime/internal/flow/conn: An initial implementation of Conn.

This does not yet include:
1. Security of any kind
2. Cleanup: flows and the Conn cannot be closed
3. Proper error wrapping.  All public API functions should have
   errors wrapped in errors from the v23/flow package.
4. Extensive testing: many cases are not covered.

I'm sending an early commit to make testing work for higher levels
of the stack, and to keep the code reviews managably small.

Change-Id: Iccd5160b5103c43f05be458840bf974cc8c7a607
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index b5ea1e9..f31bd9a 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -35,12 +35,17 @@
 		return nil, nil, func() {}, err
 	}
 	ctx = context.WithValue(ctx, principalKey, p)
+	ctx = context.WithLogger(ctx, logger.Global())
 	return &Runtime{ns: tnaming.NewSimpleNamespace()}, ctx, func() {}, nil
 }
 
 func (r *Runtime) Init(ctx *context.T) error {
 	// nologcall
-	return logger.Manager(ctx).ConfigureFromFlags()
+	err := logger.Manager(ctx).ConfigureFromFlags()
+	if err != nil && !logger.IsAlreadyConfiguredError(err) {
+		return err
+	}
+	return nil
 }
 
 func (r *Runtime) WithPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 6f440e8..14457eb 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -5,17 +5,32 @@
 package conn
 
 import (
+	"sync"
+
+	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/flow/flowcontrol"
 )
 
 // flowID is a number assigned to identify a flow.
 // Each flow on a given conn will have a unique number.
 type flowID uint64
 
+const mtu = 1 << 16
+const defaultBufferSize = 1 << 20
+const reservedFlows = 10
+
+const (
+	expressPriority = iota
+	flowPriority
+	tearDownPriority
+)
+
 // FlowHandlers process accepted flows.
 type FlowHandler interface {
 	// HandleFlow processes an accepted flow.
@@ -24,6 +39,18 @@
 
 // Conns are a multiplexing encrypted channels that can host Flows.
 type Conn struct {
+	fc                *flowcontrol.FlowController
+	mp                *messagePipe
+	handler           FlowHandler
+	versions          version.RPCVersionRange
+	acceptorBlessings security.Blessings
+	dialerPublicKey   security.PublicKey
+	local, remote     naming.Endpoint
+	closed            chan struct{}
+
+	mu      sync.Mutex
+	nextFid flowID
+	flows   map[flowID]*flw
 }
 
 // Ensure that *Conn implements flow.Conn
@@ -33,47 +60,155 @@
 func NewDialed(
 	ctx *context.T,
 	conn flow.MsgReadWriter,
-	principal security.Principal,
 	local, remote naming.Endpoint,
 	versions version.RPCVersionRange,
 	handler FlowHandler,
 	fn flow.BlessingsForPeer) (*Conn, error) {
-	return nil, nil
+	principal := v23.GetPrincipal(ctx)
+	c := &Conn{
+		fc:              flowcontrol.New(defaultBufferSize, mtu),
+		mp:              newMessagePipe(conn),
+		handler:         handler,
+		versions:        versions,
+		dialerPublicKey: principal.PublicKey(),
+		local:           local,
+		remote:          remote,
+		nextFid:         reservedFlows,
+		flows:           map[flowID]*flw{},
+	}
+	go c.readLoop(ctx)
+	return c, nil
 }
 
 // NewAccepted accepts a new Conn on the given conn.
 func NewAccepted(
 	ctx *context.T,
 	conn flow.MsgReadWriter,
-	principal security.Principal,
 	local naming.Endpoint,
 	lBlessings security.Blessings,
 	versions version.RPCVersionRange,
-	handler FlowHandler,
-	skipDischarges bool) (*Conn, error) {
-	return nil, nil
+	handler FlowHandler) (*Conn, error) {
+	c := &Conn{
+		fc:                flowcontrol.New(defaultBufferSize, mtu),
+		mp:                newMessagePipe(conn),
+		handler:           handler,
+		versions:          versions,
+		acceptorBlessings: lBlessings,
+		local:             local,
+		nextFid:           reservedFlows + 1,
+		flows:             map[flowID]*flw{},
+	}
+	go c.readLoop(ctx)
+	return c, nil
 }
 
 // Dial dials a new flow on the Conn.
-func (c *Conn) Dial() (flow.Flow, error) { return nil, nil }
+func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
+	defer c.mu.Unlock()
+	c.mu.Lock()
+
+	id := c.nextFid
+	c.nextFid++
+
+	return c.newFlowLocked(ctx, id), nil
+}
 
 // Closed returns a channel that will be closed after the Conn is shutdown.
 // After this channel is closed it is guaranteed that all Dial calls will fail
 // with an error and no more flows will be sent to the FlowHandler.
-func (c *Conn) Closed() <-chan struct{} { return nil }
+func (c *Conn) Closed() <-chan struct{} { return c.closed }
 
 // LocalEndpoint returns the local vanadium Endpoint
-func (c *Conn) LocalEndpoint() naming.Endpoint { return nil }
+func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
 
 // RemoteEndpoint returns the remote vanadium Endpoint
-func (c *Conn) RemoteEndpoint() naming.Endpoint { return nil }
+func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
 
 // DialerPublicKey returns the public key presented by the dialer during authentication.
-func (c *Conn) DialerPublicKey() security.PublicKey { return nil }
+func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
 
 // AcceptorBlessings returns the blessings presented by the acceptor during authentication.
-func (c *Conn) AcceptorBlessings() security.Blessings { return security.Blessings{} }
+func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
 
 // AcceptorDischarges returns the discharges presented by the acceptor during authentication.
 // Discharges are organized in a map keyed by the discharge-identifier.
 func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
+
+func (c *Conn) readLoop(ctx *context.T) {
+	for {
+		x, err := c.mp.readMsg(ctx)
+		if err != nil {
+			ctx.Errorf("Error reading from connection to %s: %v", c.remote, err)
+			// TODO(mattr): tear down the conn.
+		}
+
+		switch msg := x.(type) {
+		case *tearDown:
+			// TODO(mattr): tear down the conn.
+
+		case *openFlow:
+			c.mu.Lock()
+			f := c.newFlowLocked(ctx, msg.id)
+			c.mu.Unlock()
+
+			c.handler.HandleFlow(f)
+			err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+				err := c.mp.writeMsg(ctx, &addRecieveBuffers{
+					counters: map[flowID]uint64{msg.id: defaultBufferSize},
+				})
+				return 0, true, err
+			})
+			if err != nil {
+				// TODO(mattr): Maybe in this case we should close the conn.
+				ctx.Errorf("Error sending counters on connection to %s: %v", c.remote, err)
+			}
+
+		case *addRecieveBuffers:
+			release := make([]flowcontrol.Release, 0, len(msg.counters))
+			c.mu.Lock()
+			for fid, val := range msg.counters {
+				if f := c.flows[fid]; f != nil {
+					release = append(release, flowcontrol.Release{
+						Worker: f.worker,
+						Tokens: int(val),
+					})
+				}
+			}
+			c.mu.Unlock()
+			if err := c.fc.Release(release); err != nil {
+				ctx.Errorf("Error releasing counters from connection to %s: %v", c.remote, err)
+			}
+
+		case *data:
+			c.mu.Lock()
+			f := c.flows[msg.id]
+			c.mu.Unlock()
+			if f == nil {
+				ctx.Errorf("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
+				continue
+			}
+			if err := f.q.Put(msg.payload); err != nil {
+				ctx.Errorf("Ignoring data message for closed flow on connection to %s: %d", c.remote, msg.id)
+			}
+			// TODO(mattr): perhaps close the flow.
+			// TODO(mattr): check if the q is full.
+
+		case *unencryptedData:
+			c.mu.Lock()
+			f := c.flows[msg.id]
+			c.mu.Unlock()
+			if f == nil {
+				ctx.Errorf("Ignoring data message for unknown flow: %d", msg.id)
+				continue
+			}
+			if err := f.q.Put(msg.payload); err != nil {
+				ctx.Errorf("Ignoring data message for closed flow: %d", msg.id)
+			}
+			// TODO(mattr): perhaps close the flow.
+			// TODO(mattr): check if the q is full.
+
+		default:
+			// TODO(mattr): tearDown the conn.
+		}
+	}
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
new file mode 100644
index 0000000..4fb158b
--- /dev/null
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/rpc/version"
+	"v.io/v23/security"
+	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test"
+)
+
+func init() {
+	test.Init()
+}
+
+func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn) {
+	dmrw, amrw := newMRWPair(ctx)
+	versions := version.RPCVersionRange{Min: 3, Max: 5}
+	d, err := NewDialed(ctx, dmrw, nil, nil, versions, fh(dflows), nil)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	a, err := NewAccepted(ctx, amrw, nil, security.Blessings{}, versions, fh(aflows))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	return d, a
+}
+
+func testWrite(t *testing.T, ctx *context.T, dialer *Conn, flows <-chan flow.Flow) {
+	df, err := dialer.Dial(ctx)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	want := "hello world"
+	df.WriteMsgAndClose([]byte(want[:5]), []byte(want[5:]))
+	af := <-flows
+	msg, err := af.ReadMsg()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if got := string(msg); got != want {
+		t.Errorf("Got: %s want %s", got, want)
+	}
+}
+
+func TestDailerDialsFlow(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	aflows := make(chan flow.Flow, 1)
+	d, _ := setupConns(t, ctx, nil, aflows)
+	testWrite(t, ctx, d, aflows)
+}
+
+func TestAcceptorDialsFlow(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	dflows := make(chan flow.Flow, 1)
+	_, a := setupConns(t, ctx, dflows, nil)
+	testWrite(t, ctx, a, dflows)
+}
+
+// TODO(mattr): List of tests to write
+// 1. multiple writes
+// 2. interleave writemsg and write
+// 3. interleave read and readmsg
+// 4. multiple reads
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
new file mode 100644
index 0000000..6ea0246
--- /dev/null
+++ b/runtime/internal/flow/conn/flow.go
@@ -0,0 +1,227 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/flow/flowcontrol"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
+type flw struct {
+	id               flowID
+	ctx              *context.T
+	conn             *Conn
+	closed           chan struct{}
+	worker           *flowcontrol.Worker
+	opened           bool
+	q                *upcqueue.T
+	readBufs         [][]byte
+	dialerBlessings  security.Blessings
+	dialerDischarges map[string]security.Discharge
+}
+
+var _ flow.Flow = &flw{}
+
+func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
+	f := &flw{
+		id:     id,
+		ctx:    ctx,
+		conn:   c,
+		closed: make(chan struct{}),
+		worker: c.fc.NewWorker(flowPriority),
+		q:      upcqueue.New(),
+	}
+	c.flows[id] = f
+	return f
+}
+
+func (f *flw) dialed() bool {
+	return f.id%2 == 0
+}
+
+// Implement io.Reader.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) Read(p []byte) (n int, err error) {
+	for {
+		for len(f.readBufs) > 0 && len(f.readBufs[0]) == 0 {
+			f.readBufs = f.readBufs[1:]
+		}
+		if len(f.readBufs) > 0 {
+			break
+		}
+		var msg interface{}
+		msg, err = f.q.Get(f.ctx.Done())
+		f.readBufs = msg.([][]byte)
+	}
+	n = copy(p, f.readBufs[0])
+	f.readBufs[0] = f.readBufs[0][n:]
+	return
+}
+
+// ReadMsg is like read, but it reads bytes in chunks.  Depending on the
+// implementation the batch boundaries might or might not be significant.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) ReadMsg() (buf []byte, err error) {
+	for {
+		for len(f.readBufs) > 0 {
+			buf, f.readBufs = f.readBufs[0], f.readBufs[1:]
+			if len(buf) > 0 {
+				return buf, nil
+			}
+		}
+		bufs, err := f.q.Get(f.ctx.Done())
+		if err != nil {
+			return nil, err
+		}
+		f.readBufs = bufs.([][]byte)
+	}
+}
+
+// Implement io.Writer.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) Write(p []byte) (n int, err error) {
+	return f.WriteMsg(p)
+}
+
+func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+	sent := 0
+	var left []byte
+
+	f.ctx.VI(3).Infof("trying to write: %d.", f.id)
+	err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
+		f.ctx.VI(3).Infof("writing: %d.", f.id)
+		if !f.opened {
+			// TODO(mattr): we should be able to send multiple messages
+			// in a single writeMsg call.
+			err := f.conn.mp.writeMsg(f.ctx, &openFlow{
+				id:              f.id,
+				initialCounters: defaultBufferSize,
+			})
+			if err != nil {
+				return 0, false, err
+			}
+			f.opened = true
+		}
+		size := 0
+		var bufs [][]byte
+		if len(left) > 0 {
+			size += len(left)
+			bufs = append(bufs, left)
+		}
+		for size <= tokens && len(parts) > 0 {
+			bufs = append(bufs, parts[0])
+			size += len(parts[0])
+			parts = parts[1:]
+		}
+		if size > tokens {
+			lidx := len(bufs) - 1
+			last := bufs[lidx]
+			take := len(last) - (size - tokens)
+			bufs[lidx] = last[:take]
+			left = last[take:]
+		}
+		d := &data{
+			id:      f.id,
+			payload: bufs,
+		}
+		done := len(left) == 0 && len(parts) == 0
+		if alsoClose && done {
+			d.flags |= closeFlag
+		}
+		sent += size
+		return size, done, f.conn.mp.writeMsg(f.ctx, d)
+	})
+	return sent, err
+}
+
+// WriteMsg is like Write, but allows writing more than one buffer at a time.
+// The data in each buffer is written sequentially onto the flow.  Returns the
+// number of bytes written.  WriteMsg must return a non-nil error if it writes
+// less than the total number of bytes from all buffers.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
+	return f.writeMsg(false, parts...)
+}
+
+// WriteMsgAndClose performs WriteMsg and then closes the flow.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
+	return f.writeMsg(true, parts...)
+}
+
+// SetContext sets the context associated with the flow.  Typically this is
+// used to set state that is only available after the flow is connected, such
+// as a more restricted flow timeout, or the language of the request.
+//
+// The flow.Manager associated with ctx must be the same flow.Manager that the
+// flow was dialed or accepted from, otherwise an error is returned.
+// TODO(mattr): enforce this restriction.
+//
+// TODO(mattr): update v23/flow documentation.
+// SetContext may not be called concurrently with other methods.
+func (f *flw) SetContext(ctx *context.T) error {
+	f.ctx = ctx
+	return nil
+}
+
+// LocalBlessings returns the blessings presented by the local end of the flow
+// during authentication.
+func (f *flw) LocalBlessings() security.Blessings {
+	if f.dialed() {
+		return f.dialerBlessings
+	}
+	return f.conn.AcceptorBlessings()
+}
+
+// RemoteBlessings returns the blessings presented by the remote end of the
+// flow during authentication.
+func (f *flw) RemoteBlessings() security.Blessings {
+	if f.dialed() {
+		return f.conn.AcceptorBlessings()
+	}
+	return f.dialerBlessings
+}
+
+// LocalDischarges returns the discharges presented by the local end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) LocalDischarges() map[string]security.Discharge {
+	if f.dialed() {
+		return f.dialerDischarges
+	}
+	return f.conn.AcceptorDischarges()
+}
+
+// RemoteDischarges returns the discharges presented by the remote end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) RemoteDischarges() map[string]security.Discharge {
+	if f.dialed() {
+		return f.conn.AcceptorDischarges()
+	}
+	return f.dialerDischarges
+}
+
+// Conn returns the connection the flow is multiplexed on.
+func (f *flw) Conn() flow.Conn {
+	return f.conn
+}
+
+// Closed returns a channel that remains open until the flow has been closed or
+// the ctx to the Dial or Accept call used to create the flow has been cancelled.
+func (f *flw) Closed() <-chan struct{} {
+	return f.closed
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 7e9f335..c4f40eb 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -166,13 +166,14 @@
 type data struct {
 	id      flowID
 	flags   uint64
-	payload []byte
+	payload [][]byte
 }
 
 func (m *data) write(ctx *context.T, p *messagePipe) error {
 	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
 	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
-	return p.write([][]byte{{dataType}}, [][]byte{p.dataBuf, m.payload})
+	encrypted := append([][]byte{p.dataBuf}, m.payload...)
+	return p.write([][]byte{{dataType}}, encrypted)
 }
 func (m *data) read(ctx *context.T, orig []byte) error {
 	var (
@@ -187,7 +188,7 @@
 	if m.flags, data, valid = readVarUint64(ctx, data); !valid {
 		return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
 	}
-	m.payload = data
+	m.payload = [][]byte{data}
 	return nil
 }
 
@@ -195,17 +196,20 @@
 type unencryptedData struct {
 	id      flowID
 	flags   uint64
-	payload []byte
+	payload [][]byte
 }
 
 func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
 	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
 	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
 	// re-use the controlBuf for the data size.
-	p.controlBuf = writeVarUint64(uint64(len(m.payload)), p.controlBuf[:0])
-	return p.write(
-		[][]byte{{unencryptedDataType}, p.controlBuf, m.payload},
-		[][]byte{p.dataBuf})
+	size := uint64(0)
+	for _, b := range m.payload {
+		size += uint64(len(b))
+	}
+	p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
+	unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
+	return p.write(unencrypted, [][]byte{p.dataBuf})
 }
 func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
 	var (
@@ -220,7 +224,7 @@
 	if plen > len(data) {
 		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
 	}
-	m.payload, data = data[:plen], data[plen:]
+	m.payload, data = [][]byte{data[:plen]}, data[plen:]
 	if v, data, valid = readVarUint64(ctx, data); !valid {
 		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
 	}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index 782a5a2..caf0d7e 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -9,10 +9,17 @@
 	"reflect"
 	"testing"
 
+	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/rpc/version"
+	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test"
 )
 
+func init() {
+	test.Init()
+}
+
 func TestVarInt(t *testing.T) {
 	cases := []uint64{
 		0x00, 0x01,
@@ -42,39 +49,24 @@
 	}
 }
 
-type fakeMsgReadWriter struct {
-	bufs [][]byte
-	t    *testing.T
-}
-
-func (f *fakeMsgReadWriter) WriteMsg(data ...[]byte) (int, error) {
-	buf := []byte{}
-	for _, d := range data {
-		buf = append(buf, d...)
-	}
-	f.bufs = append(f.bufs, buf)
-	return len(buf), nil
-}
-
-func (f *fakeMsgReadWriter) ReadMsg() (buf []byte, err error) {
-	// TODO(mattr): block if empty.
-	buf, f.bufs = f.bufs[0], f.bufs[1:]
-	f.t.Logf("reading: %v", buf)
-	return buf, nil
-}
-
 func testMessages(t *testing.T, cases []message) {
-	ctx, cancel := context.RootContext()
-	defer cancel()
-	p := newMessagePipe(&fakeMsgReadWriter{t: t})
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	w, r := newMRWPair(ctx)
+	wp, rp := newMessagePipe(w), newMessagePipe(r)
 	for _, want := range cases {
-		if err := p.writeMsg(ctx, want); err != nil {
-			t.Errorf("unexpected error for %#v: %v", want, err)
-		}
-		got, err := p.readMsg(ctx)
+		ch := make(chan struct{})
+		go func() {
+			if err := wp.writeMsg(ctx, want); err != nil {
+				t.Errorf("unexpected error for %#v: %v", want, err)
+			}
+			close(ch)
+		}()
+		got, err := rp.readMsg(ctx)
 		if err != nil {
 			t.Errorf("unexpected error reading %#v: %v", want, err)
 		}
+		<-ch
 		if !reflect.DeepEqual(got, want) {
 			t.Errorf("got: %#v, want %#v", got, want)
 		}
@@ -111,12 +103,12 @@
 
 func TestData(t *testing.T) {
 	testMessages(t, []message{
-		&data{id: 1123, flags: 232, payload: []byte("fake payload")},
+		&data{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
 	})
 }
 
 func TestUnencryptedData(t *testing.T) {
 	testMessages(t, []message{
-		&unencryptedData{id: 1123, flags: 232, payload: []byte("fake payload")},
+		&unencryptedData{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
 	})
 }
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
new file mode 100644
index 0000000..5123e3e
--- /dev/null
+++ b/runtime/internal/flow/conn/util_test.go
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/flow"
+)
+
+type mRW struct {
+	recieve <-chan []byte
+	send    chan<- []byte
+	ctx     *context.T
+}
+
+func newMRWPair(ctx *context.T) (flow.MsgReadWriter, flow.MsgReadWriter) {
+	ac, bc := make(chan []byte), make(chan []byte)
+	a := &mRW{recieve: ac, send: bc, ctx: ctx}
+	b := &mRW{recieve: bc, send: ac, ctx: ctx}
+	return a, b
+}
+
+func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
+	buf := []byte{}
+	for _, d := range data {
+		buf = append(buf, d...)
+	}
+	f.send <- buf
+	f.ctx.VI(5).Infof("Wrote: %v", buf)
+	return len(buf), nil
+}
+func (f *mRW) ReadMsg() (buf []byte, err error) {
+	buf = <-f.recieve
+	f.ctx.VI(5).Infof("Read: %v", buf)
+	return buf, nil
+}
+
+type fh chan<- flow.Flow
+
+func (fh fh) HandleFlow(f flow.Flow) error {
+	if fh == nil {
+		panic("writing to nil flow handler")
+	}
+	fh <- f
+	return nil
+}