runtime/internal/flow/conn: An initial implementation of Conn.
This does not yet include:
1. Security of any kind
2. Cleanup: flows and the Conn cannot be closed
3. Proper error wrapping. All public API functions should have
errors wrapped in errors from the v23/flow package.
4. Extensive testing: many cases are not covered.
I'm sending an early commit to make testing work for higher levels
of the stack, and to keep the code reviews managably small.
Change-Id: Iccd5160b5103c43f05be458840bf974cc8c7a607
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index b5ea1e9..f31bd9a 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -35,12 +35,17 @@
return nil, nil, func() {}, err
}
ctx = context.WithValue(ctx, principalKey, p)
+ ctx = context.WithLogger(ctx, logger.Global())
return &Runtime{ns: tnaming.NewSimpleNamespace()}, ctx, func() {}, nil
}
func (r *Runtime) Init(ctx *context.T) error {
// nologcall
- return logger.Manager(ctx).ConfigureFromFlags()
+ err := logger.Manager(ctx).ConfigureFromFlags()
+ if err != nil && !logger.IsAlreadyConfiguredError(err) {
+ return err
+ }
+ return nil
}
func (r *Runtime) WithPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 6f440e8..14457eb 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -5,17 +5,32 @@
package conn
import (
+ "sync"
+
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/flow/flowcontrol"
)
// flowID is a number assigned to identify a flow.
// Each flow on a given conn will have a unique number.
type flowID uint64
+const mtu = 1 << 16
+const defaultBufferSize = 1 << 20
+const reservedFlows = 10
+
+const (
+ expressPriority = iota
+ flowPriority
+ tearDownPriority
+)
+
// FlowHandlers process accepted flows.
type FlowHandler interface {
// HandleFlow processes an accepted flow.
@@ -24,6 +39,18 @@
// Conns are a multiplexing encrypted channels that can host Flows.
type Conn struct {
+ fc *flowcontrol.FlowController
+ mp *messagePipe
+ handler FlowHandler
+ versions version.RPCVersionRange
+ acceptorBlessings security.Blessings
+ dialerPublicKey security.PublicKey
+ local, remote naming.Endpoint
+ closed chan struct{}
+
+ mu sync.Mutex
+ nextFid flowID
+ flows map[flowID]*flw
}
// Ensure that *Conn implements flow.Conn
@@ -33,47 +60,155 @@
func NewDialed(
ctx *context.T,
conn flow.MsgReadWriter,
- principal security.Principal,
local, remote naming.Endpoint,
versions version.RPCVersionRange,
handler FlowHandler,
fn flow.BlessingsForPeer) (*Conn, error) {
- return nil, nil
+ principal := v23.GetPrincipal(ctx)
+ c := &Conn{
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ versions: versions,
+ dialerPublicKey: principal.PublicKey(),
+ local: local,
+ remote: remote,
+ nextFid: reservedFlows,
+ flows: map[flowID]*flw{},
+ }
+ go c.readLoop(ctx)
+ return c, nil
}
// NewAccepted accepts a new Conn on the given conn.
func NewAccepted(
ctx *context.T,
conn flow.MsgReadWriter,
- principal security.Principal,
local naming.Endpoint,
lBlessings security.Blessings,
versions version.RPCVersionRange,
- handler FlowHandler,
- skipDischarges bool) (*Conn, error) {
- return nil, nil
+ handler FlowHandler) (*Conn, error) {
+ c := &Conn{
+ fc: flowcontrol.New(defaultBufferSize, mtu),
+ mp: newMessagePipe(conn),
+ handler: handler,
+ versions: versions,
+ acceptorBlessings: lBlessings,
+ local: local,
+ nextFid: reservedFlows + 1,
+ flows: map[flowID]*flw{},
+ }
+ go c.readLoop(ctx)
+ return c, nil
}
// Dial dials a new flow on the Conn.
-func (c *Conn) Dial() (flow.Flow, error) { return nil, nil }
+func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+
+ id := c.nextFid
+ c.nextFid++
+
+ return c.newFlowLocked(ctx, id), nil
+}
// Closed returns a channel that will be closed after the Conn is shutdown.
// After this channel is closed it is guaranteed that all Dial calls will fail
// with an error and no more flows will be sent to the FlowHandler.
-func (c *Conn) Closed() <-chan struct{} { return nil }
+func (c *Conn) Closed() <-chan struct{} { return c.closed }
// LocalEndpoint returns the local vanadium Endpoint
-func (c *Conn) LocalEndpoint() naming.Endpoint { return nil }
+func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
// RemoteEndpoint returns the remote vanadium Endpoint
-func (c *Conn) RemoteEndpoint() naming.Endpoint { return nil }
+func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
// DialerPublicKey returns the public key presented by the dialer during authentication.
-func (c *Conn) DialerPublicKey() security.PublicKey { return nil }
+func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
// AcceptorBlessings returns the blessings presented by the acceptor during authentication.
-func (c *Conn) AcceptorBlessings() security.Blessings { return security.Blessings{} }
+func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
// AcceptorDischarges returns the discharges presented by the acceptor during authentication.
// Discharges are organized in a map keyed by the discharge-identifier.
func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
+
+func (c *Conn) readLoop(ctx *context.T) {
+ for {
+ x, err := c.mp.readMsg(ctx)
+ if err != nil {
+ ctx.Errorf("Error reading from connection to %s: %v", c.remote, err)
+ // TODO(mattr): tear down the conn.
+ }
+
+ switch msg := x.(type) {
+ case *tearDown:
+ // TODO(mattr): tear down the conn.
+
+ case *openFlow:
+ c.mu.Lock()
+ f := c.newFlowLocked(ctx, msg.id)
+ c.mu.Unlock()
+
+ c.handler.HandleFlow(f)
+ err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ err := c.mp.writeMsg(ctx, &addRecieveBuffers{
+ counters: map[flowID]uint64{msg.id: defaultBufferSize},
+ })
+ return 0, true, err
+ })
+ if err != nil {
+ // TODO(mattr): Maybe in this case we should close the conn.
+ ctx.Errorf("Error sending counters on connection to %s: %v", c.remote, err)
+ }
+
+ case *addRecieveBuffers:
+ release := make([]flowcontrol.Release, 0, len(msg.counters))
+ c.mu.Lock()
+ for fid, val := range msg.counters {
+ if f := c.flows[fid]; f != nil {
+ release = append(release, flowcontrol.Release{
+ Worker: f.worker,
+ Tokens: int(val),
+ })
+ }
+ }
+ c.mu.Unlock()
+ if err := c.fc.Release(release); err != nil {
+ ctx.Errorf("Error releasing counters from connection to %s: %v", c.remote, err)
+ }
+
+ case *data:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Errorf("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
+ continue
+ }
+ if err := f.q.Put(msg.payload); err != nil {
+ ctx.Errorf("Ignoring data message for closed flow on connection to %s: %d", c.remote, msg.id)
+ }
+ // TODO(mattr): perhaps close the flow.
+ // TODO(mattr): check if the q is full.
+
+ case *unencryptedData:
+ c.mu.Lock()
+ f := c.flows[msg.id]
+ c.mu.Unlock()
+ if f == nil {
+ ctx.Errorf("Ignoring data message for unknown flow: %d", msg.id)
+ continue
+ }
+ if err := f.q.Put(msg.payload); err != nil {
+ ctx.Errorf("Ignoring data message for closed flow: %d", msg.id)
+ }
+ // TODO(mattr): perhaps close the flow.
+ // TODO(mattr): check if the q is full.
+
+ default:
+ // TODO(mattr): tearDown the conn.
+ }
+ }
+}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
new file mode 100644
index 0000000..4fb158b
--- /dev/null
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/rpc/version"
+ "v.io/v23/security"
+ _ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test"
+)
+
+func init() {
+ test.Init()
+}
+
+func setupConns(t *testing.T, ctx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn) {
+ dmrw, amrw := newMRWPair(ctx)
+ versions := version.RPCVersionRange{Min: 3, Max: 5}
+ d, err := NewDialed(ctx, dmrw, nil, nil, versions, fh(dflows), nil)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ a, err := NewAccepted(ctx, amrw, nil, security.Blessings{}, versions, fh(aflows))
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ return d, a
+}
+
+func testWrite(t *testing.T, ctx *context.T, dialer *Conn, flows <-chan flow.Flow) {
+ df, err := dialer.Dial(ctx)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ want := "hello world"
+ df.WriteMsgAndClose([]byte(want[:5]), []byte(want[5:]))
+ af := <-flows
+ msg, err := af.ReadMsg()
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ if got := string(msg); got != want {
+ t.Errorf("Got: %s want %s", got, want)
+ }
+}
+
+func TestDailerDialsFlow(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ aflows := make(chan flow.Flow, 1)
+ d, _ := setupConns(t, ctx, nil, aflows)
+ testWrite(t, ctx, d, aflows)
+}
+
+func TestAcceptorDialsFlow(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ dflows := make(chan flow.Flow, 1)
+ _, a := setupConns(t, ctx, dflows, nil)
+ testWrite(t, ctx, a, dflows)
+}
+
+// TODO(mattr): List of tests to write
+// 1. multiple writes
+// 2. interleave writemsg and write
+// 3. interleave read and readmsg
+// 4. multiple reads
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
new file mode 100644
index 0000000..6ea0246
--- /dev/null
+++ b/runtime/internal/flow/conn/flow.go
@@ -0,0 +1,227 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/security"
+
+ "v.io/x/ref/runtime/internal/flow/flowcontrol"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
+type flw struct {
+ id flowID
+ ctx *context.T
+ conn *Conn
+ closed chan struct{}
+ worker *flowcontrol.Worker
+ opened bool
+ q *upcqueue.T
+ readBufs [][]byte
+ dialerBlessings security.Blessings
+ dialerDischarges map[string]security.Discharge
+}
+
+var _ flow.Flow = &flw{}
+
+func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
+ f := &flw{
+ id: id,
+ ctx: ctx,
+ conn: c,
+ closed: make(chan struct{}),
+ worker: c.fc.NewWorker(flowPriority),
+ q: upcqueue.New(),
+ }
+ c.flows[id] = f
+ return f
+}
+
+func (f *flw) dialed() bool {
+ return f.id%2 == 0
+}
+
+// Implement io.Reader.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) Read(p []byte) (n int, err error) {
+ for {
+ for len(f.readBufs) > 0 && len(f.readBufs[0]) == 0 {
+ f.readBufs = f.readBufs[1:]
+ }
+ if len(f.readBufs) > 0 {
+ break
+ }
+ var msg interface{}
+ msg, err = f.q.Get(f.ctx.Done())
+ f.readBufs = msg.([][]byte)
+ }
+ n = copy(p, f.readBufs[0])
+ f.readBufs[0] = f.readBufs[0][n:]
+ return
+}
+
+// ReadMsg is like read, but it reads bytes in chunks. Depending on the
+// implementation the batch boundaries might or might not be significant.
+// Read and ReadMsg should not be called concurrently with themselves
+// or each other.
+func (f *flw) ReadMsg() (buf []byte, err error) {
+ for {
+ for len(f.readBufs) > 0 {
+ buf, f.readBufs = f.readBufs[0], f.readBufs[1:]
+ if len(buf) > 0 {
+ return buf, nil
+ }
+ }
+ bufs, err := f.q.Get(f.ctx.Done())
+ if err != nil {
+ return nil, err
+ }
+ f.readBufs = bufs.([][]byte)
+ }
+}
+
+// Implement io.Writer.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) Write(p []byte) (n int, err error) {
+ return f.WriteMsg(p)
+}
+
+func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
+ sent := 0
+ var left []byte
+
+ f.ctx.VI(3).Infof("trying to write: %d.", f.id)
+ err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
+ f.ctx.VI(3).Infof("writing: %d.", f.id)
+ if !f.opened {
+ // TODO(mattr): we should be able to send multiple messages
+ // in a single writeMsg call.
+ err := f.conn.mp.writeMsg(f.ctx, &openFlow{
+ id: f.id,
+ initialCounters: defaultBufferSize,
+ })
+ if err != nil {
+ return 0, false, err
+ }
+ f.opened = true
+ }
+ size := 0
+ var bufs [][]byte
+ if len(left) > 0 {
+ size += len(left)
+ bufs = append(bufs, left)
+ }
+ for size <= tokens && len(parts) > 0 {
+ bufs = append(bufs, parts[0])
+ size += len(parts[0])
+ parts = parts[1:]
+ }
+ if size > tokens {
+ lidx := len(bufs) - 1
+ last := bufs[lidx]
+ take := len(last) - (size - tokens)
+ bufs[lidx] = last[:take]
+ left = last[take:]
+ }
+ d := &data{
+ id: f.id,
+ payload: bufs,
+ }
+ done := len(left) == 0 && len(parts) == 0
+ if alsoClose && done {
+ d.flags |= closeFlag
+ }
+ sent += size
+ return size, done, f.conn.mp.writeMsg(f.ctx, d)
+ })
+ return sent, err
+}
+
+// WriteMsg is like Write, but allows writing more than one buffer at a time.
+// The data in each buffer is written sequentially onto the flow. Returns the
+// number of bytes written. WriteMsg must return a non-nil error if it writes
+// less than the total number of bytes from all buffers.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
+ return f.writeMsg(false, parts...)
+}
+
+// WriteMsgAndClose performs WriteMsg and then closes the flow.
+// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
+// with themselves or each other.
+func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
+ return f.writeMsg(true, parts...)
+}
+
+// SetContext sets the context associated with the flow. Typically this is
+// used to set state that is only available after the flow is connected, such
+// as a more restricted flow timeout, or the language of the request.
+//
+// The flow.Manager associated with ctx must be the same flow.Manager that the
+// flow was dialed or accepted from, otherwise an error is returned.
+// TODO(mattr): enforce this restriction.
+//
+// TODO(mattr): update v23/flow documentation.
+// SetContext may not be called concurrently with other methods.
+func (f *flw) SetContext(ctx *context.T) error {
+ f.ctx = ctx
+ return nil
+}
+
+// LocalBlessings returns the blessings presented by the local end of the flow
+// during authentication.
+func (f *flw) LocalBlessings() security.Blessings {
+ if f.dialed() {
+ return f.dialerBlessings
+ }
+ return f.conn.AcceptorBlessings()
+}
+
+// RemoteBlessings returns the blessings presented by the remote end of the
+// flow during authentication.
+func (f *flw) RemoteBlessings() security.Blessings {
+ if f.dialed() {
+ return f.conn.AcceptorBlessings()
+ }
+ return f.dialerBlessings
+}
+
+// LocalDischarges returns the discharges presented by the local end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) LocalDischarges() map[string]security.Discharge {
+ if f.dialed() {
+ return f.dialerDischarges
+ }
+ return f.conn.AcceptorDischarges()
+}
+
+// RemoteDischarges returns the discharges presented by the remote end of the
+// flow during authentication.
+//
+// Discharges are organized in a map keyed by the discharge-identifier.
+func (f *flw) RemoteDischarges() map[string]security.Discharge {
+ if f.dialed() {
+ return f.conn.AcceptorDischarges()
+ }
+ return f.dialerDischarges
+}
+
+// Conn returns the connection the flow is multiplexed on.
+func (f *flw) Conn() flow.Conn {
+ return f.conn
+}
+
+// Closed returns a channel that remains open until the flow has been closed or
+// the ctx to the Dial or Accept call used to create the flow has been cancelled.
+func (f *flw) Closed() <-chan struct{} {
+ return f.closed
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 7e9f335..c4f40eb 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -166,13 +166,14 @@
type data struct {
id flowID
flags uint64
- payload []byte
+ payload [][]byte
}
func (m *data) write(ctx *context.T, p *messagePipe) error {
p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
- return p.write([][]byte{{dataType}}, [][]byte{p.dataBuf, m.payload})
+ encrypted := append([][]byte{p.dataBuf}, m.payload...)
+ return p.write([][]byte{{dataType}}, encrypted)
}
func (m *data) read(ctx *context.T, orig []byte) error {
var (
@@ -187,7 +188,7 @@
if m.flags, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
}
- m.payload = data
+ m.payload = [][]byte{data}
return nil
}
@@ -195,17 +196,20 @@
type unencryptedData struct {
id flowID
flags uint64
- payload []byte
+ payload [][]byte
}
func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
// re-use the controlBuf for the data size.
- p.controlBuf = writeVarUint64(uint64(len(m.payload)), p.controlBuf[:0])
- return p.write(
- [][]byte{{unencryptedDataType}, p.controlBuf, m.payload},
- [][]byte{p.dataBuf})
+ size := uint64(0)
+ for _, b := range m.payload {
+ size += uint64(len(b))
+ }
+ p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
+ unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
+ return p.write(unencrypted, [][]byte{p.dataBuf})
}
func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
var (
@@ -220,7 +224,7 @@
if plen > len(data) {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
}
- m.payload, data = data[:plen], data[plen:]
+ m.payload, data = [][]byte{data[:plen]}, data[plen:]
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index 782a5a2..caf0d7e 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -9,10 +9,17 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc/version"
+ _ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test"
)
+func init() {
+ test.Init()
+}
+
func TestVarInt(t *testing.T) {
cases := []uint64{
0x00, 0x01,
@@ -42,39 +49,24 @@
}
}
-type fakeMsgReadWriter struct {
- bufs [][]byte
- t *testing.T
-}
-
-func (f *fakeMsgReadWriter) WriteMsg(data ...[]byte) (int, error) {
- buf := []byte{}
- for _, d := range data {
- buf = append(buf, d...)
- }
- f.bufs = append(f.bufs, buf)
- return len(buf), nil
-}
-
-func (f *fakeMsgReadWriter) ReadMsg() (buf []byte, err error) {
- // TODO(mattr): block if empty.
- buf, f.bufs = f.bufs[0], f.bufs[1:]
- f.t.Logf("reading: %v", buf)
- return buf, nil
-}
-
func testMessages(t *testing.T, cases []message) {
- ctx, cancel := context.RootContext()
- defer cancel()
- p := newMessagePipe(&fakeMsgReadWriter{t: t})
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ w, r := newMRWPair(ctx)
+ wp, rp := newMessagePipe(w), newMessagePipe(r)
for _, want := range cases {
- if err := p.writeMsg(ctx, want); err != nil {
- t.Errorf("unexpected error for %#v: %v", want, err)
- }
- got, err := p.readMsg(ctx)
+ ch := make(chan struct{})
+ go func() {
+ if err := wp.writeMsg(ctx, want); err != nil {
+ t.Errorf("unexpected error for %#v: %v", want, err)
+ }
+ close(ch)
+ }()
+ got, err := rp.readMsg(ctx)
if err != nil {
t.Errorf("unexpected error reading %#v: %v", want, err)
}
+ <-ch
if !reflect.DeepEqual(got, want) {
t.Errorf("got: %#v, want %#v", got, want)
}
@@ -111,12 +103,12 @@
func TestData(t *testing.T) {
testMessages(t, []message{
- &data{id: 1123, flags: 232, payload: []byte("fake payload")},
+ &data{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
})
}
func TestUnencryptedData(t *testing.T) {
testMessages(t, []message{
- &unencryptedData{id: 1123, flags: 232, payload: []byte("fake payload")},
+ &unencryptedData{id: 1123, flags: 232, payload: [][]byte{[]byte("fake payload")}},
})
}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
new file mode 100644
index 0000000..5123e3e
--- /dev/null
+++ b/runtime/internal/flow/conn/util_test.go
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/flow"
+)
+
+type mRW struct {
+ recieve <-chan []byte
+ send chan<- []byte
+ ctx *context.T
+}
+
+func newMRWPair(ctx *context.T) (flow.MsgReadWriter, flow.MsgReadWriter) {
+ ac, bc := make(chan []byte), make(chan []byte)
+ a := &mRW{recieve: ac, send: bc, ctx: ctx}
+ b := &mRW{recieve: bc, send: ac, ctx: ctx}
+ return a, b
+}
+
+func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
+ buf := []byte{}
+ for _, d := range data {
+ buf = append(buf, d...)
+ }
+ f.send <- buf
+ f.ctx.VI(5).Infof("Wrote: %v", buf)
+ return len(buf), nil
+}
+func (f *mRW) ReadMsg() (buf []byte, err error) {
+ buf = <-f.recieve
+ f.ctx.VI(5).Infof("Read: %v", buf)
+ return buf, nil
+}
+
+type fh chan<- flow.Flow
+
+func (fh fh) HandleFlow(f flow.Flow) error {
+ if fh == nil {
+ panic("writing to nil flow handler")
+ }
+ fh <- f
+ return nil
+}