blob: c4f40eb07f7bff47f2a3ed5f63a5870e054a02c7 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"errors"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
)
// TODO(mattr): Link to protocol doc.
type message interface {
write(ctx *context.T, p *messagePipe) error
read(ctx *context.T, data []byte) error
}
// message types.
const (
invalidType = iota
controlType
dataType
unencryptedDataType
)
// control commands.
const (
invalidCmd = iota
setupCmd
tearDownCmd
openFlowCmd
addRecieveBuffersCmd
)
// setup options.
const (
invalidOption = iota
peerNaClPublicKeyOption
peerRemoteEndpointOption
peerLocalEndpointOption
)
// data flags.
const (
closeFlag = 1 << iota
metadataFlag
)
// random consts.
const (
maxVarUint64Size = 9
)
// setup is the first message over the wire. It negotiates protocol version
// and encryption options for connection.
type setup struct {
versions version.RPCVersionRange
PeerNaClPublicKey *[32]byte
PeerRemoteEndpoint naming.Endpoint
PeerLocalEndpoint naming.Endpoint
}
func (m *setup) write(ctx *context.T, p *messagePipe) error {
p.controlBuf = writeVarUint64(uint64(m.versions.Min), p.controlBuf[:0])
p.controlBuf = writeVarUint64(uint64(m.versions.Max), p.controlBuf)
return p.write([][]byte{{controlType}}, [][]byte{{setupCmd}, p.controlBuf})
}
func (m *setup) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 0)
}
m.versions.Min = version.RPCVersion(v)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, setupCmd, int64(len(orig)), 1)
}
m.versions.Max = version.RPCVersion(v)
return nil
}
// tearDown is sent over the wire before a connection is closed.
type tearDown struct {
Err error
}
func (m *tearDown) write(ctx *context.T, p *messagePipe) error {
return p.write([][]byte{{controlType}}, [][]byte{{tearDownCmd}, []byte(m.Err.Error())})
}
func (m *tearDown) read(ctx *context.T, data []byte) error {
m.Err = errors.New(string(data))
return nil
}
// openFlow is sent at the beginning of every new flow.
type openFlow struct {
id flowID
initialCounters uint64
}
func (m *openFlow) write(ctx *context.T, p *messagePipe) error {
p.controlBuf = writeVarUint64(uint64(m.id), p.controlBuf[:0])
p.controlBuf = writeVarUint64(m.initialCounters, p.controlBuf)
return p.write([][]byte{{controlType}}, [][]byte{{openFlowCmd}, p.controlBuf})
}
func (m *openFlow) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 0)
}
m.id = flowID(v)
if m.initialCounters, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, openFlowCmd, int64(len(orig)), 1)
}
return nil
}
// addRecieveBuffers is sent as flows are read from locally. The counters
// inform remote writers that there is local buffer space available.
type addRecieveBuffers struct {
counters map[flowID]uint64
}
func (m *addRecieveBuffers) write(ctx *context.T, p *messagePipe) error {
p.controlBuf = p.controlBuf[:0]
for fid, val := range m.counters {
p.controlBuf = writeVarUint64(uint64(fid), p.controlBuf)
p.controlBuf = writeVarUint64(val, p.controlBuf)
}
return p.write([][]byte{{controlType}}, [][]byte{{addRecieveBuffersCmd}, p.controlBuf})
}
func (m *addRecieveBuffers) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
fid, val uint64
n int64
)
m.counters = map[flowID]uint64{}
for len(data) > 0 {
if fid, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n)
}
if val, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidControlMsg(ctx, addRecieveBuffersCmd, int64(len(orig)), n+1)
}
m.counters[flowID(fid)] = val
n += 2
}
return nil
}
// data carries encrypted data for a specific flow.
type data struct {
id flowID
flags uint64
payload [][]byte
}
func (m *data) write(ctx *context.T, p *messagePipe) error {
p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
encrypted := append([][]byte{p.dataBuf}, m.payload...)
return p.write([][]byte{{dataType}}, encrypted)
}
func (m *data) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 0)
}
m.id = flowID(v)
if m.flags, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, dataType, int64(len(orig)), 1)
}
m.payload = [][]byte{data}
return nil
}
// unencryptedData carries unencrypted data for a specific flow.
type unencryptedData struct {
id flowID
flags uint64
payload [][]byte
}
func (m *unencryptedData) write(ctx *context.T, p *messagePipe) error {
p.dataBuf = writeVarUint64(uint64(m.id), p.dataBuf[:0])
p.dataBuf = writeVarUint64(m.flags, p.dataBuf)
// re-use the controlBuf for the data size.
size := uint64(0)
for _, b := range m.payload {
size += uint64(len(b))
}
p.controlBuf = writeVarUint64(size, p.controlBuf[:0])
unencrypted := append([][]byte{[]byte{unencryptedDataType}, p.controlBuf}, m.payload...)
return p.write(unencrypted, [][]byte{p.dataBuf})
}
func (m *unencryptedData) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
v uint64
)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 0)
}
plen := int(v)
if plen > len(data) {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 1)
}
m.payload, data = [][]byte{data[:plen]}, data[plen:]
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 2)
}
m.id = flowID(v)
if m.flags, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, unencryptedDataType, int64(len(orig)), 3)
}
return nil
}
type messagePipe struct {
rw flow.MsgReadWriter
controlBuf []byte
dataBuf []byte
outBuf [][]byte
}
func newMessagePipe(rw flow.MsgReadWriter) *messagePipe {
return &messagePipe{
rw: rw,
controlBuf: make([]byte, 256),
dataBuf: make([]byte, 2*maxVarUint64Size),
outBuf: make([][]byte, 5),
}
}
func (p *messagePipe) write(unencrypted [][]byte, encrypted [][]byte) error {
p.outBuf = append(p.outBuf[:0], unencrypted...)
p.outBuf = append(p.outBuf, encrypted...)
_, err := p.rw.WriteMsg(p.outBuf...)
return err
}
func (p *messagePipe) writeMsg(ctx *context.T, m message) error {
return m.write(ctx, p)
}
func (p *messagePipe) readMsg(ctx *context.T) (message, error) {
msg, err := p.rw.ReadMsg()
if len(msg) == 0 {
return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0)
}
if err != nil {
return nil, err
}
var m message
switch msg[0] {
case controlType:
if len(msg) == 1 {
return nil, NewErrInvalidControlMsg(ctx, invalidCmd, 0, 1)
}
msg = msg[1:]
switch msg[0] {
case setupCmd:
m = &setup{}
case tearDownCmd:
m = &tearDown{}
case openFlowCmd:
m = &openFlow{}
case addRecieveBuffersCmd:
m = &addRecieveBuffers{}
default:
return nil, NewErrUnknownControlMsg(ctx, msg[0])
}
case dataType:
m = &data{}
case unencryptedDataType:
m = &unencryptedData{}
default:
return nil, NewErrUnknownMsg(ctx, msg[0])
}
return m, m.read(ctx, msg[1:])
}
func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
if len(data) == 0 {
return 0, data, false
}
l := data[0]
if l <= 0x7f {
return uint64(l), data[1:], true
}
l = 0xff - l + 1
if l > 8 || len(data)-1 < int(l) {
return 0, data, false
}
var out uint64
for i := 1; i < int(l+1); i++ {
out = out<<8 | uint64(data[i])
}
return out, data[l+1:], true
}
func writeVarUint64(u uint64, buf []byte) []byte {
if u <= 0x7f {
return append(buf, byte(u))
}
shift, l := 56, byte(7)
for ; shift >= 0 && (u>>uint(shift))&0xff == 0; shift, l = shift-8, l-1 {
}
buf = append(buf, 0xff-l)
for ; shift >= 0; shift -= 8 {
buf = append(buf, byte(u>>uint(shift))&0xff)
}
return buf
}