runtime/internal/flow/conn: Read and write low-level messages.
This does not yet include encryption or blessings information.
Change-Id: If72e61b6d07f6d54e3a885112e6b7f0b80170afa
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 16ece68..6f440e8 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -12,6 +12,10 @@
"v.io/v23/security"
)
+// flowID is a number assigned to identify a flow.
+// Each flow on a given conn will have a unique number.
+type flowID uint64
+
// FlowHandlers process accepted flows.
type FlowHandler interface {
// HandleFlow processes an accepted flow.
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
new file mode 100644
index 0000000..253ec86
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -0,0 +1,19 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+ InvalidMsg(typ byte, size, field int64) {
+ "en":"message of type{:typ} and size{:size} failed decoding at field{:field}."}
+ InvalidControlMsg(cmd byte, size, field int64) {
+ "en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
+ UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
+ UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
new file mode 100644
index 0000000..15fb4cd
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -0,0 +1,49 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package conn
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/verror"
+)
+
+var (
+ ErrInvalidMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ ErrUnknownMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+ ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidMsg.ID), "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+}
+
+// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
+func NewErrInvalidMsg(ctx *context.T, typ byte, size int64, field int64) error {
+ return verror.New(ErrInvalidMsg, ctx, typ, size, field)
+}
+
+// NewErrInvalidControlMsg returns an error with the ErrInvalidControlMsg ID.
+func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size int64, field int64) error {
+ return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field)
+}
+
+// NewErrUnknownMsg returns an error with the ErrUnknownMsg ID.
+func NewErrUnknownMsg(ctx *context.T, typ byte) error {
+ return verror.New(ErrUnknownMsg, ctx, typ)
+}
+
+// NewErrUnknownControlMsg returns an error with the ErrUnknownControlMsg ID.
+func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
+ return verror.New(ErrUnknownControlMsg, ctx, cmd)
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
new file mode 100644
index 0000000..7e9f335
--- /dev/null
+++ b/runtime/internal/flow/conn/message.go
@@ -0,0 +1,329 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "errors"
+
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc/version"
+)
+
+// TODO(mattr): Link to protocol doc.
+
+type message interface {
+ write(ctx *context.T, p *messagePipe) error
+ read(ctx *context.T, data []byte) error
+}
+
+// message types.
+const (
+ invalidType = iota
+ controlType
+ dataType
+ unencryptedDataType
+)
+
+// control commands.
+const (
+ invalidCmd = iota
+ setupCmd
+ tearDownCmd
+ openFlowCmd
+ addRecieveBuffersCmd
+)
+
+// setup options.
+const (
+ invalidOption = iota
+ peerNaClPublicKeyOption
+ peerRemoteEndpointOption
+ peerLocalEndpointOption
+)
+
+// data flags.
+const (
+ closeFlag = 1 << iota
+ metadataFlag
+)
+
+// random consts.
+const (
+ maxVarUint64Size = 9
+)
+
+// setup is the first message over the wire. It negotiates protocol version
+// and encryption options for connection.
+type setup struct {
+ versions version.RPCVersionRange
+ PeerNaClPublicKey *[32]byte
+ PeerRemoteEndpoint naming.Endpoint
+ PeerLocalEndpoint naming.Endpoint
+}
+
+func (m *setup) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
+ return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
+}
+func (m *setup) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
+ }
+ m.versions.Min = version.RPCVersion(v)
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
+ }
+ m.versions.Max = version.RPCVersion(v)
+ return nil
+}
+
+// tearDown is sent over the wire before a connection is closed.
+type tearDown struct {
+ Err error
+}
+
+func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
+ return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Err.Error())})
+}
+func (m *tearDown) read(ctx *context.T, data []byte) error {
+ m.Err = errors.New(string(data))
+ return nil
+}
+
+// openFlow is sent at the beginning of every new flow.
+type openFlow struct {
+ id flowID
+ initialCounters uint64
+}
+
+func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+ p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
+ return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
+}
+func (m *openFlow) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
+ }
+ m.id = flowID(v)
+ if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
+ }
+ return nil
+}
+
+// addRecieveBuffers is sent as flows are read from locally. The counters
+// inform remote writers that there is local buffer space available.
+type addRecieveBuffers struct {
+ counters map[flowID]uint64
+}
+
+func (m *addRecieveBuffers) write(ctx *context.T, p *messagePipe) error {
+ p.controlBuf = p.controlBuf[:0]
+ for fid, val := range m.counters {
+ p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
+ p.controlBuf = writeVarUint64(val, p.controlBuf)
+ }
+ return p.write([][]byte{{controlType}}, [][]byte{{addRecieveBuffersCmd}, p.controlBuf})
+}
+func (m *addRecieveBuffers) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ fid, val uint64
+ n int64
+ )
+ m.counters = map[flowID]uint64{}
+ for len(data) > 0 {
+ if fid, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n)
+ }
+ if val, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n+1)
+ }
+ m.counters[flowID(fid)] = val
+ n += 2
+ }
+ return nil
+}
+
+// data carries encrypted data for a specific flow.
+type data struct {
+ id flowID
+ flags uint64
+ payload []byte
+}
+
+func (m *data) write(ctx *context.T, p *messagePipe) error {
+ p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+ p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+ return p.write([][]byte{{dataType}}, [][]byte{p.dataBuf, m.payload})
+}
+func (m *data) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
+ }
+ m.id = flowID(v)
+ if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
+ }
+ m.payload = data
+ return nil
+}
+
+// unencryptedData carries unencrypted data for a specific flow.
+type unencryptedData struct {
+ id flowID
+ flags uint64
+ payload []byte
+}
+
+func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
+ p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+ p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+ // re-use the controlBuf for the data size.
+ p.controlBuf = writeVarUint64(uint64(len(m.payload)), p.controlBuf[:0])
+ return p.write(
+ [][]byte{{unencryptedDataType}, p.controlBuf, m.payload},
+ [][]byte{p.dataBuf})
+}
+func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
+ var (
+ data = orig
+ valid bool
+ v uint64
+ )
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
+ }
+ plen := int(v)
+ if plen > len(data) {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
+ }
+ m.payload, data = data[:plen], data[plen:]
+ if v, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
+ }
+ m.id = flowID(v)
+ if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+ return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
+ }
+ return nil
+}
+
+type messagePipe struct {
+ rw flow.MsgReadWriter
+ controlBuf []byte
+ dataBuf []byte
+ outBuf [][]byte
+}
+
+func newMessagePipe(rw flow.MsgReadWriter) *messagePipe {
+ return &messagePipe{
+ rw: rw,
+ controlBuf: make([]byte, 256),
+ dataBuf: make([]byte, 2*maxVarUint64Size),
+ outBuf: make([][]byte, 5),
+ }
+}
+
+func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
+ p.outBuf = append(p.outBuf[:0], unencrypted...)
+ p.outBuf = append(p.outBuf, encrypted...)
+ _, err := p.rw.WriteMsg(p.outBuf...)
+ return err
+}
+
+func (p *messagePipe) writeMsg(ctx *context.T, m message) error {
+ return m.write(ctx, p)
+}
+
+func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
+ msg, err := p.rw.ReadMsg()
+ if len(msg) == 0 {
+ return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
+ }
+ if err != nil {
+ return nil, err
+ }
+ var m message
+ switch msg[0] {
+ case controlType:
+ if len(msg) == 1 {
+ return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
+ }
+ msg = msg[1:]
+ switch msg[0] {
+ case setupCmd:
+ m = &setup{}
+ case tearDownCmd:
+ m = &tearDown{}
+ case openFlowCmd:
+ m = &openFlow{}
+ case addRecieveBuffersCmd:
+ m = &addRecieveBuffers{}
+ default:
+ return nil, NewErrUnknownControlMsg(ctx, msg[0])
+ }
+ case dataType:
+ m = &data{}
+ case unencryptedDataType:
+ m = &unencryptedData{}
+ default:
+ return nil, NewErrUnknownMsg(ctx, msg[0])
+ }
+ return m, m.read(ctx, msg[1:])
+}
+
+func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
+ if len(data) == 0 {
+ return 0, data, false
+ }
+ l := data[0]
+ if l <= 0x7f {
+ return uint64(l), data[1:], true
+ }
+ l = 0xff - l + 1
+ if l > 8 || len(data)-1 < int(l) {
+ return 0, data, false
+ }
+ var out uint64
+ for i := 1; i < int(l+1); i++ {
+ out = out<<8 | uint64(data[i])
+ }
+ return out, data[l+1:], true
+}
+
+func writeVarUint64(u uint64, buf []byte) []byte {
+ if u <= 0x7f {
+ return append(buf, byte(u))
+ }
+ shift, l := 56, byte(7)
+ for ; shift >= 0 && (u>>uint(shift))&0xff == 0; shift, l = shift-8, l-1 {
+ }
+ buf = append(buf, 0xff-l)
+ for ; shift >= 0; shift -= 8 {
+ buf = append(buf, byte(u>>uint(shift))&0xff)
+ }
+ return buf
+}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
new file mode 100644
index 0000000..782a5a2
--- /dev/null
+++ b/runtime/internal/flow/conn/message_test.go
@@ -0,0 +1,122 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "errors"
+ "reflect"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc/version"
+)
+
+func TestVarInt(t *testing.T) {
+ cases := []uint64{
+ 0x00, 0x01,
+ 0x7f, 0x80,
+ 0xff, 0x100,
+ 0xffff, 0x10000,
+ 0xffffff, 0x1000000,
+ 0xffffffff, 0x100000000,
+ 0xffffffffff, 0x10000000000,
+ 0xffffffffffff, 0x1000000000000,
+ 0xffffffffffffff, 0x100000000000000,
+ 0xffffffffffffffff,
+ }
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ for _, want := range cases {
+ got, b, valid := readVarUint64(ctx, writeVarUint64(want, []byte{}))
+ if !valid {
+ t.Fatalf("error reading %x", want)
+ }
+ if len(b) != 0 {
+ t.Errorf("unexpected buffer remaining for %x: %v", want, b)
+ }
+ if got != want {
+ t.Errorf("got: %d want: %d", got, want)
+ }
+ }
+}
+
+type fakeMsgReadWriter struct {
+ bufs [][]byte
+ t *testing.T
+}
+
+func (f *fakeMsgReadWriter) WriteMsg(data ...[]byte) (int, error) {
+ buf := []byte{}
+ for _, d := range data {
+ buf = append(buf, d...)
+ }
+ f.bufs = append(f.bufs, buf)
+ return len(buf), nil
+}
+
+func (f *fakeMsgReadWriter) ReadMsg() (buf []byte, err error) {
+ // TODO(mattr): block if empty.
+ buf, f.bufs = f.bufs[0], f.bufs[1:]
+ f.t.Logf("reading: %v", buf)
+ return buf, nil
+}
+
+func testMessages(t *testing.T, cases []message) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ p := newMessagePipe(&fakeMsgReadWriter{t: t})
+ for _, want := range cases {
+ if err := p.writeMsg(ctx, want); err != nil {
+ t.Errorf("unexpected error for %#v: %v", want, err)
+ }
+ got, err := p.readMsg(ctx)
+ if err != nil {
+ t.Errorf("unexpected error reading %#v: %v", want, err)
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %#v, want %#v", got, want)
+ }
+ }
+}
+
+func TestSetup(t *testing.T) {
+ testMessages(t, []message{
+ &setup{versions: version.RPCVersionRange{Min: 3, Max: 5}},
+ })
+}
+
+func TestTearDown(t *testing.T) {
+ testMessages(t, []message{
+ &tearDown{Err: errors.New("foobar")},
+ })
+}
+
+func TestOpenFlow(t *testing.T) {
+ testMessages(t, []message{
+ &openFlow{id: 23, initialCounters: 1 << 20},
+ })
+}
+
+func TestAddReceiveBuffers(t *testing.T) {
+ testMessages(t, []message{
+ &addRecieveBuffers{counters: map[flowID]uint64{}},
+ &addRecieveBuffers{counters: map[flowID]uint64{
+ 4: 233,
+ 9: 423242,
+ }},
+ })
+}
+
+func TestData(t *testing.T) {
+ testMessages(t, []message{
+ &data{id: 1123, flags: 232, payload: []byte("fake payload")},
+ })
+}
+
+func TestUnencryptedData(t *testing.T) {
+ testMessages(t, []message{
+ &unencryptedData{id: 1123, flags: 232, payload: []byte("fake payload")},
+ })
+}