runtime/internal/flow/conn: Read and write low-level messages.

This does not yet include encryption or blessings information.

Change-Id: If72e61b6d07f6d54e3a885112e6b7f0b80170afa
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 16ece68..6f440e8 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -12,6 +12,10 @@
 	"v.io/v23/security"
 )
 
+// flowID is a number assigned to identify a flow.
+// Each flow on a given conn will have a unique number.
+type flowID uint64
+
 // FlowHandlers process accepted flows.
 type FlowHandler interface {
 	// HandleFlow processes an accepted flow.
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
new file mode 100644
index 0000000..253ec86
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -0,0 +1,19 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+  InvalidMsg(typ byte, size, field int64) {
+  "en":"message of type{:typ} and size{:size} failed decoding at field{:field}."}
+  InvalidControlMsg(cmd byte, size, field int64) {
+  "en":"control message of cmd{:cmd} and size{:size} failed decoding at field{:field}."}
+  UnknownMsg(typ byte) {"en":"unknown message type{:typ}."}
+  UnknownControlMsg(cmd byte) {"en":"unknown control command{:cmd}."}
+)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
new file mode 100644
index 0000000..15fb4cd
--- /dev/null
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -0,0 +1,49 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package conn
+
+import (
+	// VDL system imports
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/verror"
+)
+
+var (
+	ErrInvalidMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	ErrInvalidControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+	ErrUnknownMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+	ErrUnknownControlMsg = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+)
+
+func init() {
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidMsg.ID), "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidControlMsg.ID), "{1:}{2:} control message of cmd{:3} and size{:4} failed decoding at field{:5}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownMsg.ID), "{1:}{2:} unknown message type{:3}.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
+}
+
+// NewErrInvalidMsg returns an error with the ErrInvalidMsg ID.
+func NewErrInvalidMsg(ctx *context.T, typ byte, size int64, field int64) error {
+	return verror.New(ErrInvalidMsg, ctx, typ, size, field)
+}
+
+// NewErrInvalidControlMsg returns an error with the ErrInvalidControlMsg ID.
+func NewErrInvalidControlMsg(ctx *context.T, cmd byte, size int64, field int64) error {
+	return verror.New(ErrInvalidControlMsg, ctx, cmd, size, field)
+}
+
+// NewErrUnknownMsg returns an error with the ErrUnknownMsg ID.
+func NewErrUnknownMsg(ctx *context.T, typ byte) error {
+	return verror.New(ErrUnknownMsg, ctx, typ)
+}
+
+// NewErrUnknownControlMsg returns an error with the ErrUnknownControlMsg ID.
+func NewErrUnknownControlMsg(ctx *context.T, cmd byte) error {
+	return verror.New(ErrUnknownControlMsg, ctx, cmd)
+}
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
new file mode 100644
index 0000000..7e9f335
--- /dev/null
+++ b/runtime/internal/flow/conn/message.go
@@ -0,0 +1,329 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"errors"
+
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc/version"
+)
+
+// TODO(mattr): Link to protocol doc.
+
+type message interface {
+	write(ctx *context.T, p *messagePipe) error
+	read(ctx *context.T, data []byte) error
+}
+
+// message types.
+const (
+	invalidType = iota
+	controlType
+	dataType
+	unencryptedDataType
+)
+
+// control commands.
+const (
+	invalidCmd = iota
+	setupCmd
+	tearDownCmd
+	openFlowCmd
+	addRecieveBuffersCmd
+)
+
+// setup options.
+const (
+	invalidOption = iota
+	peerNaClPublicKeyOption
+	peerRemoteEndpointOption
+	peerLocalEndpointOption
+)
+
+// data flags.
+const (
+	closeFlag = 1 << iota
+	metadataFlag
+)
+
+// random consts.
+const (
+	maxVarUint64Size = 9
+)
+
+// setup is the first message over the wire.  It negotiates protocol version
+// and encryption options for connection.
+type setup struct {
+	versions           version.RPCVersionRange
+	PeerNaClPublicKey  *[32]byte
+	PeerRemoteEndpoint naming.Endpoint
+	PeerLocalEndpoint  naming.Endpoint
+}
+
+func (m *setup) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
+	p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
+	return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
+}
+func (m *setup) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
+	}
+	m.versions.Min = version.RPCVersion(v)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
+	}
+	m.versions.Max = version.RPCVersion(v)
+	return nil
+}
+
+// tearDown is sent over the wire before a connection is closed.
+type tearDown struct {
+	Err error
+}
+
+func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
+	return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Err.Error())})
+}
+func (m *tearDown) read(ctx *context.T, data []byte) error {
+	m.Err = errors.New(string(data))
+	return nil
+}
+
+// openFlow is sent at the beginning of every new flow.
+type openFlow struct {
+	id              flowID
+	initialCounters uint64
+}
+
+func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
+	p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
+	return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
+}
+func (m *openFlow) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
+	}
+	m.id = flowID(v)
+	if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
+	}
+	return nil
+}
+
+// addRecieveBuffers is sent as flows are read from locally.  The counters
+// inform remote writers that there is local buffer space available.
+type addRecieveBuffers struct {
+	counters map[flowID]uint64
+}
+
+func (m *addRecieveBuffers) write(ctx *context.T, p *messagePipe) error {
+	p.controlBuf = p.controlBuf[:0]
+	for fid, val := range m.counters {
+		p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
+		p.controlBuf = writeVarUint64(val, p.controlBuf)
+	}
+	return p.write([][]byte{{controlType}}, [][]byte{{addRecieveBuffersCmd}, p.controlBuf})
+}
+func (m *addRecieveBuffers) read(ctx *context.T, orig []byte) error {
+	var (
+		data     = orig
+		valid    bool
+		fid, val uint64
+		n        int64
+	)
+	m.counters = map[flowID]uint64{}
+	for len(data) > 0 {
+		if fid, data, valid = readVarUint64(ctx, data); !valid {
+			return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n)
+		}
+		if val, data, valid = readVarUint64(ctx, data); !valid {
+			return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n+1)
+		}
+		m.counters[flowID(fid)] = val
+		n += 2
+	}
+	return nil
+}
+
+// data carries encrypted data for a specific flow.
+type data struct {
+	id      flowID
+	flags   uint64
+	payload []byte
+}
+
+func (m *data) write(ctx *context.T, p *messagePipe) error {
+	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+	return p.write([][]byte{{dataType}}, [][]byte{p.dataBuf, m.payload})
+}
+func (m *data) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
+	}
+	m.id = flowID(v)
+	if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
+	}
+	m.payload = data
+	return nil
+}
+
+// unencryptedData carries unencrypted data for a specific flow.
+type unencryptedData struct {
+	id      flowID
+	flags   uint64
+	payload []byte
+}
+
+func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
+	p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
+	p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
+	// re-use the controlBuf for the data size.
+	p.controlBuf = writeVarUint64(uint64(len(m.payload)), p.controlBuf[:0])
+	return p.write(
+		[][]byte{{unencryptedDataType}, p.controlBuf, m.payload},
+		[][]byte{p.dataBuf})
+}
+func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
+	var (
+		data  = orig
+		valid bool
+		v     uint64
+	)
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
+	}
+	plen := int(v)
+	if plen > len(data) {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
+	}
+	m.payload, data = data[:plen], data[plen:]
+	if v, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
+	}
+	m.id = flowID(v)
+	if m.flags, data, valid = readVarUint64(ctx, data); !valid {
+		return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
+	}
+	return nil
+}
+
+type messagePipe struct {
+	rw         flow.MsgReadWriter
+	controlBuf []byte
+	dataBuf    []byte
+	outBuf     [][]byte
+}
+
+func newMessagePipe(rw flow.MsgReadWriter) *messagePipe {
+	return &messagePipe{
+		rw:         rw,
+		controlBuf: make([]byte, 256),
+		dataBuf:    make([]byte, 2*maxVarUint64Size),
+		outBuf:     make([][]byte, 5),
+	}
+}
+
+func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
+	p.outBuf = append(p.outBuf[:0], unencrypted...)
+	p.outBuf = append(p.outBuf, encrypted...)
+	_, err := p.rw.WriteMsg(p.outBuf...)
+	return err
+}
+
+func (p *messagePipe) writeMsg(ctx *context.T, m message) error {
+	return m.write(ctx, p)
+}
+
+func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
+	msg, err := p.rw.ReadMsg()
+	if len(msg) == 0 {
+		return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
+	}
+	if err != nil {
+		return nil, err
+	}
+	var m message
+	switch msg[0] {
+	case controlType:
+		if len(msg) == 1 {
+			return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
+		}
+		msg = msg[1:]
+		switch msg[0] {
+		case setupCmd:
+			m = &setup{}
+		case tearDownCmd:
+			m = &tearDown{}
+		case openFlowCmd:
+			m = &openFlow{}
+		case addRecieveBuffersCmd:
+			m = &addRecieveBuffers{}
+		default:
+			return nil, NewErrUnknownControlMsg(ctx, msg[0])
+		}
+	case dataType:
+		m = &data{}
+	case unencryptedDataType:
+		m = &unencryptedData{}
+	default:
+		return nil, NewErrUnknownMsg(ctx, msg[0])
+	}
+	return m, m.read(ctx, msg[1:])
+}
+
+func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
+	if len(data) == 0 {
+		return 0, data, false
+	}
+	l := data[0]
+	if l <= 0x7f {
+		return uint64(l), data[1:], true
+	}
+	l = 0xff - l + 1
+	if l > 8 || len(data)-1 < int(l) {
+		return 0, data, false
+	}
+	var out uint64
+	for i := 1; i < int(l+1); i++ {
+		out = out<<8 | uint64(data[i])
+	}
+	return out, data[l+1:], true
+}
+
+func writeVarUint64(u uint64, buf []byte) []byte {
+	if u <= 0x7f {
+		return append(buf, byte(u))
+	}
+	shift, l := 56, byte(7)
+	for ; shift >= 0 && (u>>uint(shift))&0xff == 0; shift, l = shift-8, l-1 {
+	}
+	buf = append(buf, 0xff-l)
+	for ; shift >= 0; shift -= 8 {
+		buf = append(buf, byte(u>>uint(shift))&0xff)
+	}
+	return buf
+}
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
new file mode 100644
index 0000000..782a5a2
--- /dev/null
+++ b/runtime/internal/flow/conn/message_test.go
@@ -0,0 +1,122 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+	"errors"
+	"reflect"
+	"testing"
+
+	"v.io/v23/context"
+	"v.io/v23/rpc/version"
+)
+
+func TestVarInt(t *testing.T) {
+	cases := []uint64{
+		0x00, 0x01,
+		0x7f, 0x80,
+		0xff, 0x100,
+		0xffff, 0x10000,
+		0xffffff, 0x1000000,
+		0xffffffff, 0x100000000,
+		0xffffffffff, 0x10000000000,
+		0xffffffffffff, 0x1000000000000,
+		0xffffffffffffff, 0x100000000000000,
+		0xffffffffffffffff,
+	}
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	for _, want := range cases {
+		got, b, valid := readVarUint64(ctx, writeVarUint64(want, []byte{}))
+		if !valid {
+			t.Fatalf("error reading %x", want)
+		}
+		if len(b) != 0 {
+			t.Errorf("unexpected buffer remaining for %x: %v", want, b)
+		}
+		if got != want {
+			t.Errorf("got: %d want: %d", got, want)
+		}
+	}
+}
+
+type fakeMsgReadWriter struct {
+	bufs [][]byte
+	t    *testing.T
+}
+
+func (f *fakeMsgReadWriter) WriteMsg(data ...[]byte) (int, error) {
+	buf := []byte{}
+	for _, d := range data {
+		buf = append(buf, d...)
+	}
+	f.bufs = append(f.bufs, buf)
+	return len(buf), nil
+}
+
+func (f *fakeMsgReadWriter) ReadMsg() (buf []byte, err error) {
+	// TODO(mattr): block if empty.
+	buf, f.bufs = f.bufs[0], f.bufs[1:]
+	f.t.Logf("reading: %v", buf)
+	return buf, nil
+}
+
+func testMessages(t *testing.T, cases []message) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	p := newMessagePipe(&fakeMsgReadWriter{t: t})
+	for _, want := range cases {
+		if err := p.writeMsg(ctx, want); err != nil {
+			t.Errorf("unexpected error for %#v: %v", want, err)
+		}
+		got, err := p.readMsg(ctx)
+		if err != nil {
+			t.Errorf("unexpected error reading %#v: %v", want, err)
+		}
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("got: %#v, want %#v", got, want)
+		}
+	}
+}
+
+func TestSetup(t *testing.T) {
+	testMessages(t, []message{
+		&setup{versions: version.RPCVersionRange{Min: 3, Max: 5}},
+	})
+}
+
+func TestTearDown(t *testing.T) {
+	testMessages(t, []message{
+		&tearDown{Err: errors.New("foobar")},
+	})
+}
+
+func TestOpenFlow(t *testing.T) {
+	testMessages(t, []message{
+		&openFlow{id: 23, initialCounters: 1 << 20},
+	})
+}
+
+func TestAddReceiveBuffers(t *testing.T) {
+	testMessages(t, []message{
+		&addRecieveBuffers{counters: map[flowID]uint64{}},
+		&addRecieveBuffers{counters: map[flowID]uint64{
+			4: 233,
+			9: 423242,
+		}},
+	})
+}
+
+func TestData(t *testing.T) {
+	testMessages(t, []message{
+		&data{id: 1123, flags: 232, payload: []byte("fake payload")},
+	})
+}
+
+func TestUnencryptedData(t *testing.T) {
+	testMessages(t, []message{
+		&unencryptedData{id: 1123, flags: 232, payload: []byte("fake payload")},
+	})
+}