blob: 4188ca22b2c002de4fa9049446fd054977f462b6 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package message
import (
// TODO(mattr): Link to protocol doc.
// Append serializes the message m and appends it to the given byte slice.
// The resulting slice is returned, as well as any error that occurs during
// serialization.
func Append(ctx *context.T, m Message, to []byte) ([]byte, error) {
return m.append(ctx, to)
// Read reads a message contained in the byte slice 'from'.
func Read(ctx *context.T, from []byte) (Message, error) {
if len(from) == 0 {
return nil, NewErrInvalidMsg(ctx, invalidType, 0, 0, nil)
var m Message
msgType, from := from[0], from[1:]
switch msgType {
case setupType:
m = &Setup{}
case tearDownType:
m = &TearDown{}
case enterLameDuckType:
m = &EnterLameDuck{}
case ackLameDuckType:
m = &AckLameDuck{}
case authType:
m = &Auth{}
case openFlowType:
m = &OpenFlow{}
case releaseType:
m = &Release{}
case dataType:
m = &Data{}
case multiProxyType:
m = &MultiProxyRequest{}
case proxyServerType:
m = &ProxyServerRequest{}
case proxyResponseType:
m = &ProxyResponse{}
case proxyErrorReponseType:
m = &ProxyErrorResponse{}
case healthCheckRequestType:
m = &HealthCheckRequest{}
case healthCheckResponseType:
m = &HealthCheckResponse{}
return nil, NewErrUnknownMsg(ctx, msgType)
return m,, from)
// Message is implemented by all low-level messages defined by this package.
type Message interface {
// append serializes the message onto the end of data and returns the
// extended slice along with any serialization error.
append(ctx *context.T, data []byte) ([]byte, error)
// read populates the message from the bytes in data and reports any
// decoding error.
read(ctx *context.T, data []byte) error
// message types.
// Note that our types start from 0x7f and work their way down.
// This is important for the RPC transition. The old system had
// messages that started from zero and worked their way up.
const (
invalidType = 0x7f - iota
// setup options.
const (
invalidOption = iota
// data flags.
const (
// CloseFlag, when set on a Data message, indicates that the flow
// should be closed after the payload for that message has been
// processed.
CloseFlag = 1 << iota
// DisableEncryptionFlag, when set on a Data message, causes Append
// and Read to behave specially. During Append everything but the
// Payload is serialized normally, but the Payload is left for the
// caller to deal with. Typically the caller will encrypt the
// serialized result and send the unencrypted bytes as a raw follow
// on message. Read will expect to not find a payload, and the
// caller will typically attach it by examining the next message on
// the Conn.
// random consts.
const (
maxVarUint64Size = 9
// Setup is the first message over the wire. It negotiates protocol version
// and encryption options for connection.
// New fields to Setup must be added in order of creation. i.e. the order of the fields
// should not be changed.
type Setup struct {
// Versions is the RPC version range; its Min and Max are the first two
// values serialized after the message type byte.
Versions version.RPCVersionRange
// PeerNaClPublicKey is optional; it is serialized as a setup option only
// when non-nil.
PeerNaClPublicKey *[32]byte
// PeerRemoteEndpoint is optional; when non-nil its String() form is
// serialized as a setup option.
PeerRemoteEndpoint naming.Endpoint
// PeerLocalEndpoint is optional; when non-nil its String() form is
// serialized as a setup option.
PeerLocalEndpoint naming.Endpoint
// uninterpretedOptions holds raw options that append re-emits verbatim
// after the known options.
uninterpretedOptions []option
// option is a raw setup option: a numeric identifier plus its undecoded
// payload bytes.
type option struct {
// opt is the option identifier, written as a var-uint64.
opt uint64
// payload is the option body, written length-prefixed.
payload []byte
func appendSetupOption(option uint64, payload, buf []byte) []byte {
return appendLenBytes(payload, writeVarUint64(option, buf))
func readSetupOption(ctx *context.T, orig []byte) (opt uint64, p, d []byte, err error) {
var valid bool
if opt, d, valid = readVarUint64(ctx, orig); !valid {
err = NewErrInvalidSetupOption(ctx, invalidOption, 0)
} else if p, d, valid = readLenBytes(ctx, d); !valid {
err = NewErrInvalidSetupOption(ctx, opt, 1)
func (m *Setup) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, setupType)
data = writeVarUint64(uint64(m.Versions.Min), data)
data = writeVarUint64(uint64(m.Versions.Max), data)
if m.PeerNaClPublicKey != nil {
data = appendSetupOption(peerNaClPublicKeyOption,
m.PeerNaClPublicKey[:], data)
if m.PeerRemoteEndpoint != nil {
data = appendSetupOption(peerRemoteEndpointOption,
[]byte(m.PeerRemoteEndpoint.String()), data)
if m.PeerLocalEndpoint != nil {
data = appendSetupOption(peerLocalEndpointOption,
[]byte(m.PeerLocalEndpoint.String()), data)
for _, o := range m.uninterpretedOptions {
data = appendSetupOption(o.opt, o.payload, data)
return data, nil
func (m *Setup) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
v uint64
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, setupType, uint64(len(orig)), 0, nil)
m.Versions.Min = version.RPCVersion(v)
if v, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, setupType, uint64(len(orig)), 1, nil)
m.Versions.Max = version.RPCVersion(v)
for field := uint64(2); len(data) > 0; field++ {
var (
payload []byte
opt uint64
err error
if opt, payload, data, err = readSetupOption(ctx, data); err != nil {
return NewErrInvalidMsg(ctx, setupType, uint64(len(orig)), field, err)
switch opt {
case peerNaClPublicKeyOption:
m.PeerNaClPublicKey = new([32]byte)
copy(m.PeerNaClPublicKey[:], payload)
case peerRemoteEndpointOption:
m.PeerRemoteEndpoint, err = v23.NewEndpoint(string(payload))
case peerLocalEndpointOption:
m.PeerLocalEndpoint, err = v23.NewEndpoint(string(payload))
m.uninterpretedOptions = append(m.uninterpretedOptions, option{opt, payload})
if err != nil {
return NewErrInvalidMsg(ctx, setupType, uint64(len(orig)), field, err)
return nil
// String returns a one-line summary of the message covering the version
// range, the peer NaCl public key, the peer remote and local endpoints, and
// the options.
func (m *Setup) String() string {
return fmt.Sprintf("Versions:[%d,%d] PeerNaClPublicKey:%v PeerRemoteEndpoint:%v PeerLocalEndpoint:%v options:%v",
// TearDown is sent over the wire before a connection is closed.
type TearDown struct {
Message string
func (m *TearDown) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, tearDownType)
return append(data, []byte(m.Message)...), nil
func (m *TearDown) read(ctx *context.T, data []byte) error {
m.Message = string(data)
return nil
func (m *TearDown) String() string { return m.Message }
// EnterLameDuck is sent as notification that the sender is entering lameduck mode.
// The receiver should stop opening new flows on this connection and respond
// with an AckLameDuck message.
type EnterLameDuck struct{}
func (m *EnterLameDuck) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, enterLameDuckType), nil
func (m *EnterLameDuck) read(ctx *context.T, data []byte) error {
return nil
// AckLameDuck is sent in response to an EnterLameDuck message. After
// this message is received no more OpenFlow messages should arrive.
type AckLameDuck struct{}
func (m *AckLameDuck) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, ackLameDuckType), nil
func (m *AckLameDuck) read(ctx *context.T, data []byte) error {
return nil
// Auth is used to complete the auth handshake.
type Auth struct {
// BlessingsKey and DischargeKey are serialized as var-uint64 values; read
// rejects a message whose BlessingsKey is zero.
BlessingsKey, DischargeKey uint64
// ChannelBinding is serialized as four length-prefixed fields: Purpose,
// Hash, R and S.
ChannelBinding security.Signature
func (m *Auth) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, authType)
data = writeVarUint64(m.BlessingsKey, data)
data = writeVarUint64(m.DischargeKey, data)
data = appendLenBytes(m.ChannelBinding.Purpose, data)
data = appendLenBytes([]byte(m.ChannelBinding.Hash), data)
data = appendLenBytes(m.ChannelBinding.R, data)
data = appendLenBytes(m.ChannelBinding.S, data)
return data, nil
func (m *Auth) read(ctx *context.T, orig []byte) error {
var data, tmp []byte
var valid bool
if m.BlessingsKey, data, valid = readVarUint64(ctx, orig); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 0, nil)
if m.BlessingsKey == 0 {
return NewErrMissingBlessings(ctx, openFlowType)
if m.DischargeKey, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 1, nil)
if m.ChannelBinding.Purpose, data, valid = readLenBytes(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 2, nil)
if tmp, data, valid = readLenBytes(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 3, nil)
m.ChannelBinding.Hash = security.Hash(tmp)
if m.ChannelBinding.R, data, valid = readLenBytes(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 4, nil)
if m.ChannelBinding.S, data, valid = readLenBytes(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 5, nil)
return nil
// OpenFlow is sent at the beginning of every new flow; it optionally contains
// a payload.
type OpenFlow struct {
// ID, InitialCounters, BlessingsKey, DischargeKey and Flags are serialized
// in this order as var-uint64 values.
ID uint64
InitialCounters uint64
// read rejects a message whose BlessingsKey is zero.
BlessingsKey, DischargeKey uint64
Flags uint64
// Payload is serialized as the concatenation of its slices, and only when
// DisableEncryptionFlag is not set in Flags.
Payload [][]byte
func (m *OpenFlow) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, openFlowType)
data = writeVarUint64(m.ID, data)
data = writeVarUint64(m.InitialCounters, data)
data = writeVarUint64(m.BlessingsKey, data)
data = writeVarUint64(m.DischargeKey, data)
data = writeVarUint64(m.Flags, data)
if m.Flags&DisableEncryptionFlag == 0 {
for _, p := range m.Payload {
data = append(data, p...)
return data, nil
func (m *OpenFlow) read(ctx *context.T, orig []byte) error {
var (
data []byte
valid bool
if m.ID, data, valid = readVarUint64(ctx, orig); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 0, nil)
if m.InitialCounters, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 1, nil)
if m.BlessingsKey, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 2, nil)
if m.BlessingsKey == 0 {
return NewErrMissingBlessings(ctx, openFlowType)
if m.DischargeKey, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, openFlowType, uint64(len(orig)), 3, nil)
if m.Flags, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, dataType, uint64(len(orig)), 1, nil)
if m.Flags&DisableEncryptionFlag == 0 && len(data) > 0 {
m.Payload = [][]byte{data}
return nil
// String returns a one-line summary of the message: its ID, initial counters,
// blessings and discharge keys, flags, and the payload's byte and slice
// counts.
func (m *OpenFlow) String() string {
return fmt.Sprintf("ID:%d InitialCounters:%d BlessingsKey:0x%x DischargeKey:0x%x Flags:0x%x Payload:(%d bytes in %d slices)",
// Release is sent as flows are read from locally. The counters
// inform remote writers that there is local buffer space available.
type Release struct {
// Counters is serialized as a sequence of (key, value) var-uint64 pairs.
// read leaves it nil when the message body is empty.
Counters map[uint64]uint64
func (m *Release) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, releaseType)
for fid, val := range m.Counters {
data = writeVarUint64(fid, data)
data = writeVarUint64(val, data)
return data, nil
func (m *Release) read(ctx *context.T, orig []byte) error {
var (
data = orig
valid bool
fid, val uint64
n uint64
if len(data) == 0 {
return nil
m.Counters = map[uint64]uint64{}
for len(data) > 0 {
if fid, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, releaseType, uint64(len(orig)), n, nil)
if val, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, releaseType, uint64(len(orig)), n+1, nil)
m.Counters[fid] = val
n += 2
return nil
// Data carries encrypted data for a specific flow.
type Data struct {
// ID and Flags are serialized in this order as var-uint64 values.
ID uint64
Flags uint64
// Payload is serialized as the concatenation of its slices, and only when
// DisableEncryptionFlag is not set in Flags.
Payload [][]byte
func (m *Data) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, dataType)
data = writeVarUint64(m.ID, data)
data = writeVarUint64(m.Flags, data)
if m.Flags&DisableEncryptionFlag == 0 {
for _, p := range m.Payload {
data = append(data, p...)
return data, nil
func (m *Data) read(ctx *context.T, orig []byte) error {
var (
data []byte
valid bool
if m.ID, data, valid = readVarUint64(ctx, orig); !valid {
return NewErrInvalidMsg(ctx, dataType, uint64(len(orig)), 0, nil)
if m.Flags, data, valid = readVarUint64(ctx, data); !valid {
return NewErrInvalidMsg(ctx, dataType, uint64(len(orig)), 1, nil)
if m.Flags&DisableEncryptionFlag == 0 && len(data) > 0 {
m.Payload = [][]byte{data}
return nil
func (m *Data) String() string {
return fmt.Sprintf("ID:%d Flags:0x%x Payload:(%d bytes in %d slices)", m.ID, m.Flags, payloadSize(m.Payload), len(m.Payload))
// MultiProxyRequest is sent when a proxy wants to accept connections from another proxy.
type MultiProxyRequest struct{}
func (m *MultiProxyRequest) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, multiProxyType), nil
func (m *MultiProxyRequest) read(ctx *context.T, orig []byte) error {
return nil
// ProxyServerRequest is sent when a server wants to listen through a proxy.
type ProxyServerRequest struct{}
func (m *ProxyServerRequest) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, proxyServerType), nil
func (m *ProxyServerRequest) read(ctx *context.T, orig []byte) error {
return nil
// ProxyResponse is sent by a proxy in response to a ProxyServerRequest or
// MultiProxyRequest. It notifies the server of the endpoints it should publish.
// Or, it notifies a requesting proxy of the endpoints it accepts connections from.
type ProxyResponse struct {
// Endpoints is serialized as a sequence of length-prefixed endpoint
// strings, one per entry.
Endpoints []naming.Endpoint
func (m *ProxyResponse) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, proxyResponseType)
for _, ep := range m.Endpoints {
data = appendLenBytes([]byte(ep.String()), data)
return data, nil
func (m *ProxyResponse) read(ctx *context.T, orig []byte) error {
var (
data = orig
epBytes []byte
valid bool
for i := 0; len(data) > 0; i++ {
if epBytes, data, valid = readLenBytes(ctx, data); !valid {
return NewErrInvalidMsg(ctx, proxyResponseType, uint64(len(orig)), uint64(i), nil)
ep, err := v23.NewEndpoint(string(epBytes))
if err != nil {
return NewErrInvalidMsg(ctx, proxyResponseType, uint64(len(orig)), uint64(i), err)
m.Endpoints = append(m.Endpoints, ep)
return nil
func (m *ProxyResponse) String() string {
strs := make([]string, len(m.Endpoints))
for i, ep := range m.Endpoints {
strs[i] = ep.String()
return fmt.Sprintf("Endpoints:%v", strs)
// ProxyErrorResponse is send by a proxy in response to a ProxyServerRequest or
// MultiProxyRequest if the proxy encountered an error while responding to the
// request. It should be sent in lieu of a ProxyResponse.
type ProxyErrorResponse struct {
Error string
func (m *ProxyErrorResponse) append(ctx *context.T, data []byte) ([]byte, error) {
data = append(data, proxyErrorReponseType)
data = append(data, []byte(m.Error)...)
return data, nil
func (m *ProxyErrorResponse) read(ctx *context.T, orig []byte) error {
m.Error = string(orig)
return nil
func (m *ProxyErrorResponse) String() string {
return m.Error
// HealthCheckRequest is periodically sent to test the health of a channel.
type HealthCheckRequest struct{}
func (m *HealthCheckRequest) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, healthCheckRequestType), nil
func (m *HealthCheckRequest) read(ctx *context.T, data []byte) error {
return nil
// HealthCheckResponse messages are sent in response to HealthCheckRequest messages.
type HealthCheckResponse struct{}
func (m *HealthCheckResponse) append(ctx *context.T, data []byte) ([]byte, error) {
return append(data, healthCheckResponseType), nil
func (m *HealthCheckResponse) read(ctx *context.T, data []byte) error {
return nil
func appendLenBytes(b []byte, buf []byte) []byte {
buf = writeVarUint64(uint64(len(b)), buf)
return append(buf, b...)
func readLenBytes(ctx *context.T, data []byte) (b, rest []byte, valid bool) {
l, data, valid := readVarUint64(ctx, data)
if !valid || uint64(len(data)) < l {
return nil, data, false
if l > 0 {
return data[:l], data[l:], true
return nil, data, true
func readVarUint64(ctx *context.T, data []byte) (uint64, []byte, bool) {
if len(data) == 0 {
return 0, data, false
l := data[0]
if l <= 0x7f {
return uint64(l), data[1:], true
l = 0xff - l + 1
if l > 8 || len(data)-1 < int(l) {
return 0, data, false
var out uint64
for i := 1; i < int(l+1); i++ {
out = out<<8 | uint64(data[i])
return out, data[l+1:], true
// writeVarUint64 appends the variable-length encoding of u to buf and returns
// the extended buffer.
//
// Values up to 0x7f are written as a single byte. Larger values are written
// as a prefix byte followed by the value's significant bytes in big-endian
// order; the prefix is 0x100 minus the number of bytes that follow, so the
// whole encoding is at most nine bytes long.
func writeVarUint64(u uint64, buf []byte) []byte {
	if u <= 0x7f {
		return append(buf, byte(u))
	}
	// Skip leading zero bytes. On exit, shift addresses the most significant
	// non-zero byte and l is one less than the number of bytes to emit.
	shift, l := 56, byte(7)
	for shift >= 0 && (u>>uint(shift))&0xff == 0 {
		shift -= 8
		l--
	}
	// The prefix is written exactly once, after the skip loop has finished.
	buf = append(buf, 0xff-l)
	for ; shift >= 0; shift -= 8 {
		buf = append(buf, byte(u>>uint(shift)))
	}
	return buf
}
// payloadSize returns the total number of bytes across all slices in payload.
func payloadSize(payload [][]byte) int {
	total := 0
	for i := range payload {
		total += len(payload[i])
	}
	return total
}