blob: 428dcf80e426369368cb0ebb8fc9a6f1204c225d [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vdl
import (
"errors"
"fmt"
"math"
"reflect"
"sync"
)
var (
errEmptyPipeStack = errors.New("vdl: empty pipe stack")
errEncCallDuringDecPhase = errors.New("vdl: pipe encoder method called during decode phase")
errInvalidPipeState = errors.New("vdl: invalid pipe state")
)
func convertPipe(dst, src interface{}) error {
enc, dec := newPipe()
go func() {
enc.Close(Write(enc, src))
}()
return dec.Close(Read(dec, dst))
}
func convertPipeReflect(dst, src reflect.Value) error {
enc, dec := newPipe()
go func() {
enc.Close(WriteReflect(enc, src))
}()
return dec.Close(ReadReflect(dec, dst))
}
type pipeStackEntry struct {
Type *Type
NextOp pipeOp
LenHint int
Index int
NumStarted int
IsOptional bool
IsNil bool
}
// pipeState represents the current state of the pipe controller
// pipeState is used to control our blocking, which ping-pongs between the encoder and decoder.
// Once the closed state is reached, everything unblocks.
type pipeState int
const (
pipeStateEncoder pipeState = iota
pipeStateDecoder
pipeStateClosed
)
func (x pipeState) String() string {
switch x {
case pipeStateEncoder:
return "Encoder"
case pipeStateDecoder:
return "Decoder"
case pipeStateClosed:
return "Closed"
default:
panic("bad controller state")
}
}
type pipeOp int
// pipeOp is used to check our invariants for state transitions.
const (
pipeStartEnc pipeOp = iota
pipeStartDec
pipeFinishEnc
pipeFinishDec
)
func (op pipeOp) String() string {
switch op {
case pipeStartEnc:
return "StartEnc"
case pipeStartDec:
return "StartDec"
case pipeFinishEnc:
return "FinishEnc"
case pipeFinishDec:
return "FinishDec"
default:
panic("bad op")
}
}
func (op pipeOp) Next() pipeOp {
if op == pipeFinishDec {
op = pipeStartEnc
} else {
op++
}
return op
}
// We can only determine whether the next value is AnyType
// by checking the next type of the entry.
func (entry *pipeStackEntry) nextValueIsAny() bool {
if entry == nil {
return false
}
switch entry.Type.Kind() {
case List, Array:
return entry.Type.Elem() == AnyType
case Set:
return entry.Type.Key() == AnyType
case Map:
switch entry.NumStarted % 2 {
case 1:
return entry.Type.Key() == AnyType
case 0:
return entry.Type.Elem() == AnyType
}
case Struct, Union:
return entry.Type.Field(entry.Index).Type == AnyType
}
return false
}
type numberType int
const (
numberUint numberType = iota
numberInt
numberFloat
)
type pipeEncoder struct {
Stack []pipeStackEntry
NextEntryDone bool
NextFieldName string
NextStartValueIsOptional bool // The StartValue refers to an optional type.
NumberType numberType // The number type X in EncodeX.
IsBytes bool // Is the current list from EncodeBytes().
DecHandlingBytes bool // When decoding from []byte received from EncodeBytes, skip
// the calls to the Decoder's StartValue/FinishValue
// because the encoder will not receive matching calls.
DecStarted bool // Decoding has started
Err error
Mu sync.Mutex
Cond sync.Cond
State pipeState
// Arguments from Encode* to be passed to Decode*:
ArgBool bool
ArgUint uint64
ArgInt int64
ArgFloat float64
ArgString string
ArgBytes []byte
ArgType *Type
}
type pipeDecoder struct {
Enc pipeEncoder
IgnoringNextStartValue bool
}
func (e *pipeEncoder) top() *pipeStackEntry {
if len(e.Stack) == 0 {
return nil
}
return &e.Stack[len(e.Stack)-1]
}
func (d *pipeDecoder) top() *pipeStackEntry {
if len(d.Enc.Stack) == 0 {
return nil
}
return &d.Enc.Stack[len(d.Enc.Stack)-1]
}
func newPipe() (*pipeEncoder, *pipeDecoder) {
dec := &pipeDecoder{Enc: pipeEncoder{}}
dec.Enc.Cond.L = &dec.Enc.Mu
return &dec.Enc, dec
}
func (e *pipeEncoder) closeLocked(err error) error {
if err != nil && e.Err == nil {
e.Err = err
}
e.State = pipeStateClosed
e.Cond.Broadcast()
return e.Err
}
func (e *pipeEncoder) Close(err error) error {
e.Mu.Lock()
defer e.Mu.Unlock()
return e.closeLocked(err)
}
func (d *pipeDecoder) Close(err error) error {
return d.Enc.Close(err)
}
func (e *pipeEncoder) SetNextStartValueIsOptional() {
e.NextStartValueIsOptional = true
}
func (e *pipeEncoder) NilValue(tt *Type) error {
switch tt.Kind() {
case Any:
case Optional:
e.SetNextStartValueIsOptional()
default:
return fmt.Errorf("concrete types disallowed for NilValue (type was %v)", tt)
}
if err := e.StartValue(tt); err != nil {
return err
}
top := e.top()
if top == nil {
return e.closeLocked(errEmptyPipeStack)
}
top.IsNil = true
return e.FinishValue()
}
func (e *pipeEncoder) StartValue(tt *Type) error {
e.Mu.Lock()
defer e.Mu.Unlock()
if e.State != pipeStateEncoder {
return e.closeLocked(errInvalidPipeState)
}
if err := e.waitLocked(); err != nil {
return err
}
top := e.top()
if top != nil {
top.NumStarted++
}
e.Stack = append(e.Stack, pipeStackEntry{
Type: tt,
NextOp: pipeStartDec,
LenHint: -1,
IsOptional: e.NextStartValueIsOptional,
Index: -1,
})
e.NextStartValueIsOptional = false
return e.Err
}
func (e *pipeEncoder) FinishValue() error {
e.Mu.Lock()
defer e.Mu.Unlock()
if e.State != pipeStateEncoder {
return e.closeLocked(errInvalidPipeState)
}
if err := e.waitLocked(); err != nil {
return err
}
top := e.top()
if top == nil {
return e.closeLocked(errEmptyPipeStack)
}
if got, want := top.NextOp, pipeFinishEnc; got != want {
return e.closeLocked(fmt.Errorf("vdl: pipe got state %v, want %v", got, want))
}
top.NextOp = top.NextOp.Next()
return e.Err
}
func (e *pipeEncoder) waitLocked() error {
top := e.top()
if e.State == pipeStateClosed {
return e.Err
}
if top != nil {
e.State = pipeStateDecoder
e.Cond.Broadcast()
for e.State == pipeStateDecoder {
e.Cond.Wait()
}
if e.State == pipeStateClosed {
return e.Err
}
}
return nil
}
func (d *pipeDecoder) StartValue(want *Type) error {
d.Enc.Mu.Lock()
defer d.Enc.Mu.Unlock()
if d.Enc.DecStarted && d.Enc.State != pipeStateDecoder {
return d.Enc.closeLocked(errInvalidPipeState)
}
if d.IgnoringNextStartValue {
d.IgnoringNextStartValue = false
return d.Enc.Err
}
if d.Enc.DecHandlingBytes {
return d.Enc.Err
}
if err := d.waitLocked(false); err != nil {
return err
}
top := d.top()
if top == nil {
return d.Enc.closeLocked(errEmptyPipeStack)
}
// Check compatibility between the actual type and the want type. Since
// compatibility applies to the entire static type, we only need to perform
// this check for top-level decoded values, and subsequently for decoded any
// values. We skip checking non-composite want types, since those will be
// naturally caught by the Decode* calls anyways.
if len(d.Enc.Stack) == 1 || d.IsAny() {
switch want.Kind() {
case Optional, Array, List, Set, Map, Struct, Union:
if !Compatible2(d.Type(), want) {
return d.Enc.closeLocked(fmt.Errorf("vdl: pipe incompatible decode from %v into %v", d.Type(), want))
}
}
}
if got, want := top.NextOp, pipeStartDec; got != want {
return d.Enc.closeLocked(fmt.Errorf("vdl: pipe got state %v, want %v", got, want))
}
top.NextOp = top.NextOp.Next()
return d.Enc.Err
}
func (d *pipeDecoder) FinishValue() error {
d.Enc.Mu.Lock()
defer d.Enc.Mu.Unlock()
switch {
case d.Enc.State == pipeStateEncoder:
return d.Enc.closeLocked(errInvalidPipeState)
case d.Enc.State == pipeStateClosed:
return d.Enc.Err
}
if d.Enc.DecHandlingBytes {
return d.Enc.Err
}
if err := d.waitLocked(true); err != nil {
return err
}
top := d.top()
if top == nil {
return d.Enc.closeLocked(errEmptyPipeStack)
}
if got, want := top.NextOp, pipeFinishDec; got != want {
return d.Enc.closeLocked(fmt.Errorf("vdl: pipe got state %v, want %v", got, want))
}
d.Enc.Stack = d.Enc.Stack[:len(d.Enc.Stack)-1]
return d.Enc.Err
}
func (d *pipeDecoder) waitLocked(isFinish bool) error {
if d.Enc.State == pipeStateClosed {
return d.Enc.Err
}
if isFinish || d.Enc.DecStarted {
d.Enc.State = pipeStateEncoder
d.Enc.Cond.Broadcast()
}
d.Enc.DecStarted = true
for d.Enc.State == pipeStateEncoder {
d.Enc.Cond.Wait()
}
if d.Enc.State == pipeStateClosed {
return d.Enc.Err
}
return nil
}
func (d *pipeDecoder) SkipValue() error {
if err := d.StartValue(AnyType); err != nil {
return err
}
return d.FinishValue()
}
func (d *pipeDecoder) IgnoreNextStartValue() {
d.IgnoringNextStartValue = true
}
func (e *pipeEncoder) SetLenHint(lenHint int) error {
top := e.top()
if top == nil {
return e.closeLocked(errEmptyPipeStack)
}
top.LenHint = lenHint
return e.Err
}
func (e *pipeEncoder) NextEntry(done bool) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.NextEntryDone = done
e.IsBytes = false
return e.Err
}
func (e *pipeEncoder) NextField(name string) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.NextFieldName = name
return e.Err
}
func (d *pipeDecoder) NextEntry() (bool, error) {
top := d.top()
if top == nil {
return false, d.Enc.closeLocked(errEmptyPipeStack)
}
top.Index++
var done bool
if d.Enc.IsBytes {
done = top.Index >= len(d.Enc.ArgBytes)
// End handling bytes mode once done.
d.Enc.DecHandlingBytes = !done
} else {
done = d.Enc.NextEntryDone
}
d.Enc.NextEntryDone = false
return done, d.Enc.Err
}
func (d *pipeDecoder) NextField() (string, error) {
top := d.top()
if top == nil {
return "", d.Enc.closeLocked(errEmptyPipeStack)
}
top.Index++
name := d.Enc.NextFieldName
d.Enc.NextFieldName = ""
return name, d.Enc.Err
}
func (d *pipeDecoder) Type() *Type {
top := d.top()
if top == nil {
return nil
}
if d.Enc.IsBytes && d.Enc.DecHandlingBytes {
return ByteType
}
return top.Type
}
func (d *pipeDecoder) IsAny() bool {
if stackTop2 := len(d.Enc.Stack) - 2; stackTop2 >= 0 {
return d.Enc.Stack[stackTop2].nextValueIsAny()
}
return false
}
func (d *pipeDecoder) IsOptional() bool {
if top := d.top(); top != nil {
return top.IsOptional
}
return false
}
func (d *pipeDecoder) IsNil() bool {
if top := d.top(); top != nil {
return top.IsNil
}
return false
}
func (d *pipeDecoder) Index() int {
if top := d.top(); top != nil {
return top.Index
}
return -1
}
func (d *pipeDecoder) LenHint() int {
if top := d.top(); top != nil {
return top.LenHint
}
return -1
}
func (e *pipeEncoder) EncodeBool(v bool) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgBool = v
return e.Err
}
func (d *pipeDecoder) DecodeBool() (bool, error) {
return d.Enc.ArgBool, d.Enc.Err
}
func (e *pipeEncoder) EncodeString(v string) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgString = v
return e.Err
}
func (d *pipeDecoder) DecodeString() (string, error) {
return d.Enc.ArgString, d.Enc.Err
}
func (e *pipeEncoder) EncodeTypeObject(v *Type) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgType = v
return e.Err
}
func (d *pipeDecoder) DecodeTypeObject() (*Type, error) {
return d.Enc.ArgType, d.Enc.Err
}
func (e *pipeEncoder) EncodeUint(v uint64) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgUint = v
e.NumberType = numberUint
return e.Err
}
func (d *pipeDecoder) DecodeUint(bitlen int) (uint64, error) {
const errFmt = "vdl: conversion from %v into uint%d loses precision: %v"
top, tt := d.top(), d.Type()
if top == nil {
return 0, d.Enc.closeLocked(errEmptyPipeStack)
}
switch d.Enc.NumberType {
case numberUint:
x := d.Enc.ArgUint
if d.Enc.IsBytes {
x = uint64(d.Enc.ArgBytes[top.Index])
}
if shift := 64 - uint(bitlen); x != (x<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return x, d.Enc.Err
case numberInt:
x := d.Enc.ArgInt
ux := uint64(x)
if shift := 64 - uint(bitlen); x < 0 || ux != (ux<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return ux, d.Enc.Err
case numberFloat:
x := d.Enc.ArgFloat
ux := uint64(x)
if shift := 64 - uint(bitlen); x != float64(ux) || ux != (ux<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return ux, d.Enc.Err
}
return 0, d.Enc.closeLocked(fmt.Errorf("vdl: incompatible decode from %v into uint%d", tt, bitlen))
}
func (e *pipeEncoder) EncodeInt(v int64) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgInt = v
e.NumberType = numberInt
return e.Err
}
func (d *pipeDecoder) DecodeInt(bitlen int) (int64, error) {
const errFmt = "vdl: conversion from %v into int%d loses precision: %v"
top, tt := d.top(), d.Type()
if top == nil {
return 0, d.Enc.closeLocked(errEmptyPipeStack)
}
switch d.Enc.NumberType {
case numberUint:
x := d.Enc.ArgUint
if d.Enc.IsBytes {
x = uint64(d.Enc.ArgBytes[top.Index])
}
ix := int64(x)
// The shift uses 65 since the topmost bit is the sign bit. I.e. 32 bit
// numbers should be shifted by 33 rather than 32.
if shift := 65 - uint(bitlen); ix < 0 || x != (x<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return ix, d.Enc.Err
case numberInt:
x := d.Enc.ArgInt
if shift := 64 - uint(bitlen); x != (x<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return x, d.Enc.Err
case numberFloat:
x := d.Enc.ArgFloat
ix := int64(x)
if shift := 64 - uint(bitlen); x != float64(ix) || ix != (ix<<shift)>>shift {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return ix, d.Enc.Err
}
return 0, d.Enc.closeLocked(fmt.Errorf("vdl: incompatible decode from %v into int%d", tt, bitlen))
}
func (e *pipeEncoder) EncodeFloat(v float64) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.ArgFloat = v
e.NumberType = numberFloat
return e.Err
}
func (d *pipeDecoder) DecodeFloat(bitlen int) (float64, error) {
const errFmt = "vdl: conversion from %v into float%d loses precision: %v"
top, tt := d.top(), d.Type()
if top == nil {
return 0, d.Enc.closeLocked(errEmptyPipeStack)
}
switch d.Enc.NumberType {
case numberUint:
x := d.Enc.ArgUint
if d.Enc.IsBytes {
x = uint64(d.Enc.ArgBytes[top.Index])
}
var max uint64
if bitlen > 32 {
max = float64MaxInt
} else {
max = float32MaxInt
}
if x > max {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return float64(x), d.Enc.Err
case numberInt:
x := d.Enc.ArgInt
var min, max int64
if bitlen > 32 {
min, max = float64MinInt, float64MaxInt
} else {
min, max = float32MinInt, float32MaxInt
}
if x < min || x > max {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return float64(x), d.Enc.Err
case numberFloat:
x := d.Enc.ArgFloat
if bitlen <= 32 && (x < -math.MaxFloat32 || x > math.MaxFloat32) {
return 0, d.Enc.closeLocked(fmt.Errorf(errFmt, tt, bitlen, x))
}
return x, d.Enc.Err
}
return 0, d.Enc.closeLocked(fmt.Errorf("vdl: incompatible decode from %v into float%d", tt, bitlen))
}
func (e *pipeEncoder) EncodeBytes(v []byte) error {
if e.State == pipeStateDecoder {
return e.closeLocked(errEncCallDuringDecPhase)
}
e.IsBytes = true
e.NumberType = numberUint
e.ArgBytes = v
return e.Err
}
func (d *pipeDecoder) DecodeBytes(fixedLen int, b *[]byte) error {
top := d.top()
if top == nil {
return d.Enc.closeLocked(errEmptyPipeStack)
}
if !d.Enc.IsBytes {
if err := DecodeConvertedBytes(d, fixedLen, b); err != nil {
return d.Enc.closeLocked(err)
}
return nil
}
len := len(d.Enc.ArgBytes)
if fixedLen >= 0 && fixedLen != len {
return d.Enc.closeLocked(fmt.Errorf("vdl: %v got %d bytes, want fixed len %d", d.Type(), len, fixedLen))
}
if cap(*b) >= len {
*b = (*b)[:len]
} else {
*b = make([]byte, len)
}
copy(*b, d.Enc.ArgBytes)
return d.Enc.Err
}