Merge "vom/fuzz: some minor changes to the vom fuzzer"
diff --git a/.api b/.api
index 4a83cd3..27ed809 100644
--- a/.api
+++ b/.api
@@ -14,6 +14,7 @@
pkg v23, func NewEndpoint(string) (naming.Endpoint, error)
pkg v23, func NewFlowManager(*context.T, time.Duration) (flow.Manager, error)
pkg v23, func RegisterRuntimeFactory(RuntimeFactory)
+pkg v23, func TryInit() (*context.T, Shutdown, error)
pkg v23, func WithBackgroundContext(*context.T) *context.T
pkg v23, func WithListenSpec(*context.T, rpc.ListenSpec) *context.T
pkg v23, func WithNewClient(*context.T, ...rpc.ClientOpt) (*context.T, rpc.Client, error)
diff --git a/.gitignore b/.gitignore
index 5e2e933..4209dc0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1 @@
/.jiri
-#TODO(nlacasse): Get rid of .v23 below once v23->jiri transition is complete.
-/.v23
diff --git a/flow/.api b/flow/.api
index ececf65..49281c1 100644
--- a/flow/.api
+++ b/flow/.api
@@ -51,10 +51,11 @@
pkg flow, type ManagedConn interface, RemoteBlessings() security.Blessings
pkg flow, type ManagedConn interface, RemoteDischarges() map[string]security.Discharge
pkg flow, type ManagedConn interface, RemoteEndpoint() naming.Endpoint
-pkg flow, type Manager interface { Accept, Closed, Dial, Listen, ProxyListen, RoutingID, Status, StopListening }
+pkg flow, type Manager interface { Accept, Closed, Dial, DialCached, Listen, ProxyListen, RoutingID, Status, StopListening }
pkg flow, type Manager interface, Accept(*context.T) (Flow, error)
pkg flow, type Manager interface, Closed() <-chan struct{}
pkg flow, type Manager interface, Dial(*context.T, naming.Endpoint, PeerAuthorizer, time.Duration) (Flow, error)
+pkg flow, type Manager interface, DialCached(*context.T, naming.Endpoint, PeerAuthorizer, time.Duration) (Flow, error)
pkg flow, type Manager interface, Listen(*context.T, string, string) error
pkg flow, type Manager interface, ProxyListen(*context.T, string, naming.Endpoint) (<-chan struct{}, error)
pkg flow, type Manager interface, RoutingID() naming.RoutingID
diff --git a/flow/model.go b/flow/model.go
index 57300fc..75430a3 100644
--- a/flow/model.go
+++ b/flow/model.go
@@ -74,6 +74,17 @@
// closed.
Dial(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
+ // DialCached creates a Flow to the provided remote endpoint using only cached
+ // connections from previous Listen or Dial calls.
+ // If no cached connection exists, an error will be returned.
+ //
+ // 'auth' is used to determine the blessings that will be sent to the remote end.
+ //
+ // channelTimeout specifies the duration we are willing to wait before determining
+ // that connections managed by this Manager are unhealthy and should be
+ // closed.
+ DialCached(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
+
// RoutingID returns the naming.Routing of the flow.Manager.
// If the RoutingID of the manager is naming.NullRoutingID, the manager can
// only be used to Dial outgoing calls.
diff --git a/init.go b/init.go
index fd8b3a7..d913246 100644
--- a/init.go
+++ b/init.go
@@ -8,11 +8,21 @@
import "v.io/v23/context"
+// TryInit is like Init, except that it returns an error instead of panicking.
+func TryInit() (*context.T, Shutdown, error) {
+ return internalInit()
+}
+
// Init should be called once for each vanadium executable, providing
// the setup of the vanadium initial context.T and a Shutdown function
// that can be used to clean up the runtime. We allow calling Init
// multiple times (useful in tests), but only as long as you call the
// Shutdown returned previously before calling Init the second time.
+// Init panics if it encounters an error.
func Init() (*context.T, Shutdown) {
- return internalInit()
+ ctx, shutdown, err := internalInit()
+ if err != nil {
+ panic(err)
+ }
+ return ctx, shutdown
}
diff --git a/init_internal_test.go b/init_internal_test.go
index a83c1c6..fd45b9a 100644
--- a/init_internal_test.go
+++ b/init_internal_test.go
@@ -70,7 +70,8 @@
if r == nil {
t.Fatalf("recover returned nil")
}
- str := r.(string)
+ err := r.(error)
+ str := err.Error()
if !strings.Contains(str, "No RuntimeFactory has been registered") {
t.Fatalf("unexpected error: %s", str)
}
@@ -114,7 +115,8 @@
if r == nil {
t.Fatalf("recover returned nil")
}
- str := r.(string)
+ err := r.(error)
+ str := err.Error()
if !strings.Contains(str, "A runtime has already been initialized") {
t.Fatalf("unexpected error: %s", str)
}
diff --git a/init_mojo.go b/init_mojo.go
index e265169..c516674 100644
--- a/init_mojo.go
+++ b/init_mojo.go
@@ -28,6 +28,14 @@
// multiple times, but only as long as you call the Shutdown returned
// previously before calling Init the second time.
func Init(mctx application.Context) (*context.T, Shutdown) {
+ ctx, shutdown, err := TryInit(mctx)
+ if err != nil {
+ panic(err)
+ }
+ return ctx, shutdown
+}
+
+func TryInit(mctx application.Context) (*context.T, Shutdown, error) {
// mctx.Args() is a slice that contains the url of this mojo service
// followed by all arguments passed to the mojo service via the
// "--args-for" flag. Since the v23 runtime factories parse arguments
diff --git a/model.go b/model.go
index 4ae7fcd..edf9a22 100644
--- a/model.go
+++ b/model.go
@@ -444,12 +444,12 @@
return buf.String()
}
-func internalInit() (*context.T, Shutdown) {
+func internalInit() (*context.T, Shutdown, error) {
initState.mu.Lock()
runtimeFactory := initState.runtimeFactory
if initState.runtimeFactory == nil {
initState.mu.Unlock()
- panic("No RuntimeFactory has been registered nor specified. This is most" +
+ return nil, nil, fmt.Errorf("No RuntimeFactory has been registered nor specified. This is most" +
" likely because your main package has not imported a RuntimeFactory")
}
@@ -463,7 +463,7 @@
This registration is from:
%s
`
- panic(fmt.Sprintf(format, initState.runtimeStack, stack))
+ return nil, nil, fmt.Errorf(format, initState.runtimeStack, stack)
}
initState.runtimeStack = stack
initState.mu.Unlock()
@@ -480,7 +480,7 @@
if err != nil {
cancel()
rootcancel()
- panic(err)
+ return nil, nil, err
}
initState.mu.Lock()
@@ -503,8 +503,8 @@
if err := rt.Init(ctx); err != nil {
vshutdown()
- panic(err)
+ return nil, nil, err
}
- return ctx, vshutdown
+ return ctx, vshutdown, nil
}
diff --git a/options/.api b/options/.api
index 05d88ab..c2500d8 100644
--- a/options/.api
+++ b/options/.api
@@ -1,5 +1,6 @@
pkg options, method (ChannelTimeout) RPCCallOpt()
pkg options, method (ChannelTimeout) RPCServerOpt()
+pkg options, method (ConnectionTimeout) RPCCallOpt()
pkg options, method (IsLeaf) RPCServerOpt()
pkg options, method (LameDuckTimeout) RPCServerOpt()
pkg options, method (NameResolutionAuthorizer) NSOpt()
@@ -8,11 +9,11 @@
pkg options, method (NoRetry) RPCCallOpt()
pkg options, method (Preresolved) NSOpt()
pkg options, method (Preresolved) RPCCallOpt()
-pkg options, method (RetryTimeout) RPCCallOpt()
pkg options, method (ServerAuthorizer) RPCCallOpt()
pkg options, method (ServerPeers) RPCServerOpt()
pkg options, method (ServesMountTable) RPCServerOpt()
pkg options, type ChannelTimeout time.Duration
+pkg options, type ConnectionTimeout time.Duration
pkg options, type IsLeaf bool
pkg options, type LameDuckTimeout time.Duration
pkg options, type NameResolutionAuthorizer struct
@@ -20,7 +21,6 @@
pkg options, type NoRetry struct
pkg options, type Preresolved struct
pkg options, type Preresolved struct, Resolution *naming.MountEntry
-pkg options, type RetryTimeout time.Duration
pkg options, type ServerAuthorizer struct
pkg options, type ServerAuthorizer struct, embedded security.Authorizer
pkg options, type ServerPeers []security.BlessingPattern
diff --git a/options/options.go b/options/options.go
index be8219d..3f2e346 100644
--- a/options/options.go
+++ b/options/options.go
@@ -97,12 +97,6 @@
func (NameResolutionAuthorizer) RPCCallOpt() {}
func (NameResolutionAuthorizer) NSOpt() {}
-// RetryTimeout is the duration during which we will retry starting
-// an RPC call. Zero means don't retry.
-type RetryTimeout time.Duration
-
-func (RetryTimeout) RPCCallOpt() {}
-
// Preresolved specifies that the RPC call should not further Resolve the name.
// If a MountEntry is provided, use it. Otherwise use the name passed in the
// RPC call. If the name is relative, it will be made global using
@@ -144,3 +138,10 @@
func (ChannelTimeout) RPCCallOpt() {}
func (ChannelTimeout) RPCServerOpt() {}
+
+// ConnectionTimeout is the amount of time we will try establishing a connection
+// to the remote end during an RPC. Zero means only use cached connections and
+// do not attempt to retry if no connection exists in the cache.
+type ConnectionTimeout time.Duration
+
+func (ConnectionTimeout) RPCCallOpt() {}
diff --git a/rpc/reserved/reserved.go b/rpc/reserved/reserved.go
index 1dd8615..7138e15 100644
--- a/rpc/reserved/reserved.go
+++ b/rpc/reserved/reserved.go
@@ -14,8 +14,7 @@
)
// Signature invokes the reserved signature RPC on the given name, and returns
-// the results. The client will be used to invoke the RPC - if it is nil, the
-// default client from the runtime is used.
+// the results.
func Signature(ctx *context.T, name string, opts ...rpc.CallOpt) ([]signature.Interface, error) {
var sig []signature.Interface
res := []interface{}{&sig}
@@ -26,8 +25,7 @@
}
// MethodSignature invokes the reserved method signature RPC on the given name,
-// and returns the results. The client will be used to invoke the RPC - if it
-// is nil, the default client from the runtime is used.
+// and returns the results.
func MethodSignature(ctx *context.T, name, method string, opts ...rpc.CallOpt) (signature.Method, error) {
args := []interface{}{method}
var sig signature.Method
diff --git a/security/.api b/security/.api
index df579dd..d579bed 100644
--- a/security/.api
+++ b/security/.api
@@ -88,7 +88,7 @@
pkg security, type BlessingStore interface, CacheDischarge(Discharge, Caveat, DischargeImpetus)
pkg security, type BlessingStore interface, ClearDischarges(...Discharge)
pkg security, type BlessingStore interface, DebugString() string
-pkg security, type BlessingStore interface, Default() Blessings
+pkg security, type BlessingStore interface, Default() (Blessings, <-chan struct{})
pkg security, type BlessingStore interface, Discharge(Caveat, DischargeImpetus) (Discharge, time.Time)
pkg security, type BlessingStore interface, ForPeer(...string) Blessings
pkg security, type BlessingStore interface, PeerBlessings() map[BlessingPattern]Blessings
diff --git a/security/blessings.go b/security/blessings.go
index 9f95ead..a6b6732 100644
--- a/security/blessings.go
+++ b/security/blessings.go
@@ -356,7 +356,8 @@
// DefaultBlessingPatterns returns the BlessingsPatterns of the Default Blessings
// of the provided Principal.
func DefaultBlessingPatterns(p Principal) (patterns []BlessingPattern) {
- for _, b := range BlessingNames(p, p.BlessingStore().Default()) {
+ blessings, _ := p.BlessingStore().Default()
+ for _, b := range BlessingNames(p, blessings) {
patterns = append(patterns, BlessingPattern(b))
}
return
diff --git a/security/model.go b/security/model.go
index e1c464d..2b76cea 100644
--- a/security/model.go
+++ b/security/model.go
@@ -208,18 +208,19 @@
// hosts blessings.
SetDefault(blessings Blessings) error
- // Default returns the blessings to be shared with peers for which
- // no other information is available in order to select blessings
- // from the store.
+ // Default returns the blessings to be shared with peers for which no
+ // other information is available in order to select blessings from the
+ // store.
//
// For example, Default can be used by servers to identify themselves
// to clients before the client has identified itself.
//
// Default returns the blessings provided to the last call to
- // SetDefault.
+ // SetDefault and a channel which will be closed when the
+ // Default changes (i.e., SetDefault is called again).
//
// Returns the zero value if there is no usable blessings.
- Default() Blessings
+ Default() (Blessings, <-chan struct{})
// PublicKey returns the public key of the Principal for which
// this store hosts blessings.
diff --git a/security/principal.go b/security/principal.go
index b435cf3..e73889c 100644
--- a/security/principal.go
+++ b/security/principal.go
@@ -69,7 +69,7 @@
}
func (errStore) ForPeer(peerBlessings ...string) Blessings { return Blessings{} }
func (errStore) SetDefault(blessings Blessings) error { return verror.New(errNilStore, nil) }
-func (errStore) Default() Blessings { return Blessings{} }
+func (errStore) Default() (Blessings, <-chan struct{}) { return Blessings{}, nil }
func (errStore) PeerBlessings() map[BlessingPattern]Blessings { return nil }
func (errStore) CacheDischarge(Discharge, Caveat, DischargeImpetus) { return }
func (errStore) ClearDischarges(...Discharge) { return }
diff --git a/security/principal_blessings_test.go b/security/principal_blessings_test.go
index 769fa23..3c9e31a 100644
--- a/security/principal_blessings_test.go
+++ b/security/principal_blessings_test.go
@@ -314,7 +314,7 @@
if got := s.ForPeer(); !got.IsZero() {
t.Errorf("BlessingStore.ForPeer: got %v want empty", got)
}
- if got := s.Default(); !got.IsZero() {
+ if got, _ := s.Default(); !got.IsZero() {
t.Errorf("BlessingStore.Default: got %v want empty", got)
}
if got, want := s.PublicKey(), p.PublicKey(); !reflect.DeepEqual(got, want) {
diff --git a/syncbase/featuretests/client_v23_test.go b/syncbase/featuretests/client_v23_test.go
index 8793bcc..e38300b 100644
--- a/syncbase/featuretests/client_v23_test.go
+++ b/syncbase/featuretests/client_v23_test.go
@@ -6,8 +6,12 @@
import (
"testing"
+ "time"
+ "v.io/v23/context"
"v.io/v23/syncbase"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/vom"
"v.io/x/ref/test/v23test"
)
@@ -19,7 +23,9 @@
// Start syncbased.
serverCreds := sh.ForkCredentials("server")
- sh.StartSyncbase(serverCreds, testSbName, "", `{"Read": {"In":["root:server", "root:client"]}, "Write": {"In":["root:server", "root:client"]}}`)
+ // TODO(aghassemi): Resolve permission is currently needed for Watch.
+ // See https://github.com/vanadium/issues/issues/1110
+ sh.StartSyncbase(serverCreds, testSbName, "", `{"Resolve": {"In":["root:server", "root:client"]}, "Read": {"In":["root:server", "root:client"]}, "Write": {"In":["root:server", "root:client"]}}`)
// Create app, database and table.
// TODO(ivanpi): Use setupAppA.
@@ -36,17 +42,52 @@
if err := tb.Create(ctx, nil); err != nil {
t.Fatalf("unable to create a table: %v", err)
}
+ marker, err := d.GetResumeMarker(ctx)
+ if err != nil {
+ t.Fatalf("unable to get the resume marker: %v", err)
+ }
// Do a Put followed by a Get.
- r := tb.Row("r")
- if err := r.Put(ctx, "testkey"); err != nil {
+ r := tb.Row("testkey")
+ if err := r.Put(ctx, "testvalue"); err != nil {
t.Fatalf("r.Put() failed: %v", err)
}
var result string
if err := r.Get(ctx, &result); err != nil {
t.Fatalf("r.Get() failed: %v", err)
}
- if got, want := result, "testkey"; got != want {
+ if got, want := result, "testvalue"; got != want {
t.Fatalf("unexpected value: got %q, want %q", got, want)
}
+
+ // Do a watch from the resume marker before the put operation.
+ ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ stream, err := d.Watch(ctxWithTimeout, "tb", "", marker)
+ if err != nil {
+ t.Fatalf("unable to start a watch %v", err)
+ }
+ if !stream.Advance() {
+ t.Fatalf("watch stream unexpectedly reached the end: %v", stream.Err())
+ }
+ change := stream.Change()
+ if got, want := change.Table, "tb"; got != want {
+ t.Fatalf("unexpected watch table: got %q, want %q", got, want)
+ }
+ if got, want := change.Row, "testkey"; got != want {
+ t.Fatalf("unexpected watch row: got %q, want %q", got, want)
+ }
+ if got, want := change.ChangeType, nosql.PutChange; got != want {
+ t.Fatalf("unexpected watch change type: got %q, want %q", got, want)
+ }
+ if got, want := change.FromSync, false; got != want {
+ t.Fatalf("unexpected FromSync value: got %t, want %t", got, want)
+ }
+ if err := vom.Decode(change.ValueBytes, &result); err != nil {
+ t.Fatalf("couldn't decode watch value: %v", err)
+ }
+ if got, want := result, "testvalue"; got != want {
+ t.Fatalf("unexpected watch value: got %q, want %q", got, want)
+ }
+
}
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index 05dfd15..9d260db 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -570,7 +570,6 @@
}
return &nosql.Schema{
Metadata: metadata,
- Upgrader: nil,
Resolver: resolver,
}
}
diff --git a/syncbase/featuretests/ping_pong_test.go b/syncbase/featuretests/ping_pong_test.go
new file mode 100644
index 0000000..0473d27
--- /dev/null
+++ b/syncbase/featuretests/ping_pong_test.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package featuretests_test
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "v.io/v23/naming"
+ "v.io/v23/services/watch"
+ "v.io/v23/syncbase"
+ "v.io/v23/syncbase/nosql"
+ constants "v.io/x/ref/services/syncbase/server/util"
+ "v.io/x/ref/test/v23test"
+)
+
+// BenchmarkPingPongPair measures the round trip sync latency between a pair of
+// Syncbase instances that Ping Pong data to each other over a syncgroup.
+//
+// This benchmark performs the following operations:
+// - Create two syncbase instances and have them join the same syncgroup.
+// - Each watches the other syncbase's "section" of the table.
+// - A preliminary write by each syncbase ensures that clocks are synced.
+// - During Ping Pong, each syncbase instance finds its local dev time and
+// writes it to its section of the table. The other syncbase watches for
+// this value; once received, it does the same.
+//
+// After the benchmark completes, the "ns/op" value refers to the average time
+// per Ping Pong roundtrip completed by the two syncbases.
+//
+// Note: This benchmark could instead write simpler values (like int32) or call
+// time.Now() rather than the DevModeGetTime RPC, which would change the stats.
+func BenchmarkPingPongPair(b *testing.B) {
+ sh := v23test.NewShell(b, v23test.Opts{})
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 2 Syncbases with the dev mode clock on.
+ sbs := setupSyncbases(b, sh, 2, "--dev")
+
+ // Syncbase s0 is the creator.
+ sgName := naming.Join(sbs[0].sbName, constants.SyncbaseSuffix, "SG1")
+
+ // TODO(alexfandrianto): Was unable to use the empty prefix ("tb:").
+ // Observation: w0's watch isn't working with the empty prefix.
+ // Possible Explanation: The empty prefix ACL receives an initial value from
+ // the Table ACL. If this value is synced over from the opposing peer,
+ // conflict resolution can mean that s0 loses the ability to watch.
+ syncString := fmt.Sprintf("%s:p", testTable)
+ ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgName, syncString, "", sbBlessings(sbs), nil))
+
+ // Syncbase s1 will attempt to join the syncgroup.
+ ok(b, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sgName))
+
+ // Obtain the handles to the databases.
+ db0, _ := getDbAndTable(sbs[0].sbName)
+ db1, _ := getDbAndTable(sbs[1].sbName)
+
+ // Set up the watch streams (watching the other syncbase's prefix).
+ prefix0, prefix1 := "prefix0", "prefix1"
+ w0, err := db0.Watch(sbs[0].clientCtx, testTable, prefix1, watch.ResumeMarker("now"))
+ ok(b, err)
+ w1, err := db1.Watch(sbs[1].clientCtx, testTable, prefix0, watch.ResumeMarker("now"))
+ ok(b, err)
+
+ // The join has succeeded, so it's time to ensure clocks are synchronized.
+ // The strategy is: s0 sends to s1, and then s1 responds.
+ sendTimeSync(b, sbs[0], prefix0, w1)
+ sendTimeSync(b, sbs[1], prefix1, w0)
+
+ // The clocks are synchronized, so it is now time to really start our watch
+ // streams. This will allow us to ping and pong.
+ c0, c1 := make(chan time.Time), make(chan time.Time)
+ go watchTimes(b, w0, c0)
+ go watchTimes(b, w1, c1)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ suffix := fmt.Sprintf("%d", i)
+ ok(b, writeTime(sbs[0], prefix0, suffix))
+ <-c1
+ ok(b, writeTime(sbs[1], prefix1, suffix))
+ <-c0
+ }
+
+ // TODO(alexfandrianto): Should cancel these watch streams. Unfortunately, we
+ // cannot cancel while a watch stream's Advance() blocks.
+ //w0.Cancel()
+ //w1.Cancel()
+}
+
+// sendTimeSync writes a timestamp under senderPrefix on ts, advances the watch
+// stream w once to consume the change, and reports any error on that stream.
+func sendTimeSync(b *testing.B, ts *testSyncbase, senderPrefix string, w nosql.WatchStream) {
+ ok(b, writeTime(ts, senderPrefix, "synctime"))
+ if w.Advance() {
+ w.Change() // grab the change, but ignore the value.
+ }
+ watchStreamOk(b, w)
+}
+
+// watchTimes decodes the value of each watched change into a time.Time and sends it on c.
+func watchTimes(b *testing.B, w nosql.WatchStream, c chan time.Time) {
+ for w.Advance() {
+ var t time.Time
+ change := w.Change()
+ if change.ChangeType == nosql.DeleteChange {
+ b.Error("Received a delete change")
+ }
+ err := change.Value(&t)
+ if err != nil {
+ b.Error(err)
+ }
+ c <- t
+ watchStreamOk(b, w)
+ }
+}
+
+// getDbAndTable obtains the database and table handles for a syncbase name.
+func getDbAndTable(syncbaseName string) (d nosql.Database, tb nosql.Table) {
+ a := syncbase.NewService(syncbaseName).App(testApp)
+ d = a.NoSQLDatabase(testDb, nil)
+ tb = d.Table(testTable)
+ return
+}
+
+// writeTime writes a timestamp into syncbase at the combination of keyPrefix
+// and keySuffix.
+func writeTime(ts *testSyncbase, keyPrefix, keySuffix string) error {
+ ctx := ts.clientCtx
+ syncbaseName := ts.sbName
+ _, tb := getDbAndTable(syncbaseName)
+
+ key := fmt.Sprintf("%s/%s", keyPrefix, keySuffix)
+ // sc is a helper to obtain the syncbase wire service.
+ time, err := sc(syncbaseName).DevModeGetTime(ctx)
+ if err != nil {
+ return fmt.Errorf("sb.DevModeGetTime() failed: %v", err)
+ }
+
+ if err2 := tb.Put(ctx, key, time); err2 != nil {
+ return fmt.Errorf("tb.Put() failed: %v", err2)
+ }
+ return nil
+}
+
+// watchStreamOk emits an error if the watch stream has an error.
+func watchStreamOk(b *testing.B, w nosql.WatchStream) {
+ if w.Err() != nil {
+ b.Errorf("stream error: %v", w.Err())
+ }
+}
diff --git a/syncbase/featuretests/test_util_test.go b/syncbase/featuretests/test_util_test.go
index 180f0b2..8306421 100644
--- a/syncbase/featuretests/test_util_test.go
+++ b/syncbase/featuretests/test_util_test.go
@@ -15,6 +15,7 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/security/access"
+ wire_syncbase "v.io/v23/services/syncbase"
wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/syncbase"
"v.io/v23/syncbase/nosql"
@@ -52,7 +53,7 @@
}
// Spawns "num" Syncbase instances and returns handles to them.
-func setupSyncbases(t *testing.T, sh *v23test.Shell, num int, args ...string) []*testSyncbase {
+func setupSyncbases(t testing.TB, sh *v23test.Shell, num int, args ...string) []*testSyncbase {
sbs := make([]*testSyncbase, num)
for i, _ := range sbs {
sbName, clientId := fmt.Sprintf("s%d", i), fmt.Sprintf("c%d", i)
@@ -62,8 +63,8 @@
clientId: clientId,
clientCtx: sh.ForkContext(clientId),
}
- // Give RWA permissions to this Syncbase's client.
- acl := fmt.Sprintf(`{"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId)
+ // Give Resolve, Read, Write and Admin permissions to this Syncbase's client.
+ acl := fmt.Sprintf(`{"Resolve":{"In":["root:%s"]},"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId, clientId)
sh.StartSyncbase(sbs[i].sbCreds, sbs[i].sbName, "", acl, args...)
}
// Call setupHierarchy on each Syncbase.
@@ -327,27 +328,35 @@
}
////////////////////////////////////////////////////////////
+// Helpers to interact with the Syncbase service directly.
+
+// sc returns the Syncbase service client stub for the given name.
+func sc(name string) wire_syncbase.ServiceClientStub {
+ return wire_syncbase.ServiceClient(name)
+}
+
+////////////////////////////////////////////////////////////
// Generic testing helpers
-func ok(t *testing.T, err error) {
+func ok(t testing.TB, err error) {
if err != nil {
tu.Fatal(t, err)
}
}
-func nok(t *testing.T, err error) {
+func nok(t testing.TB, err error) {
if err == nil {
tu.Fatal(t, "nil err")
}
}
-func eq(t *testing.T, got, want interface{}) {
+func eq(t testing.TB, got, want interface{}) {
if !reflect.DeepEqual(got, want) {
tu.Fatalf(t, "got %v, want %v", got, want)
}
}
-func neq(t *testing.T, got, notWant interface{}) {
+func neq(t testing.TB, got, notWant interface{}) {
if reflect.DeepEqual(got, notWant) {
tu.Fatalf(t, "got %v", got)
}
diff --git a/syncbase/featuretests/vclock_v23_test.go b/syncbase/featuretests/vclock_v23_test.go
index d77559c..a6be515 100644
--- a/syncbase/featuretests/vclock_v23_test.go
+++ b/syncbase/featuretests/vclock_v23_test.go
@@ -388,10 +388,6 @@
////////////////////////////////////////////////////////////////////////////////
// Helper functions
-func sc(name string) wire.ServiceClientStub {
- return wire.ServiceClient(name)
-}
-
// Creates a "chain" of syncgroups, where each adjacent pair of Syncbases {A,B}
// share a syncgroup with key prefix "AB".
func setupChain(t *testing.T, sbs []*testSyncbase) {
diff --git a/syncbase/nosql/.api b/syncbase/nosql/.api
index 8c2d318..72b9183 100644
--- a/syncbase/nosql/.api
+++ b/syncbase/nosql/.api
@@ -1,7 +1,6 @@
pkg nosql, const DeleteChange ChangeType
pkg nosql, const PutChange ChangeType
pkg nosql, func NewDatabase(string, string, *Schema) *database
-pkg nosql, func NewSchema(wire.SchemaMetadata, SchemaUpgrader) *Schema
pkg nosql, func NewValue(*context.T, interface{}) (*Value, error)
pkg nosql, func Prefix(string) PrefixRange
pkg nosql, func Range(string, string) RowRange
@@ -135,9 +134,6 @@
pkg nosql, type Schema struct
pkg nosql, type Schema struct, Metadata wire.SchemaMetadata
pkg nosql, type Schema struct, Resolver ConflictResolver
-pkg nosql, type Schema struct, Upgrader SchemaUpgrader
-pkg nosql, type SchemaUpgrader interface { Run }
-pkg nosql, type SchemaUpgrader interface, Run(Database, int32, int32) error
pkg nosql, type Stream interface { Advance, Cancel, Err }
pkg nosql, type Stream interface, Advance() bool
pkg nosql, type Stream interface, Cancel()
diff --git a/syncbase/nosql/benchmark_test.go b/syncbase/nosql/benchmark_test.go
index a8d663a..c0bc6a9 100644
--- a/syncbase/nosql/benchmark_test.go
+++ b/syncbase/nosql/benchmark_test.go
@@ -187,6 +187,7 @@
// Measures how long it takes to put and get notified about 'numRows' rows.
func runWatchPutsBenchmark(b *testing.B, value interface{}) {
+ b.Skip("Hangs on occasion, for unknown reasons - v.io/i/1134")
ctx, d, tb, cleanup := prepare(b)
defer cleanup()
b.ResetTimer()
@@ -215,6 +216,7 @@
// Measures how long it takes to put and get notified about a single value.
func runWatchOnePutBenchmark(b *testing.B, value interface{}) {
+ b.Skip("Hangs on occasion, for unknown reasons - v.io/i/1134")
ctx, d, tb, cleanup := prepare(b)
defer cleanup()
w, err := d.Watch(ctx, "tb", "", watch.ResumeMarker("now"))
diff --git a/syncbase/nosql/cr_connection_test.go b/syncbase/nosql/cr_connection_test.go
index 774fff2..27acb3a 100644
--- a/syncbase/nosql/cr_connection_test.go
+++ b/syncbase/nosql/cr_connection_test.go
@@ -127,7 +127,6 @@
func getSchema(cr ConflictResolver) *Schema {
return &Schema{
Metadata: wire.SchemaMetadata{},
- Upgrader: nil,
Resolver: cr,
}
}
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index 9deef44..d4b26b7 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -211,7 +211,7 @@
func (d *database) EnforceSchema(ctx *context.T) error {
var schema *Schema = d.schema
if schema == nil {
- return verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+ return verror.New(verror.ErrBadState, ctx, "EnforceSchema cannot be used since a nil *Schema was provided at Database handle creation time.")
}
if schema.Metadata.Version < 0 {
@@ -222,7 +222,7 @@
return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
}
- if _, err := d.upgradeIfOutdated(ctx); err != nil {
+ if _, err := d.updateSchemaMetadata(ctx); err != nil {
return err
}
@@ -243,7 +243,10 @@
d.crState.disconnect()
}
-func (d *database) upgradeIfOutdated(ctx *context.T) (bool, error) {
+// updateSchemaMetadata reads the current SchemaMetadata from db and checks
+// if the SchemaMetadata provided by the app is newer. If so, it updates the
+// db with the new SchemaMetadata.
+func (d *database) updateSchemaMetadata(ctx *context.T) (bool, error) {
var schema *Schema = d.schema
schemaMgr := d.getSchemaManager()
currMeta, err := schemaMgr.getSchemaMetadata(ctx)
@@ -267,21 +270,9 @@
if currMeta.Version >= schema.Metadata.Version {
return false, nil
}
- // Call the Upgrader provided by the app to upgrade the schema.
- //
- // TODO(jlodhia): disable sync before running Upgrader and reenable
- // once Upgrader is finished.
- //
- // TODO(jlodhia): prevent other processes (local/remote) from accessing
- // the database while upgrade is in progress.
- upgradeErr := schema.Upgrader.Run(d, currMeta.Version, schema.Metadata.Version)
- if upgradeErr != nil {
- vlog.Error(upgradeErr)
- return false, upgradeErr
- }
+
// Update the schema metadata in db to the latest version.
- metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
- if metadataErr != nil {
+ if metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata); metadataErr != nil {
vlog.Error(metadataErr)
return false, metadataErr
}
diff --git a/syncbase/nosql/model.go b/syncbase/nosql/model.go
index 0c475af..4d76141 100644
--- a/syncbase/nosql/model.go
+++ b/syncbase/nosql/model.go
@@ -140,13 +140,11 @@
Blob(br wire.BlobRef) Blob
// EnforceSchema compares the current schema version of the database
- // with the schema version provided while creating this database handle. If
- // the current database schema version is lower, then the SchemaUpdater is
- // called. If SchemaUpdater is successful this method stores the new schema
- // metadata in database.
+ // with the schema version provided while creating this database handle and
+ // updates the schema metadata if required.
// This method also registers a conflict resolver with syncbase to receive
- // conflicts. Note: schema can be nil, in which case this method skips
- // schema check and the caller is responsible for maintaining schema sanity.
+ // conflicts. Note: schema can be nil, in which case this method should not
+ // be called and the caller is responsible for maintaining schema sanity.
EnforceSchema(ctx *context.T) error
}
@@ -558,27 +556,18 @@
Cancel()
}
-// SchemaUpgrader interface must be implemented by the App in order to upgrade
-// the database schema from a lower version to a higher version.
-type SchemaUpgrader interface {
- // Takes an instance of database and upgrades data from old
- // schema to new schema. This method must be idempotent.
- Run(db Database, oldVersion, newVersion int32) error
-}
-
// Each database has a Schema associated with it which defines the current
// version of the database. When a new version of app wishes to change
// its data in a way that it is not compatible with the old app's data,
-// the app must change the schema version and provide relevant upgrade logic
-// in the Upgrader. The conflict resolution rules are also associated with the
+// the app must change the schema version and perform relevant upgrade logic.
+// The conflict resolution rules are also associated with the
// schema version. Hence if the conflict resolution rules change then the schema
// version also must be bumped.
//
-// Schema provides metadata and a SchemaUpgrader for a given database.
-// SchemaUpgrader is purely local and not persisted.
+// Schema provides metadata and a ConflictResolver for a given database.
+// ConflictResolver is purely local and not persisted.
type Schema struct {
Metadata wire.SchemaMetadata
- Upgrader SchemaUpgrader
Resolver ConflictResolver
}
diff --git a/syncbase/nosql/schema.go b/syncbase/nosql/schema.go
index 1e82324..10f54d8 100644
--- a/syncbase/nosql/schema.go
+++ b/syncbase/nosql/schema.go
@@ -9,14 +9,6 @@
wire "v.io/v23/services/syncbase/nosql"
)
-// NewSchema creates a new Schema object.
-func NewSchema(metadata wire.SchemaMetadata, upgrader SchemaUpgrader) *Schema {
- return &Schema{
- Metadata: metadata,
- Upgrader: upgrader,
- }
-}
-
//////////////////////////////////////////
// Implementation of SchemaManager (Not part of public client API)
diff --git a/syncbase/nosql/schema_test.go b/syncbase/nosql/schema_test.go
index 1d0beb8..740a5f8 100644
--- a/syncbase/nosql/schema_test.go
+++ b/syncbase/nosql/schema_test.go
@@ -19,29 +19,20 @@
// This test as following steps:
// 1) Call NoSQLDatabase() for a non existent db.
// 2) Create the database, and verify if Schema got stored properly.
-// 3) Call EnforceSchema() to make sure that the method is no-op and is
-// able to read the schema from db.
-// 4) Call NoSQLDatabase() on the same db to create a new handle with an
-// upgraded schema, call EnforceSchema() and check if SchemaUpgrader
-// is called and if the new schema is stored appropriately.
+// 3) Call NoSQLDatabase() on the same db to create a new handle with new
+// schema metadata, call EnforceSchema() and check if the new metadata was
+// stored appropriately.
func TestSchemaCheck(t *testing.T) {
ctx, sName, cleanup := tu.SetupOrDie(nil)
defer cleanup()
a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
schema := tu.DefaultSchema(0)
- mockUpgrader := schema.Upgrader.(*tu.MockSchemaUpgrader)
db1 := a.NoSQLDatabase("db1", schema)
- // Verify that calling Upgrade on a non existing database does not throw
- // errors.
- err := db1.EnforceSchema(ctx)
- if err != nil {
+ if err := db1.EnforceSchema(ctx); err != nil {
t.Fatalf("db1.EnforceSchema() failed: %v", err)
}
- if mockUpgrader.CallCount > 0 {
- t.Fatal("Call to upgrader was not expected.")
- }
// Create db1, this step also stores the schema provided above
if err := db1.Create(ctx, nil); err != nil {
@@ -52,14 +43,6 @@
t.Fatalf("Failed to lookup schema after create: %v", err)
}
- // Make redundant call to Upgrade to verify that it is a no-op
- if err := db1.EnforceSchema(ctx); err != nil {
- t.Fatalf("db1.EnforceSchema() failed: %v", err)
- }
- if mockUpgrader.CallCount > 0 {
- t.Fatal("Call to upgrader was not expected.")
- }
-
// try to make a new database object for the same database but this time
// with a new schema version
schema.Metadata.Version = 1
@@ -72,14 +55,11 @@
if err := otherdb1.EnforceSchema(ctx); err != nil {
t.Fatalf("otherdb1.EnforceSchema() failed: %v", err)
}
- if mockUpgrader.CallCount != 1 {
- t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
- }
// check if the contents of SchemaMetadata are correctly stored in the db.
- metadata, err3 := getSchemaMetadata(ctx, otherdb1.FullName())
- if err3 != nil {
- t.Fatalf("GetSchemaMetadata failed: %v", err3)
+ metadata, err := getSchemaMetadata(ctx, otherdb1.FullName())
+ if err != nil {
+ t.Fatalf("GetSchemaMetadata failed: %v", err)
}
if metadata.Version != 1 {
t.Fatalf("Unexpected version number: %d", metadata.Version)
diff --git a/syncbase/nosql/syncgroup_v23_test.go b/syncbase/nosql/syncgroup_v23_test.go
index 459e394..65fa6dd 100644
--- a/syncbase/nosql/syncgroup_v23_test.go
+++ b/syncbase/nosql/syncgroup_v23_test.go
@@ -10,6 +10,7 @@
"os"
"reflect"
"runtime/debug"
+ "sort"
"strings"
"testing"
"time"
@@ -18,9 +19,11 @@
"v.io/v23/context"
"v.io/v23/naming"
wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/services/watch"
"v.io/v23/syncbase"
"v.io/v23/syncbase/nosql"
"v.io/v23/verror"
+ "v.io/v23/vom"
_ "v.io/x/ref/runtime/factories/generic"
constants "v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/test/v23test"
@@ -83,8 +86,10 @@
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
+ // TODO(aghassemi): Resolve permission is currently needed for Watch.
+ // See https://github.com/vanadium/issues/issues/1110
sh.StartSyncbase(server1Creds, "sync1", "",
- `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
+ `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
@@ -94,8 +99,11 @@
ok(t, runPopulateNonVomData(client0Ctx, "sync0", "bar", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
+ beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
+ ok(t, err)
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
+ ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
ok(t, runVerifySyncgroupNonVomData(client1Ctx, "sync1", "bar", 0, 10))
}
@@ -120,8 +128,10 @@
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
+ // TODO(aghassemi): Resolve permission is currently needed for Watch.
+ // See https://github.com/vanadium/issues/issues/1110
sh.StartSyncbase(server1Creds, "sync1", "",
- `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
+ `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
@@ -130,12 +140,18 @@
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
+ beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
+ ok(t, err)
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
+ ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
+ beforeDeleteMarker, err := getResumeMarker(client1Ctx, "sync1")
+ ok(t, err)
ok(t, runDeleteData(client0Ctx, "sync0", 0))
ok(t, runVerifyDeletedData(client0Ctx, "sync0", "foo"))
ok(t, runVerifyDeletedData(client1Ctx, "sync1", "foo"))
+ ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 5, true, beforeDeleteMarker))
ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "bar", 0, 10, false))
@@ -607,7 +623,6 @@
func runJoinSyncgroup(ctx *context.T, serviceName, sgName string) error {
a := syncbase.NewService(serviceName).App("a")
d := a.NoSQLDatabase("d", nil)
-
sg := d.Syncgroup(sgName)
info := wire.SyncgroupMemberInfo{SyncPriority: 10}
if _, err := sg.Join(ctx, info); err != nil {
@@ -831,6 +846,64 @@
return nil
}
+// runVerifySyncgroupDataWithWatch opens a Watch stream on testTable of database
+// "d" in app "a", starting at beforeSyncMarker and restricted to keyPrefix, and
+// verifies the first count changes it delivers.
+//
+// After sorting by row, change i must be for row keyPrefix+"<i>" in testTable
+// and must have FromSync set. If expectDelete is true every change must be a
+// delete; otherwise every change must be a put whose VOM-decoded string value
+// is "testkey"+keyPrefix+"<i>". The watch is bounded by a 10 second timeout.
+// count must be positive.
+func runVerifySyncgroupDataWithWatch(ctx *context.T, serviceName, keyPrefix string, count int, expectDelete bool, beforeSyncMarker watch.ResumeMarker) error {
+	if count <= 0 {
+		return fmt.Errorf("count must be positive: got %d", count)
+	}
+	a := syncbase.NewService(serviceName).App("a")
+	d := a.NoSQLDatabase("d", nil)
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	stream, err := d.Watch(ctxWithTimeout, testTable, keyPrefix, beforeSyncMarker)
+	if err != nil {
+		return fmt.Errorf("watch error: %v", err)
+	}
+
+	// Test the bound before calling Advance: once count changes have been
+	// collected, one more Advance would block until another change arrives or
+	// the timeout fires.
+	changes := make([]nosql.WatchChange, 0, count)
+	for len(changes) < count && stream.Advance() {
+		changes = append(changes, stream.Change())
+	}
+	// NOTE(review): presumably Err is only meaningful after Advance has
+	// returned false (the usual stream contract) - confirm. Surfacing it here
+	// gives a more useful failure than the bare count mismatch below.
+	if len(changes) < count {
+		if err := stream.Err(); err != nil {
+			return fmt.Errorf("watch stream error: %v", err)
+		}
+	}
+
+	// ByRow orders rows as plain strings, which matches the numeric order the
+	// checks below expect only while the row suffixes are single digits.
+	sort.Sort(ByRow(changes))
+
+	if got, want := len(changes), count; got != want {
+		return fmt.Errorf("unexpected number of changes: got %d, want %d", got, want)
+	}
+
+	for i, change := range changes {
+		// The watch was opened on testTable, so that is the table to expect.
+		if got, want := change.Table, testTable; got != want {
+			return fmt.Errorf("unexpected watch table: got %q, want %q", got, want)
+		}
+		if got, want := change.Row, fmt.Sprintf("%s%d", keyPrefix, i); got != want {
+			return fmt.Errorf("unexpected watch row: got %q, want %q", got, want)
+		}
+		if got, want := change.FromSync, true; got != want {
+			return fmt.Errorf("unexpected FromSync value: got %t, want %t", got, want)
+		}
+		if expectDelete {
+			if got, want := change.ChangeType, nosql.DeleteChange; got != want {
+				return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+			}
+			// The value is not verified for deletes; keep checking the
+			// remaining changes instead of returning after the first one.
+			continue
+		}
+		if got, want := change.ChangeType, nosql.PutChange; got != want {
+			return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+		}
+		var result string
+		if err := vom.Decode(change.ValueBytes, &result); err != nil {
+			return fmt.Errorf("couldn't decode watch value: %v", err)
+		}
+		if got, want := result, fmt.Sprintf("testkey%s%d", keyPrefix, i); got != want {
+			return fmt.Errorf("unexpected watch value: got %q, want %q", got, want)
+		}
+	}
+	return nil
+}
+
func runVerifySyncgroupNonVomData(ctx *context.T, serviceName, keyPrefix string, start, count uint64) error {
a := syncbase.NewService(serviceName).App("a")
d := a.NoSQLDatabase("d", nil)
@@ -1232,6 +1305,12 @@
return nil
}
+// getResumeMarker returns the current watch resume marker of database "d" in
+// app "a" on the syncbase service named serviceName.
+func getResumeMarker(ctx *context.T, serviceName string) (watch.ResumeMarker, error) {
+	db := syncbase.NewService(serviceName).App("a").NoSQLDatabase("d", nil)
+	return db.GetResumeMarker(ctx)
+}
+
func ok(t *testing.T, err error) {
if err != nil {
debug.PrintStack()
@@ -1242,3 +1321,10 @@
func TestMain(m *testing.M) {
v23test.TestMain(m)
}
+
+// ByRow implements sort.Interface for []nosql.WatchChange based on the Row field.
+// Rows are compared as plain strings, so the resulting order is lexicographic
+// (for example "foo10" sorts before "foo2").
+type ByRow []nosql.WatchChange
+
+// Len returns the number of changes in c.
+func (c ByRow) Len() int { return len(c) }
+// Swap exchanges the changes at positions i and j.
+func (c ByRow) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
+// Less reports whether the Row of change i sorts before the Row of change j.
+func (c ByRow) Less(i, j int) bool { return c[i].Row < c[j].Row }
diff --git a/vdl/.api b/vdl/.api
index bdddd3e..202f666 100644
--- a/vdl/.api
+++ b/vdl/.api
@@ -243,6 +243,7 @@
pkg vdl, var Int32Type *Type
pkg vdl, var Int64Type *Type
pkg vdl, var Int8Type *Type
+pkg vdl, var RawBytesTargetFunc func(rv reflect.Value) Target
pkg vdl, var StringType *Type
pkg vdl, var TypeObjectType *Type
pkg vdl, var Uint16Type *Type
diff --git a/vdl/convert.go b/vdl/convert.go
index 858dc68..3da523c 100644
--- a/vdl/convert.go
+++ b/vdl/convert.go
@@ -533,6 +533,11 @@
// FromNil implements the Target interface method.
func (c convTarget) FromNil(tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromNil(tt)
+ }
+ }
if !compatible(c.tt, tt) {
return fmt.Errorf("types %q and %q aren't compatible", c.tt, tt)
}
@@ -581,6 +586,11 @@
// FromBool implements the Target interface method.
func (c convTarget) FromBool(src bool, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromBool(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -593,6 +603,11 @@
// FromUint implements the Target interface method.
func (c convTarget) FromUint(src uint64, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromUint(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -605,6 +620,11 @@
// FromInt implements the Target interface method.
func (c convTarget) FromInt(src int64, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromInt(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -617,6 +637,11 @@
// FromFloat implements the Target interface method.
func (c convTarget) FromFloat(src float64, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromFloat(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -629,6 +654,11 @@
// FromComplex implements the Target interface method.
func (c convTarget) FromComplex(src complex128, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromComplex(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -641,6 +671,11 @@
// FromBytes implements the Target interface method.
func (c convTarget) FromBytes(src []byte, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromBytes(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -653,6 +688,11 @@
// FromString implements the Target interface method.
func (c convTarget) FromString(src string, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromString(src, tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
if err != nil {
return err
@@ -665,11 +705,21 @@
// FromEnumLabel implements the Target interface method.
func (c convTarget) FromEnumLabel(src string, tt *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromEnumLabel(src, tt)
+ }
+ }
return c.FromString(src, tt)
}
// FromTypeObject implements the Target interface method.
func (c convTarget) FromTypeObject(src *Type) error {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.FromTypeObject(src)
+ }
+ }
fin, fill, err := startConvert(c, TypeObjectType)
if err != nil {
return err
@@ -1001,8 +1051,20 @@
return fmt.Errorf("invalid conversion from typeobject to %v", c.tt)
}
+// rbTarget identifies that a given target is a raw bytes target: any value
+// with a RawBytesTargetHack method satisfies it. It is used to special-case
+// conversion to RawBytes without having the vdl package depend on vom.
+type rbTarget interface {
+ RawBytesTargetHack()
+}
+
// StartList implements the Target interface method.
func (c convTarget) StartList(tt *Type, len int) (ListTarget, error) {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.StartList(tt, len)
+ }
+ }
// TODO(bprosnitz) Re-think allocation strategy and possibly use len (currently unused).
fin, fill, err := startConvert(c, tt)
return compConvTarget{fin, fill}, err
@@ -1010,42 +1072,69 @@
// FinishList implements the Target interface method.
func (c convTarget) FinishList(x ListTarget) error {
+ if rbTarg, ok := x.(rbTarget); ok {
+ return rbTarg.(Target).FinishList(x)
+ }
cc := x.(compConvTarget)
return finishConvert(cc.fin, cc.fill)
}
// StartSet implements the Target interface method.
func (c convTarget) StartSet(tt *Type, len int) (SetTarget, error) {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.StartSet(tt, len)
+ }
+ }
fin, fill, err := startConvert(c, tt)
return compConvTarget{fin, fill}, err
}
// FinishSet implements the Target interface method.
func (c convTarget) FinishSet(x SetTarget) error {
+ if rbTarg, ok := x.(rbTarget); ok {
+ return rbTarg.(Target).FinishSet(x)
+ }
cc := x.(compConvTarget)
return finishConvert(cc.fin, cc.fill)
}
// StartMap implements the Target interface method.
func (c convTarget) StartMap(tt *Type, len int) (MapTarget, error) {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.StartMap(tt, len)
+ }
+ }
fin, fill, err := startConvert(c, tt)
return compConvTarget{fin, fill}, err
}
// FinishMap implements the Target interface method.
func (c convTarget) FinishMap(x MapTarget) error {
+ if rbTarg, ok := x.(rbTarget); ok {
+ return rbTarg.(Target).FinishMap(x)
+ }
cc := x.(compConvTarget)
return finishConvert(cc.fin, cc.fill)
}
// StartFields implements the Target interface method.
func (c convTarget) StartFields(tt *Type) (FieldsTarget, error) {
+ if RawBytesTargetFunc != nil && c.vv == nil {
+ if target := RawBytesTargetFunc(c.rv); target != nil {
+ return target.StartFields(tt)
+ }
+ }
fin, fill, err := startConvert(c, tt)
return compConvTarget{fin, fill}, err
}
// FinishFields implements the Target interface method.
func (c convTarget) FinishFields(x FieldsTarget) error {
+ if rbTarg, ok := x.(rbTarget); ok {
+ return rbTarg.(Target).FinishFields(x)
+ }
cc := x.(compConvTarget)
return finishConvert(cc.fin, cc.fill)
}
diff --git a/vdl/reflect_type.go b/vdl/reflect_type.go
index 5e40f86..3371895 100644
--- a/vdl/reflect_type.go
+++ b/vdl/reflect_type.go
@@ -127,6 +127,10 @@
var rtTargetConvertible = reflect.TypeOf([]rawBytesTargetConvertible{}).Elem()
+// RawBytesTargetFunc is a hook for a function that returns a Target that
+// writes to a vom.RawBytes. See vom.rbTarget.
+// It stays nil until something assigns it; convTarget checks for nil before
+// calling it and falls back to regular conversion when it returns a nil Target.
+var RawBytesTargetFunc func(rv reflect.Value) Target
+
// basicType returns the *Type corresponding to rt for basic types that cannot
// be named by the user, and have a well-known conversion.
func basicType(rt reflect.Type) *Type {
diff --git a/vdl/target.go b/vdl/target.go
index 555ec4b..dbc682b 100644
--- a/vdl/target.go
+++ b/vdl/target.go
@@ -216,6 +216,13 @@
}
rv = rv.Elem()
}
+
+ if reflect.PtrTo(rv.Type()).Implements(rtTargetConvertible) {
+ rvPtr := reflect.New(rv.Type())
+ rvPtr.Elem().Set(rv)
+ return rvPtr.Interface().(rawBytesTargetConvertible).ToTarget(target)
+ }
+
// Handle special-case for errors.
if rv.Type().ConvertibleTo(rtError) {
return fromError(target, rv)
diff --git a/vom/.api b/vom/.api
index c61c5c0..34893b5 100644
--- a/vom/.api
+++ b/vom/.api
@@ -2,7 +2,6 @@
pkg vom, const ControlKindIncompleteType ControlKind
pkg vom, const ControlKindNil ControlKind
pkg vom, const DefaultVersion Version
-pkg vom, const DefaultVersionWithRawBytesSupport Version
pkg vom, const DumpKindAnyLensLen DumpKind
pkg vom, const DumpKindAnyMsgLen DumpKind
pkg vom, const DumpKindByteLen DumpKind
diff --git a/vom/decoder_test.go b/vom/decoder_test.go
index 2c1d360..a81339e 100644
--- a/vom/decoder_test.go
+++ b/vom/decoder_test.go
@@ -250,6 +250,8 @@
{types.YzStruct{}, types.YzStruct{}},
{types.MBool(false), types.MBool(false)},
{types.NString(""), types.NString("")},
+ {vdl.ValueOf(uint16(5)), vdl.ValueOf(uint16(5))},
+ {vdl.ValueOf([]interface{}{uint16(5)}).Index(0), vdl.ValueOf(uint16(5))},
}
var (
diff --git a/vom/dump.go b/vom/dump.go
index 7e5647e..51f0882 100644
--- a/vom/dump.go
+++ b/vom/dump.go
@@ -868,6 +868,7 @@
d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", err)
return err
}
+ d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", elemType)
if d.version >= Version81 {
d.prepareAtom("waiting for any message length index")
switch index, ctrl, err := binaryDecodeUintWithControl(d.buf); {
@@ -883,7 +884,6 @@
return d.decodeValue(elemType, target)
}
}
- d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", elemType)
return d.decodeValue(elemType, target)
}
default:
diff --git a/vom/encoder.go b/vom/encoder.go
index 2b17ba4..aa7b538 100644
--- a/vom/encoder.go
+++ b/vom/encoder.go
@@ -19,8 +19,7 @@
Version80 = Version(0x80)
Version81 = Version(0x81)
- DefaultVersion = Version80
- DefaultVersionWithRawBytesSupport = Version81 // TODO(bprosnitz) Remove once switch to 81 being default
+ DefaultVersion = Version81
)
func (v Version) String() string {
@@ -103,7 +102,7 @@
// NewEncoder returns a new Encoder that writes to the given writer in the
// binary format. The binary format is compact and fast.
func NewEncoder(w io.Writer) *Encoder {
- return NewVersionedEncoder(Version80, w)
+ return NewVersionedEncoder(DefaultVersion, w)
}
// NewVersionedEncoder returns a new Encoder that writes to the given writer with
@@ -126,7 +125,7 @@
// writer in the binary format. Types will be encoded separately through the
// given typeEncoder.
func NewEncoderWithTypeEncoder(w io.Writer, typeEnc *TypeEncoder) *Encoder {
- return NewVersionedEncoderWithTypeEncoder(Version80, w, typeEnc)
+ return NewVersionedEncoderWithTypeEncoder(DefaultVersion, w, typeEnc)
}
// NewVersionedEncoderWithTypeEncoder returns a new Encoder that writes to the given
@@ -205,7 +204,11 @@
rv := reflect.ValueOf(v)
for rv.Kind() == reflect.Ptr && !rv.IsNil() {
if rv.Type().ConvertibleTo(rtPtrToValue) {
- return rv.Convert(rtPtrToValue).Interface().(*vdl.Value).Type()
+ vv := rv.Convert(rtPtrToValue).Interface().(*vdl.Value)
+ if vv.Kind() == vdl.Any && vv.Elem().IsValid() {
+ vv = vv.Elem()
+ }
+ return vv.Type()
}
if rv.Type().ConvertibleTo(rtPtrToRawBytes) {
return rv.Convert(rtPtrToRawBytes).Interface().(*RawBytes).Type
@@ -897,16 +900,15 @@
func (l *anyLenList) StartAny(startMarker uint64) *anyStartRef {
l.lens = append(l.lens, 0)
+ index := uint64(len(l.lens) - 1)
return &anyStartRef{
- index: uint64(len(l.lens) - 1),
- marker: startMarker,
+ index: index,
+ marker: startMarker + lenUint(index),
}
}
func (l *anyLenList) FinishAny(start *anyStartRef, endMarker uint64) {
- lenIncLenBytes := endMarker - start.marker
- len := lenIncLenBytes - lenUint(lenIncLenBytes)
- l.lens[start.index] = len
+ l.lens[start.index] = endMarker - start.marker
}
func (l *anyLenList) Reset() error {
diff --git a/vom/internal/perf_test.go b/vom/internal/perf_test.go
index c6f00b2..8512cf2 100644
--- a/vom/internal/perf_test.go
+++ b/vom/internal/perf_test.go
@@ -153,7 +153,7 @@
// vom.Encoder.Encode RawBytes benchmarking
func vomEncoderEncodeRawBytes(t testing.TB, n int, rv *vom.RawBytes) {
var buf bytes.Buffer
- encoder := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf)
+ encoder := vom.NewEncoder(&buf)
for j := 0; j != n; j++ {
err := encoder.Encode(rv)
if err != nil {
@@ -163,7 +163,7 @@
}
func makeCustomerRawBytes(tb testing.TB) *vom.RawBytes {
- bytes, err := vom.VersionedEncode(vom.DefaultVersionWithRawBytesSupport, customer)
+ bytes, err := vom.Encode(customer)
if err != nil {
tb.Fatalf("vom.Encode failed: %v", err)
}
@@ -208,7 +208,7 @@
data := buf.Bytes()
// TODO(bprosnitz) remove the encoder below when we switch to version 81
var buf81 bytes.Buffer
- encoder81 := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf81)
+ encoder81 := vom.NewEncoder(&buf81)
for j := 0; j != 1000; j++ {
err := encoder81.Encode(customer)
if err != nil {
@@ -268,7 +268,7 @@
func BenchmarkVomDecoder1000RawBytes(b *testing.B) {
var buf bytes.Buffer
- encoder := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf)
+ encoder := vom.NewEncoder(&buf)
for j := 0; j != 1000; j++ {
err := encoder.Encode(customer)
if err != nil {
diff --git a/vom/raw_bytes.go b/vom/raw_bytes.go
index 7f95643..5d510c0 100644
--- a/vom/raw_bytes.go
+++ b/vom/raw_bytes.go
@@ -29,7 +29,7 @@
func RawBytesFromValue(value interface{}) (*RawBytes, error) {
// TODO(bprosnitz) This implementation is temporary - we should make it faster
- dat, err := VersionedEncode(DefaultVersionWithRawBytesSupport, value)
+ dat, err := Encode(value)
if err != nil {
return nil, err
}
@@ -40,7 +40,7 @@
func (rb *RawBytes) ToValue(value interface{}) error {
// TODO(bprosnitz) This implementation is temporary - we should make it faster
- dat, err := VersionedEncode(DefaultVersionWithRawBytesSupport, rb)
+ dat, err := Encode(rb)
if err != nil {
return err
}
@@ -60,3 +60,275 @@
type rvHackInterface interface {
HackGetRv() reflect.Value
}
+
+// rbType is the reflect.Type of *RawBytes.
+var rbType = reflect.TypeOf((*RawBytes)(nil))
+
+// makeRawBytesTarget returns a vdl.Target that fills in the RawBytes behind rv
+// when rv holds a *RawBytes; for any other type it returns nil. In the
+// *RawBytes case rv is first pointed at a freshly allocated RawBytes, which is
+// the value the returned target writes to.
+func makeRawBytesTarget(rv reflect.Value) vdl.Target {
+	if rv.Type() != rbType {
+		return nil
+	}
+	fresh := &RawBytes{}
+	rv.Set(reflect.ValueOf(fresh))
+	return &rbTarget{rb: fresh}
+}
+
+// init installs makeRawBytesTarget as the vdl.RawBytesTargetFunc hook.
+func init() {
+ vdl.RawBytesTargetFunc = makeRawBytesTarget
+}
+
+// rbTarget is a vdl.Target that writes to a vom.RawBytes.
+// This structure is intended to be one-time-use and
+// created from makeRawBytesTarget.
+// Incoming values are encoded into buf through enc; when the outermost value
+// is finished, the buffered bytes are decoded into rb.
+type rbTarget struct {
+ rb *RawBytes // RawBytes to write to
+ enc *encoder // encoder to act as the underlying target
+ buf *bytes.Buffer // buffer of bytes written
+ startCount int // count the depth level of open calls, to determine when to write to rb
+}
+
+// start records the opening of a value of type tt. Only the outermost call
+// does real work: it creates a fresh buffer and encoder, writes the version
+// byte, encodes the type and begins the value message. Nested calls merely
+// increase the depth counter.
+func (r *rbTarget) start(tt *vdl.Type) error {
+	if r.startCount++; r.startCount > 1 {
+		return nil
+	}
+
+	r.buf = bytes.NewBuffer(nil)
+	outer := NewEncoder(r.buf)
+	r.enc = &outer.enc
+	versionByte := []byte{byte(r.enc.version)}
+	if _, err := r.enc.writer.Write(versionByte); err != nil {
+		return err
+	}
+	r.enc.sentVersionByte = true
+	tid, err := r.enc.typeEnc.encode(tt)
+	if err != nil {
+		return err
+	}
+	return r.enc.startEncode(containsAny(tt), containsTypeObject(tt), hasChunkLen(tt), false, int64(tid))
+}
+
+// finish records the closing of a value. When the outermost value closes, the
+// encoder is finished and the buffered bytes are decoded into r.rb; closing a
+// nested value only decreases the depth counter.
+func (r *rbTarget) finish() error {
+	if r.startCount--; r.startCount > 0 {
+		return nil
+	}
+
+	if err := r.enc.finishEncode(); err != nil {
+		return err
+	}
+	return Decode(r.buf.Bytes(), r.rb)
+}
+
+// FromBool runs start, forwards the bool to the underlying encoder, then
+// runs finish. The first error encountered is returned.
+func (r *rbTarget) FromBool(src bool, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromBool(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromUint runs start, forwards the unsigned integer to the underlying
+// encoder, then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromUint(src uint64, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromUint(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromInt runs start, forwards the signed integer to the underlying encoder,
+// then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromInt(src int64, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromInt(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromFloat runs start, forwards the float to the underlying encoder, then
+// runs finish. The first error encountered is returned.
+func (r *rbTarget) FromFloat(src float64, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromFloat(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromComplex runs start, forwards the complex number to the underlying
+// encoder, then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromComplex(src complex128, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromComplex(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromBytes runs start, forwards the byte slice to the underlying encoder,
+// then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromBytes(src []byte, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromBytes(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromString runs start, forwards the string to the underlying encoder, then
+// runs finish. The first error encountered is returned.
+func (r *rbTarget) FromString(src string, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromString(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromEnumLabel runs start, forwards the enum label to the underlying
+// encoder, then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromEnumLabel(src string, tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromEnumLabel(src, tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromTypeObject runs start with vdl.TypeObjectType, forwards the type object
+// to the underlying encoder, then runs finish. The first error encountered is
+// returned.
+func (r *rbTarget) FromTypeObject(src *vdl.Type) error {
+	err := r.start(vdl.TypeObjectType)
+	if err == nil {
+		err = r.enc.FromTypeObject(src)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// FromNil runs start, forwards the nil value of type tt to the underlying
+// encoder, then runs finish. The first error encountered is returned.
+func (r *rbTarget) FromNil(tt *vdl.Type) error {
+	err := r.start(tt)
+	if err == nil {
+		err = r.enc.FromNil(tt)
+	}
+	if err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+// StartList runs start and opens a list on the underlying encoder. On success
+// r itself is returned as the list target.
+func (r *rbTarget) StartList(tt *vdl.Type, len int) (vdl.ListTarget, error) {
+	err := r.start(tt)
+	if err == nil {
+		_, err = r.enc.StartList(tt, len)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishList closes the list on the underlying encoder and, if that succeeds,
+// runs finish. The argument is ignored.
+func (r *rbTarget) FinishList(_ vdl.ListTarget) error {
+	err := r.enc.FinishList(nil)
+	if err == nil {
+		err = r.finish()
+	}
+	return err
+}
+
+// StartSet runs start and opens a set on the underlying encoder. On success
+// r itself is returned as the set target.
+func (r *rbTarget) StartSet(tt *vdl.Type, len int) (vdl.SetTarget, error) {
+	err := r.start(tt)
+	if err == nil {
+		_, err = r.enc.StartSet(tt, len)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishSet closes the set on the underlying encoder and, if that succeeds,
+// runs finish. The argument is ignored.
+func (r *rbTarget) FinishSet(_ vdl.SetTarget) error {
+	err := r.enc.FinishSet(nil)
+	if err == nil {
+		err = r.finish()
+	}
+	return err
+}
+
+// StartMap runs start and opens a map on the underlying encoder. On success
+// r itself is returned as the map target.
+func (r *rbTarget) StartMap(tt *vdl.Type, len int) (vdl.MapTarget, error) {
+	err := r.start(tt)
+	if err == nil {
+		_, err = r.enc.StartMap(tt, len)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishMap closes the map on the underlying encoder and, if that succeeds,
+// runs finish. The argument is ignored.
+func (r *rbTarget) FinishMap(_ vdl.MapTarget) error {
+	err := r.enc.FinishMap(nil)
+	if err == nil {
+		err = r.finish()
+	}
+	return err
+}
+
+// StartFields runs start and opens a fields value on the underlying encoder.
+// On success r itself is returned as the fields target.
+func (r *rbTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error) {
+	err := r.start(tt)
+	if err == nil {
+		_, err = r.enc.StartFields(tt)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishFields closes the fields value on the underlying encoder and, if that
+// succeeds, runs finish. The argument is ignored.
+func (r *rbTarget) FinishFields(_ vdl.FieldsTarget) error {
+	err := r.enc.FinishFields(nil)
+	if err == nil {
+		err = r.finish()
+	}
+	return err
+}
+
+// StartElem opens the element at index on the underlying encoder and returns
+// r itself as the target for that element.
+func (r *rbTarget) StartElem(index int) (vdl.Target, error) {
+	_, err := r.enc.StartElem(index)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishElem closes the current element on the underlying encoder. The
+// argument is ignored.
+func (r *rbTarget) FinishElem(_ vdl.Target) error {
+ return r.enc.FinishElem(nil)
+}
+
+// StartKey opens a key on the underlying encoder and returns r itself as the
+// target for that key.
+func (r *rbTarget) StartKey() (vdl.Target, error) {
+	_, err := r.enc.StartKey()
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// FinishKey closes the current key on the underlying encoder. The argument is
+// ignored.
+func (r *rbTarget) FinishKey(_ vdl.Target) error {
+ return r.enc.FinishKey(nil)
+}
+
+// FinishKeyStartField forwards to the underlying encoder's
+// FinishKeyStartField and returns r itself as the field target. The argument
+// is ignored.
+func (r *rbTarget) FinishKeyStartField(_ vdl.Target) (vdl.Target, error) {
+	_, err := r.enc.FinishKeyStartField(nil)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// StartField opens the field named key on the underlying encoder and returns
+// r itself as both the key target and the field target.
+func (r *rbTarget) StartField(key string) (vdl.Target, vdl.Target, error) {
+	_, _, err := r.enc.StartField(key)
+	if err != nil {
+		return nil, nil, err
+	}
+	return r, r, nil
+}
+
+// FinishField closes the current field on the underlying encoder. Both
+// arguments are ignored.
+func (r *rbTarget) FinishField(_, _ vdl.Target) error {
+ return r.enc.FinishField(nil, nil)
+}
+
+// RawBytesTargetHack is a no-op marker method; having it lets the vdl package
+// recognize this target through its own rbTarget interface.
+func (r *rbTarget) RawBytesTargetHack() {}
diff --git a/vom/raw_bytes_test.go b/vom/raw_bytes_test.go
index 7f63898..92916e9 100644
--- a/vom/raw_bytes_test.go
+++ b/vom/raw_bytes_test.go
@@ -15,12 +15,6 @@
"v.io/v23/vom/testdata/types"
)
-func TestVdlTypeOfRawBytes(t *testing.T) {
- if got, want := vdl.TypeOf(&RawBytes{}), vdl.AnyType; got != want {
- t.Errorf("got %v, want %v", got, want)
- }
-}
-
type testUint64 uint64
type structTypeObject struct {
@@ -48,7 +42,7 @@
name: "testUint64(99)",
goValue: testUint64(99),
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(testUint64(0)),
Data: []byte{0x63},
},
@@ -57,7 +51,7 @@
name: "typeobject(int32)",
goValue: vdl.Int32Type,
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(vdl.Int32Type),
RefTypes: []*vdl.Type{vdl.Int32Type},
Data: []byte{0x00},
@@ -67,7 +61,7 @@
name: "structTypeObject{typeobject(int32)}",
goValue: structTypeObject{vdl.Int32Type},
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(structTypeObject{}),
RefTypes: []*vdl.Type{vdl.Int32Type},
Data: []byte{0x00, 0x00, WireCtrlEnd},
@@ -78,19 +72,19 @@
goValue: structAnyAndTypes{
vdl.Int32Type,
&RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.BoolType,
Data: []byte{0x01},
},
vdl.BoolType,
&RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(""),
Data: []byte{0x03, 0x61, 0x62, 0x63},
},
},
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(structAnyAndTypes{}),
RefTypes: []*vdl.Type{vdl.Int32Type, vdl.BoolType, vdl.StringType},
AnyLengths: []uint64{1, 4},
@@ -104,23 +98,58 @@
},
},
{
+ name: "large message", // to test that multibyte length is encoded properly
+ goValue: makeLargeBytes(1000),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.ListType(vdl.ByteType),
+ Data: append([]byte{0xfe, 0x03, 0xe8}, makeLargeBytes(1000)...),
+ },
+ },
+ {
+ name: "*vdl.Value",
+ goValue: vdl.ValueOf(uint16(5)),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.Uint16Type,
+ Data: []byte{0x05},
+ },
+ },
+ {
+ name: "*vdl.Value - top level any",
+ goValue: vdl.ValueOf([]interface{}{uint16(5)}).Index(0),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.Uint16Type,
+ Data: []byte{0x05},
+ },
+ },
+ {
name: "any(nil)",
goValue: &RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.AnyType,
Data: []byte{WireCtrlNil},
},
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.AnyType,
Data: []byte{WireCtrlNil},
},
},
}
+// makeLargeBytes returns a slice of size bytes in which the byte at position
+// i holds i%10, i.e. the repeating pattern 0, 1, ..., 9.
+func makeLargeBytes(size int) []byte {
+	out := make([]byte, 0, size)
+	for n := 0; n < size; n++ {
+		out = append(out, byte(n%10))
+	}
+	return out
+}
+
func TestDecodeToRawBytes(t *testing.T) {
for _, test := range rawBytesTestCases {
- bytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, test.goValue)
+ bytes, err := Encode(test.goValue)
if err != nil {
t.Fatalf("%s: error in encode %v", test.name, err)
}
@@ -136,11 +165,11 @@
func TestEncodeFromRawBytes(t *testing.T) {
for _, test := range rawBytesTestCases {
- fullBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, test.goValue)
+ fullBytes, err := Encode(test.goValue)
if err != nil {
t.Fatalf("%s: error in encode %v", test.name, err)
}
- fullBytesFromRaw, err := VersionedEncode(DefaultVersionWithRawBytesSupport, &test.rawBytes)
+ fullBytesFromRaw, err := Encode(&test.rawBytes)
if err != nil {
t.Fatalf("%s: error in encode %v", test.name, err)
}
@@ -160,7 +189,7 @@
name: "testUint64(99)",
goValue: testUint64(99),
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(testUint64(0)),
RefTypes: []*vdl.Type{vdl.TypeOf(testUint64(0))},
AnyLengths: []uint64{1},
@@ -171,7 +200,7 @@
name: "typeobject(int32)",
goValue: vdl.Int32Type,
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(vdl.Int32Type),
RefTypes: []*vdl.Type{vdl.TypeObjectType, vdl.Int32Type},
AnyLengths: []uint64{1},
@@ -182,7 +211,7 @@
name: "structTypeObject{typeobject(int32)}",
goValue: structTypeObject{vdl.Int32Type},
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(structTypeObject{}),
RefTypes: []*vdl.Type{vdl.TypeOf(structTypeObject{}), vdl.Int32Type},
AnyLengths: []uint64{3},
@@ -194,19 +223,19 @@
goValue: structAnyAndTypes{
vdl.Int32Type,
&RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.BoolType,
Data: []byte{0x01},
},
vdl.BoolType,
&RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(""),
Data: []byte{0x03, 0x61, 0x62, 0x63},
},
},
rawBytes: RawBytes{
- Version: DefaultVersionWithRawBytesSupport,
+ Version: DefaultVersion,
Type: vdl.TypeOf(structAnyAndTypes{}),
RefTypes: []*vdl.Type{vdl.TypeOf(structAnyAndTypes{}), vdl.Int32Type, vdl.BoolType, vdl.StringType},
AnyLengths: []uint64{16, 1, 4},
@@ -219,13 +248,45 @@
},
},
},
+ {
+ name: "large message", // to test that multibyte length is encoded properly
+ goValue: makeLargeBytes(1000),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.ListType(vdl.ByteType),
+ RefTypes: []*vdl.Type{vdl.ListType(vdl.ByteType)},
+ AnyLengths: []uint64{0x3eb},
+ Data: append([]byte{0xfe, 0x03, 0xe8}, makeLargeBytes(1000)...),
+ },
+ },
+ {
+ name: "*vdl.Value",
+ goValue: vdl.ValueOf(uint16(5)),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.Uint16Type,
+ RefTypes: []*vdl.Type{vdl.Uint16Type},
+ AnyLengths: []uint64{1},
+ Data: []byte{0x05},
+ },
+ },
+ {
+ name: "*vdl.Value - top level any",
+ goValue: vdl.ValueOf([]interface{}{uint16(5)}).Index(0),
+ rawBytes: RawBytes{
+ Version: DefaultVersion,
+ Type: vdl.Uint16Type,
+ RefTypes: []*vdl.Type{vdl.Uint16Type},
+ AnyLengths: []uint64{1},
+ Data: []byte{0x05},
+ },
+ },
}
func TestWrappedRawBytes(t *testing.T) {
for i, test := range rawBytesWrappedTestCases {
unwrapped := rawBytesTestCases[i]
-
- wrappedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, structAny{&unwrapped.rawBytes})
+ wrappedBytes, err := Encode(structAny{&unwrapped.rawBytes})
if err != nil {
t.Fatalf("%s: error in encode %v", test.name, err)
}
@@ -233,9 +294,6 @@
if err := Decode(wrappedBytes, &any); err != nil {
t.Fatalf("%s: error in decode %v", test.name, err)
}
- if any.X.RefTypes[0] != vdl.TypeOf(test.goValue) {
- t.Errorf("expected type %v to be first in ref list, but was %v", test.goValue, any.X.RefTypes[0])
- }
if !reflect.DeepEqual(any.X, &test.rawBytes) {
t.Errorf("%s: got %#v, want %#v", test.name, any.X, &test.rawBytes)
}
@@ -244,11 +302,11 @@
func TestEncodeNilRawBytes(t *testing.T) {
// Top-level
- expectedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, vdl.ZeroValue(vdl.AnyType))
+ expectedBytes, err := Encode(vdl.ZeroValue(vdl.AnyType))
if err != nil {
t.Fatal(err)
}
- encodedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, (*RawBytes)(nil))
+ encodedBytes, err := Encode((*RawBytes)(nil))
if err != nil {
t.Fatal(err)
}
@@ -257,11 +315,11 @@
}
// Within an object.
- expectedBytes, err = VersionedEncode(DefaultVersionWithRawBytesSupport, []*vdl.Value{vdl.ZeroValue(vdl.AnyType)})
+ expectedBytes, err = Encode([]*vdl.Value{vdl.ZeroValue(vdl.AnyType)})
if err != nil {
t.Fatal(err)
}
- encodedBytes, err = VersionedEncode(DefaultVersionWithRawBytesSupport, []*RawBytes{nil})
+ encodedBytes, err = Encode([]*RawBytes{nil})
if err != nil {
t.Fatal(err)
}
@@ -373,3 +431,48 @@
}
}
}
+
+// TestVdlTypeOfRawBytes checks that vdl.TypeOf reports vdl.AnyType for a
+// *RawBytes.
+func TestVdlTypeOfRawBytes(t *testing.T) {
+ if got, want := vdl.TypeOf(&RawBytes{}), vdl.AnyType; got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+// TestVdlValueOfRawBytes checks, for every entry in rawBytesTestCases, that
+// vdl.ValueOf applied to the case's RawBytes yields a value that
+// vdl.EqualValue considers equal to vdl.ValueOf applied to the case's Go value.
+func TestVdlValueOfRawBytes(t *testing.T) {
+ for _, test := range rawBytesTestCases {
+ want := vdl.ValueOf(test.goValue)
+ got := vdl.ValueOf(test.rawBytes)
+ if !vdl.EqualValue(got, want) {
+ t.Errorf("vdl.ValueOf(RawBytes) %s: got %v, want %v", test.name, got, want)
+ }
+ }
+}
+
+// TestConvertRawBytes runs vdl.Convert from each rawBytesTestCases goValue
+// into a *RawBytes and compares the result against the address of the case's
+// expected rawBytes using reflect.DeepEqual.
+func TestConvertRawBytes(t *testing.T) {
+ for _, test := range rawBytesTestCases {
+ var rb *RawBytes
+ if err := vdl.Convert(&rb, test.goValue); err != nil {
+ // Reported with Errorf, so the comparison below still runs for this case.
+ t.Errorf("vdl.Convert %#v to RawBytes: %v", test.goValue, err)
+ }
+ if !reflect.DeepEqual(rb, &test.rawBytes) {
+ t.Errorf("vdl.Convert to RawBytes %s: got %v, want %v", test.name, rb, test.rawBytes)
+ }
+ }
+}
+
+// structAnyInterface has a single field X of type interface{}.
+// TestConvertRawBytesWrapped uses it as the vdl.Convert source that is
+// converted into a structAny.
+type structAnyInterface struct {
+ X interface{}
+}
+
+// TestConvertRawBytesWrapped wraps each rawBytesTestCases goValue in a
+// structAnyInterface, converts it into a structAny with vdl.Convert, and
+// expects the result to be reflect.DeepEqual to structAny{&test.rawBytes}.
+func TestConvertRawBytesWrapped(t *testing.T) {
+ for _, test := range rawBytesTestCases {
+ var any structAny
+ if err := vdl.Convert(&any, structAnyInterface{test.goValue}); err != nil {
+ // Reported with Errorf, so the comparison below still runs for this case.
+ t.Errorf("vdl.Convert %#v to RawBytes: %v", structAnyInterface{test.goValue}, err)
+ }
+ got, want := any, structAny{&test.rawBytes}
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("vdl.Convert to RawBytes %s: got %v, want %v", test.name, got, want)
+ }
+ }
+}
diff --git a/vom/single_shot.go b/vom/single_shot.go
index 1a58d70..f3a1a08 100644
--- a/vom/single_shot.go
+++ b/vom/single_shot.go
@@ -18,7 +18,7 @@
// This is a "single-shot" encoding; full type information is always included in
// the returned encoding, as if a new encoder were used for each call.
func Encode(v interface{}) ([]byte, error) {
+ // Delegates to VersionedEncode with the package's DefaultVersion.
- return VersionedEncode(Version80, v)
+ return VersionedEncode(DefaultVersion, v)
}
// VersionedEncode performs single-shot encoding to a specific version of VOM
diff --git a/vom/type_encoder.go b/vom/type_encoder.go
index f8964de..faa305c 100644
--- a/vom/type_encoder.go
+++ b/vom/type_encoder.go
@@ -33,7 +33,7 @@
// NewTypeEncoder returns a new TypeEncoder that writes types to the given
// writer in the binary format.
func NewTypeEncoder(w io.Writer) *TypeEncoder {
+ // Delegates to NewVersionedTypeEncoder with the package's DefaultVersion.
- return NewVersionedTypeEncoder(Version80, w)
+ return NewVersionedTypeEncoder(DefaultVersion, w)
}
// NewTypeEncoderVersion returns a new TypeEncoder that writes types to the given