Merge "vom/fuzz: some minor changes to the vom fuzzer"
diff --git a/.api b/.api
index 4a83cd3..27ed809 100644
--- a/.api
+++ b/.api
@@ -14,6 +14,7 @@
 pkg v23, func NewEndpoint(string) (naming.Endpoint, error)
 pkg v23, func NewFlowManager(*context.T, time.Duration) (flow.Manager, error)
 pkg v23, func RegisterRuntimeFactory(RuntimeFactory)
+pkg v23, func TryInit() (*context.T, Shutdown, error)
 pkg v23, func WithBackgroundContext(*context.T) *context.T
 pkg v23, func WithListenSpec(*context.T, rpc.ListenSpec) *context.T
 pkg v23, func WithNewClient(*context.T, ...rpc.ClientOpt) (*context.T, rpc.Client, error)
diff --git a/.gitignore b/.gitignore
index 5e2e933..4209dc0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1 @@
 /.jiri
-#TODO(nlacasse): Get rid of .v23 below once v23->jiri transition is complete.
-/.v23
diff --git a/flow/.api b/flow/.api
index ececf65..49281c1 100644
--- a/flow/.api
+++ b/flow/.api
@@ -51,10 +51,11 @@
 pkg flow, type ManagedConn interface, RemoteBlessings() security.Blessings
 pkg flow, type ManagedConn interface, RemoteDischarges() map[string]security.Discharge
 pkg flow, type ManagedConn interface, RemoteEndpoint() naming.Endpoint
-pkg flow, type Manager interface { Accept, Closed, Dial, Listen, ProxyListen, RoutingID, Status, StopListening }
+pkg flow, type Manager interface { Accept, Closed, Dial, DialCached, Listen, ProxyListen, RoutingID, Status, StopListening }
 pkg flow, type Manager interface, Accept(*context.T) (Flow, error)
 pkg flow, type Manager interface, Closed() <-chan struct{}
 pkg flow, type Manager interface, Dial(*context.T, naming.Endpoint, PeerAuthorizer, time.Duration) (Flow, error)
+pkg flow, type Manager interface, DialCached(*context.T, naming.Endpoint, PeerAuthorizer, time.Duration) (Flow, error)
 pkg flow, type Manager interface, Listen(*context.T, string, string) error
 pkg flow, type Manager interface, ProxyListen(*context.T, string, naming.Endpoint) (<-chan struct{}, error)
 pkg flow, type Manager interface, RoutingID() naming.RoutingID
diff --git a/flow/model.go b/flow/model.go
index 57300fc..75430a3 100644
--- a/flow/model.go
+++ b/flow/model.go
@@ -74,6 +74,17 @@
 	// closed.
 	Dial(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
 
+	// DialCached creates a Flow to the provided remote endpoint using only cached
+	// connections from previous Listen or Dial calls.
+	// If no cached connection exists, an error will be returned.
+	//
+	// 'auth' is used to determine the blessings that will be sent to the remote end.
+	//
+	// channelTimeout specifies the duration we are willing to wait before determining
+	// that connections managed by this Manager are unhealthy and should be
+	// closed.
+	DialCached(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
+
 	// RoutingID returns the naming.Routing of the flow.Manager.
 	// If the RoutingID of the manager is naming.NullRoutingID, the manager can
 	// only be used to Dial outgoing calls.
diff --git a/init.go b/init.go
index fd8b3a7..d913246 100644
--- a/init.go
+++ b/init.go
@@ -8,11 +8,21 @@
 
 import "v.io/v23/context"
 
+// TryInit is like Init, except that it returns an error instead of panicking.
+func TryInit() (*context.T, Shutdown, error) {
+	return internalInit()
+}
+
 // Init should be called once for each vanadium executable, providing
 // the setup of the vanadium initial context.T and a Shutdown function
 // that can be used to clean up the runtime.  We allow calling Init
 // multiple times (useful in tests), but only as long as you call the
 // Shutdown returned previously before calling Init the second time.
+// Init panics if it encounters an error.
 func Init() (*context.T, Shutdown) {
-	return internalInit()
+	ctx, shutdown, err := internalInit()
+	if err != nil {
+		panic(err)
+	}
+	return ctx, shutdown
 }
diff --git a/init_internal_test.go b/init_internal_test.go
index a83c1c6..fd45b9a 100644
--- a/init_internal_test.go
+++ b/init_internal_test.go
@@ -70,7 +70,8 @@
 		if r == nil {
 			t.Fatalf("recover returned nil")
 		}
-		str := r.(string)
+		err := r.(error)
+		str := err.Error()
 		if !strings.Contains(str, "No RuntimeFactory has been registered") {
 			t.Fatalf("unexpected error: %s", str)
 		}
@@ -114,7 +115,8 @@
 		if r == nil {
 			t.Fatalf("recover returned nil")
 		}
-		str := r.(string)
+		err := r.(error)
+		str := err.Error()
 		if !strings.Contains(str, "A runtime has already been initialized") {
 			t.Fatalf("unexpected error: %s", str)
 		}
diff --git a/init_mojo.go b/init_mojo.go
index e265169..c516674 100644
--- a/init_mojo.go
+++ b/init_mojo.go
@@ -28,6 +28,14 @@
 // multiple times, but only as long as you call the Shutdown returned
 // previously before calling Init the second time.
 func Init(mctx application.Context) (*context.T, Shutdown) {
+	ctx, shutdown, err := TryInit(mctx)
+	if err != nil {
+		panic(err)
+	}
+	return ctx, shutdown
+}
+
+func TryInit(mctx application.Context) (*context.T, Shutdown, error) {
 	// mctx.Args() is a slice that contains the url of this mojo service
 	// followed by all arguments passed to the mojo service via the
 	// "--args-for" flag. Since the v23 runtime factories parse arguments
diff --git a/model.go b/model.go
index 4ae7fcd..edf9a22 100644
--- a/model.go
+++ b/model.go
@@ -444,12 +444,12 @@
 	return buf.String()
 }
 
-func internalInit() (*context.T, Shutdown) {
+func internalInit() (*context.T, Shutdown, error) {
 	initState.mu.Lock()
 	runtimeFactory := initState.runtimeFactory
 	if initState.runtimeFactory == nil {
 		initState.mu.Unlock()
-		panic("No RuntimeFactory has been registered nor specified. This is most" +
+		return nil, nil, fmt.Errorf("No RuntimeFactory has been registered nor specified. This is most" +
 			" likely because your main package has not imported a RuntimeFactory")
 	}
 
@@ -463,7 +463,7 @@
 This registration is from:
 %s
 `
-		panic(fmt.Sprintf(format, initState.runtimeStack, stack))
+		return nil, nil, fmt.Errorf(format, initState.runtimeStack, stack)
 	}
 	initState.runtimeStack = stack
 	initState.mu.Unlock()
@@ -480,7 +480,7 @@
 	if err != nil {
 		cancel()
 		rootcancel()
-		panic(err)
+		return nil, nil, err
 	}
 
 	initState.mu.Lock()
@@ -503,8 +503,8 @@
 
 	if err := rt.Init(ctx); err != nil {
 		vshutdown()
-		panic(err)
+		return nil, nil, err
 	}
 
-	return ctx, vshutdown
+	return ctx, vshutdown, nil
 }
diff --git a/options/.api b/options/.api
index 05d88ab..c2500d8 100644
--- a/options/.api
+++ b/options/.api
@@ -1,5 +1,6 @@
 pkg options, method (ChannelTimeout) RPCCallOpt()
 pkg options, method (ChannelTimeout) RPCServerOpt()
+pkg options, method (ConnectionTimeout) RPCCallOpt()
 pkg options, method (IsLeaf) RPCServerOpt()
 pkg options, method (LameDuckTimeout) RPCServerOpt()
 pkg options, method (NameResolutionAuthorizer) NSOpt()
@@ -8,11 +9,11 @@
 pkg options, method (NoRetry) RPCCallOpt()
 pkg options, method (Preresolved) NSOpt()
 pkg options, method (Preresolved) RPCCallOpt()
-pkg options, method (RetryTimeout) RPCCallOpt()
 pkg options, method (ServerAuthorizer) RPCCallOpt()
 pkg options, method (ServerPeers) RPCServerOpt()
 pkg options, method (ServesMountTable) RPCServerOpt()
 pkg options, type ChannelTimeout time.Duration
+pkg options, type ConnectionTimeout time.Duration
 pkg options, type IsLeaf bool
 pkg options, type LameDuckTimeout time.Duration
 pkg options, type NameResolutionAuthorizer struct
@@ -20,7 +21,6 @@
 pkg options, type NoRetry struct
 pkg options, type Preresolved struct
 pkg options, type Preresolved struct, Resolution *naming.MountEntry
-pkg options, type RetryTimeout time.Duration
 pkg options, type ServerAuthorizer struct
 pkg options, type ServerAuthorizer struct, embedded security.Authorizer
 pkg options, type ServerPeers []security.BlessingPattern
diff --git a/options/options.go b/options/options.go
index be8219d..3f2e346 100644
--- a/options/options.go
+++ b/options/options.go
@@ -97,12 +97,6 @@
 func (NameResolutionAuthorizer) RPCCallOpt() {}
 func (NameResolutionAuthorizer) NSOpt()      {}
 
-// RetryTimeout is the duration during which we will retry starting
-// an RPC call.  Zero means don't retry.
-type RetryTimeout time.Duration
-
-func (RetryTimeout) RPCCallOpt() {}
-
 // Preresolved specifies that the RPC call should not further Resolve the name.
 // If a MountEntry is provided, use it.  Otherwise use the name passed in the
 // RPC call.  If the name is relative, it will be made global using
@@ -144,3 +138,10 @@
 
 func (ChannelTimeout) RPCCallOpt()   {}
 func (ChannelTimeout) RPCServerOpt() {}
+
+// ConnectionTimeout is the amount of time we will try establishing a connection
+// to the remote end during an RPC. Zero means only use cached connections and
+// do not attempt to retry if no connection exists in the cache.
+type ConnectionTimeout time.Duration
+
+func (ConnectionTimeout) RPCCallOpt() {}
diff --git a/rpc/reserved/reserved.go b/rpc/reserved/reserved.go
index 1dd8615..7138e15 100644
--- a/rpc/reserved/reserved.go
+++ b/rpc/reserved/reserved.go
@@ -14,8 +14,7 @@
 )
 
 // Signature invokes the reserved signature RPC on the given name, and returns
-// the results.  The client will be used to invoke the RPC - if it is nil, the
-// default client from the runtime is used.
+// the results.
 func Signature(ctx *context.T, name string, opts ...rpc.CallOpt) ([]signature.Interface, error) {
 	var sig []signature.Interface
 	res := []interface{}{&sig}
@@ -26,8 +25,7 @@
 }
 
 // MethodSignature invokes the reserved method signature RPC on the given name,
-// and returns the results.  The client will be used to invoke the RPC - if it
-// is nil, the default client from the runtime is used.
+// and returns the results.
 func MethodSignature(ctx *context.T, name, method string, opts ...rpc.CallOpt) (signature.Method, error) {
 	args := []interface{}{method}
 	var sig signature.Method
diff --git a/security/.api b/security/.api
index df579dd..d579bed 100644
--- a/security/.api
+++ b/security/.api
@@ -88,7 +88,7 @@
 pkg security, type BlessingStore interface, CacheDischarge(Discharge, Caveat, DischargeImpetus)
 pkg security, type BlessingStore interface, ClearDischarges(...Discharge)
 pkg security, type BlessingStore interface, DebugString() string
-pkg security, type BlessingStore interface, Default() Blessings
+pkg security, type BlessingStore interface, Default() (Blessings, <-chan struct{})
 pkg security, type BlessingStore interface, Discharge(Caveat, DischargeImpetus) (Discharge, time.Time)
 pkg security, type BlessingStore interface, ForPeer(...string) Blessings
 pkg security, type BlessingStore interface, PeerBlessings() map[BlessingPattern]Blessings
diff --git a/security/blessings.go b/security/blessings.go
index 9f95ead..a6b6732 100644
--- a/security/blessings.go
+++ b/security/blessings.go
@@ -356,7 +356,8 @@
 // DefaultBlessingPatterns returns the BlessingsPatterns of the Default Blessings
 // of the provided Principal.
 func DefaultBlessingPatterns(p Principal) (patterns []BlessingPattern) {
-	for _, b := range BlessingNames(p, p.BlessingStore().Default()) {
+	blessings, _ := p.BlessingStore().Default()
+	for _, b := range BlessingNames(p, blessings) {
 		patterns = append(patterns, BlessingPattern(b))
 	}
 	return
diff --git a/security/model.go b/security/model.go
index e1c464d..2b76cea 100644
--- a/security/model.go
+++ b/security/model.go
@@ -208,18 +208,19 @@
 	// hosts blessings.
 	SetDefault(blessings Blessings) error
 
-	// Default returns the blessings to be shared with peers for which
-	// no other information is available in order to select blessings
-	// from the store.
+	// Default returns the blessings to be shared with peers for which no
+	// other information is available in order to select blessings from the
+	// store.
 	//
 	// For example, Default can be used by servers to identify themselves
 	// to clients before the client has identified itself.
 	//
 	// Default returns the blessings provided to the last call to
-	// SetDefault.
+	// SetDefault and a channel which will be closed when the
+	// Default changes (i.e., SetDefault is called again).
 	//
 	// Returns the zero value if there is no usable blessings.
-	Default() Blessings
+	Default() (Blessings, <-chan struct{})
 
 	// PublicKey returns the public key of the Principal for which
 	// this store hosts blessings.
diff --git a/security/principal.go b/security/principal.go
index b435cf3..e73889c 100644
--- a/security/principal.go
+++ b/security/principal.go
@@ -69,7 +69,7 @@
 }
 func (errStore) ForPeer(peerBlessings ...string) Blessings                     { return Blessings{} }
 func (errStore) SetDefault(blessings Blessings) error                          { return verror.New(errNilStore, nil) }
-func (errStore) Default() Blessings                                            { return Blessings{} }
+func (errStore) Default() (Blessings, <-chan struct{})                         { return Blessings{}, nil }
 func (errStore) PeerBlessings() map[BlessingPattern]Blessings                  { return nil }
 func (errStore) CacheDischarge(Discharge, Caveat, DischargeImpetus)            { return }
 func (errStore) ClearDischarges(...Discharge)                                  { return }
diff --git a/security/principal_blessings_test.go b/security/principal_blessings_test.go
index 769fa23..3c9e31a 100644
--- a/security/principal_blessings_test.go
+++ b/security/principal_blessings_test.go
@@ -314,7 +314,7 @@
 	if got := s.ForPeer(); !got.IsZero() {
 		t.Errorf("BlessingStore.ForPeer: got %v want empty", got)
 	}
-	if got := s.Default(); !got.IsZero() {
+	if got, _ := s.Default(); !got.IsZero() {
 		t.Errorf("BlessingStore.Default: got %v want empty", got)
 	}
 	if got, want := s.PublicKey(), p.PublicKey(); !reflect.DeepEqual(got, want) {
diff --git a/syncbase/featuretests/client_v23_test.go b/syncbase/featuretests/client_v23_test.go
index 8793bcc..e38300b 100644
--- a/syncbase/featuretests/client_v23_test.go
+++ b/syncbase/featuretests/client_v23_test.go
@@ -6,8 +6,12 @@
 
 import (
 	"testing"
+	"time"
 
+	"v.io/v23/context"
 	"v.io/v23/syncbase"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/vom"
 	"v.io/x/ref/test/v23test"
 )
 
@@ -19,7 +23,9 @@
 
 	// Start syncbased.
 	serverCreds := sh.ForkCredentials("server")
-	sh.StartSyncbase(serverCreds, testSbName, "", `{"Read": {"In":["root:server", "root:client"]}, "Write": {"In":["root:server", "root:client"]}}`)
+	// TODO(aghassemi): Resolve permission is currently needed for Watch.
+	// See https://github.com/vanadium/issues/issues/1110
+	sh.StartSyncbase(serverCreds, testSbName, "", `{"Resolve": {"In":["root:server", "root:client"]}, "Read": {"In":["root:server", "root:client"]}, "Write": {"In":["root:server", "root:client"]}}`)
 
 	// Create app, database and table.
 	// TODO(ivanpi): Use setupAppA.
@@ -36,17 +42,52 @@
 	if err := tb.Create(ctx, nil); err != nil {
 		t.Fatalf("unable to create a table: %v", err)
 	}
+	marker, err := d.GetResumeMarker(ctx)
+	if err != nil {
+		t.Fatalf("unable to get the resume marker: %v", err)
+	}
 
 	// Do a Put followed by a Get.
-	r := tb.Row("r")
-	if err := r.Put(ctx, "testkey"); err != nil {
+	r := tb.Row("testkey")
+	if err := r.Put(ctx, "testvalue"); err != nil {
 		t.Fatalf("r.Put() failed: %v", err)
 	}
 	var result string
 	if err := r.Get(ctx, &result); err != nil {
 		t.Fatalf("r.Get() failed: %v", err)
 	}
-	if got, want := result, "testkey"; got != want {
+	if got, want := result, "testvalue"; got != want {
 		t.Fatalf("unexpected value: got %q, want %q", got, want)
 	}
+
+	// Do a watch from the resume marker before the put operation.
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	stream, err := d.Watch(ctxWithTimeout, "tb", "", marker)
+	if err != nil {
+		t.Fatalf("unable to start a watch %v", err)
+	}
+	if !stream.Advance() {
+		t.Fatalf("watch stream unexpectedly reached the end: %v", stream.Err())
+	}
+	change := stream.Change()
+	if got, want := change.Table, "tb"; got != want {
+		t.Fatalf("unexpected watch table: got %q, want %q", got, want)
+	}
+	if got, want := change.Row, "testkey"; got != want {
+		t.Fatalf("unexpected watch row: got %q, want %q", got, want)
+	}
+	if got, want := change.ChangeType, nosql.PutChange; got != want {
+		t.Fatalf("unexpected watch change type: got %q, want %q", got, want)
+	}
+	if got, want := change.FromSync, false; got != want {
+		t.Fatalf("unexpected FromSync value: got %t, want %t", got, want)
+	}
+	if err := vom.Decode(change.ValueBytes, &result); err != nil {
+		t.Fatalf("couldn't decode watch value: %v", err)
+	}
+	if got, want := result, "testvalue"; got != want {
+		t.Fatalf("unexpected watch value: got %q, want %q", got, want)
+	}
+
 }
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index 05dfd15..9d260db 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -570,7 +570,6 @@
 	}
 	return &nosql.Schema{
 		Metadata: metadata,
-		Upgrader: nil,
 		Resolver: resolver,
 	}
 }
diff --git a/syncbase/featuretests/ping_pong_test.go b/syncbase/featuretests/ping_pong_test.go
new file mode 100644
index 0000000..0473d27
--- /dev/null
+++ b/syncbase/featuretests/ping_pong_test.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package featuretests_test
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"v.io/v23/naming"
+	"v.io/v23/services/watch"
+	"v.io/v23/syncbase"
+	"v.io/v23/syncbase/nosql"
+	constants "v.io/x/ref/services/syncbase/server/util"
+	"v.io/x/ref/test/v23test"
+)
+
+// BenchmarkPingPongPair measures the round trip sync latency between a pair of
+// Syncbase instances that Ping Pong data to each other over a syncgroup.
+//
+// This benchmark performs the following operations:
+// - Create two syncbase instances and have them join the same syncgroup.
+// - Each watches the other syncbase's "section" of the table.
+// - A preliminary write by each syncbase ensures that clocks are synced.
+// - During Ping Pong, each syncbase instance finds its local dev time and
+//   writes it to its section of the table. The other syncbase watches for
+//   this value; once received, it does the same.
+//
+// After the benchmark completes, the "ns/op" value refers to the average time
+// per Ping Pong roundtrip completed by the two syncbases.
+//
+// Note: This benchmark can write simpler values (like int32) or use time.Now()
+// instead of using a DevModeGetTime RPC. This can affect the benchmark stats.
+func BenchmarkPingPongPair(b *testing.B) {
+	sh := v23test.NewShell(b, v23test.Opts{})
+	defer sh.Cleanup()
+	sh.StartRootMountTable()
+
+	// Setup 2 Syncbases with the dev mode clock on.
+	sbs := setupSyncbases(b, sh, 2, "--dev")
+
+	// Syncbase s0 is the creator.
+	sgName := naming.Join(sbs[0].sbName, constants.SyncbaseSuffix, "SG1")
+
+	// TODO(alexfandrianto): Was unable to use the empty prefix ("tb:").
+	// Observation: w0's watch isn't working with the empty prefix.
+	// Possible Explanation: The empty prefix ACL receives an initial value from
+	// the Table ACL. If this value is synced over from the opposing peer,
+	// conflict resolution can mean that s0 loses the ability to watch.
+	syncString := fmt.Sprintf("%s:p", testTable)
+	ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgName, syncString, "", sbBlessings(sbs), nil))
+
+	// Syncbase s1 will attempt to join the syncgroup.
+	ok(b, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sgName))
+
+	// Obtain the handles to the databases.
+	db0, _ := getDbAndTable(sbs[0].sbName)
+	db1, _ := getDbAndTable(sbs[1].sbName)
+
+	// Set up the watch streams (watching the other syncbase's prefix).
+	prefix0, prefix1 := "prefix0", "prefix1"
+	w0, err := db0.Watch(sbs[0].clientCtx, testTable, prefix1, watch.ResumeMarker("now"))
+	ok(b, err)
+	w1, err := db1.Watch(sbs[1].clientCtx, testTable, prefix0, watch.ResumeMarker("now"))
+	ok(b, err)
+
+	// The join has succeeded, so it's time to ensure clocks are synchronized.
+	// The strategy is: s0 sends to s1, and then s1 responds.
+	sendTimeSync(b, sbs[0], prefix0, w1)
+	sendTimeSync(b, sbs[1], prefix1, w0)
+
+	// The clocks are synchronized, so it is now time to really start our watch
+	// streams. This will allow us to ping and pong.
+	c0, c1 := make(chan time.Time), make(chan time.Time)
+	go watchTimes(b, w0, c0)
+	go watchTimes(b, w1, c1)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		suffix := fmt.Sprintf("%d", i)
+		ok(b, writeTime(sbs[0], prefix0, suffix))
+		<-c1
+		ok(b, writeTime(sbs[1], prefix1, suffix))
+		<-c0
+	}
+
+	// TODO(alexfandrianto): Should cancel these watch streams. Unfortunately, we
+	// cannot cancel while a watch stream's Advance() blocks.
+	//w0.Cancel()
+	//w1.Cancel()
+}
+
+// sendTimeSync sends data from 1 syncbase to another.
+// Be sure that the receiving watch stream sees the data and is still ok.
+func sendTimeSync(b *testing.B, ts *testSyncbase, senderPrefix string, w nosql.WatchStream) {
+	ok(b, writeTime(ts, senderPrefix, "synctime"))
+	if w.Advance() {
+		w.Change() // grab the change, but ignore the value.
+	}
+	watchStreamOk(b, w)
+}
+
+// watchTimes sends the value of each put through to the channel.
+func watchTimes(b *testing.B, w nosql.WatchStream, c chan time.Time) {
+	for w.Advance() {
+		var t time.Time
+		change := w.Change()
+		if change.ChangeType == nosql.DeleteChange {
+			b.Error("Received a delete change")
+		}
+		err := change.Value(&t)
+		if err != nil {
+			b.Error(err)
+		}
+		c <- t
+		watchStreamOk(b, w)
+	}
+}
+
+// getDbAndTable obtains the database and table handles for a syncbase name.
+func getDbAndTable(syncbaseName string) (d nosql.Database, tb nosql.Table) {
+	a := syncbase.NewService(syncbaseName).App(testApp)
+	d = a.NoSQLDatabase(testDb, nil)
+	tb = d.Table(testTable)
+	return
+}
+
+// writeTime writes a timestamp into syncbase at the combination of keyPrefix
+// and keySuffix.
+func writeTime(ts *testSyncbase, keyPrefix, keySuffix string) error {
+	ctx := ts.clientCtx
+	syncbaseName := ts.sbName
+	_, tb := getDbAndTable(syncbaseName)
+
+	key := fmt.Sprintf("%s/%s", keyPrefix, keySuffix)
+	// sc is a helper to obtain the syncbase wire service.
+	time, err := sc(syncbaseName).DevModeGetTime(ctx)
+	if err != nil {
+		return fmt.Errorf("sb.DevModeGetTime() failed: %v", err)
+	}
+
+	if err2 := tb.Put(ctx, key, time); err2 != nil {
+		return fmt.Errorf("tb.Put() failed: %v", err2)
+	}
+	return nil
+}
+
+// watchStreamOk emits an error if the watch stream has an error.
+func watchStreamOk(b *testing.B, w nosql.WatchStream) {
+	if w.Err() != nil {
+		b.Errorf("stream error: %v", w.Err())
+	}
+}
diff --git a/syncbase/featuretests/test_util_test.go b/syncbase/featuretests/test_util_test.go
index 180f0b2..8306421 100644
--- a/syncbase/featuretests/test_util_test.go
+++ b/syncbase/featuretests/test_util_test.go
@@ -15,6 +15,7 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
+	wire_syncbase "v.io/v23/services/syncbase"
 	wire "v.io/v23/services/syncbase/nosql"
 	"v.io/v23/syncbase"
 	"v.io/v23/syncbase/nosql"
@@ -52,7 +53,7 @@
 }
 
 // Spawns "num" Syncbase instances and returns handles to them.
-func setupSyncbases(t *testing.T, sh *v23test.Shell, num int, args ...string) []*testSyncbase {
+func setupSyncbases(t testing.TB, sh *v23test.Shell, num int, args ...string) []*testSyncbase {
 	sbs := make([]*testSyncbase, num)
 	for i, _ := range sbs {
 		sbName, clientId := fmt.Sprintf("s%d", i), fmt.Sprintf("c%d", i)
@@ -62,8 +63,8 @@
 			clientId:  clientId,
 			clientCtx: sh.ForkContext(clientId),
 		}
-		// Give RWA permissions to this Syncbase's client.
-		acl := fmt.Sprintf(`{"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId)
+		// Give XRWA permissions to this Syncbase's client.
+		acl := fmt.Sprintf(`{"Resolve":{"In":["root:%s"]},"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId, clientId)
 		sh.StartSyncbase(sbs[i].sbCreds, sbs[i].sbName, "", acl, args...)
 	}
 	// Call setupHierarchy on each Syncbase.
@@ -327,27 +328,35 @@
 }
 
 ////////////////////////////////////////////////////////////
+// Helpers to interact with the Syncbase service directly.
+
+// Obtain the Syncbase client stub.
+func sc(name string) wire_syncbase.ServiceClientStub {
+	return wire_syncbase.ServiceClient(name)
+}
+
+////////////////////////////////////////////////////////////
 // Generic testing helpers
 
-func ok(t *testing.T, err error) {
+func ok(t testing.TB, err error) {
 	if err != nil {
 		tu.Fatal(t, err)
 	}
 }
 
-func nok(t *testing.T, err error) {
+func nok(t testing.TB, err error) {
 	if err == nil {
 		tu.Fatal(t, "nil err")
 	}
 }
 
-func eq(t *testing.T, got, want interface{}) {
+func eq(t testing.TB, got, want interface{}) {
 	if !reflect.DeepEqual(got, want) {
 		tu.Fatalf(t, "got %v, want %v", got, want)
 	}
 }
 
-func neq(t *testing.T, got, notWant interface{}) {
+func neq(t testing.TB, got, notWant interface{}) {
 	if reflect.DeepEqual(got, notWant) {
 		tu.Fatalf(t, "got %v", got)
 	}
diff --git a/syncbase/featuretests/vclock_v23_test.go b/syncbase/featuretests/vclock_v23_test.go
index d77559c..a6be515 100644
--- a/syncbase/featuretests/vclock_v23_test.go
+++ b/syncbase/featuretests/vclock_v23_test.go
@@ -388,10 +388,6 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Helper functions
 
-func sc(name string) wire.ServiceClientStub {
-	return wire.ServiceClient(name)
-}
-
 // Creates a "chain" of syncgroups, where each adjacent pair of Syncbases {A,B}
 // share a syncgroup with key prefix "AB".
 func setupChain(t *testing.T, sbs []*testSyncbase) {
diff --git a/syncbase/nosql/.api b/syncbase/nosql/.api
index 8c2d318..72b9183 100644
--- a/syncbase/nosql/.api
+++ b/syncbase/nosql/.api
@@ -1,7 +1,6 @@
 pkg nosql, const DeleteChange ChangeType
 pkg nosql, const PutChange ChangeType
 pkg nosql, func NewDatabase(string, string, *Schema) *database
-pkg nosql, func NewSchema(wire.SchemaMetadata, SchemaUpgrader) *Schema
 pkg nosql, func NewValue(*context.T, interface{}) (*Value, error)
 pkg nosql, func Prefix(string) PrefixRange
 pkg nosql, func Range(string, string) RowRange
@@ -135,9 +134,6 @@
 pkg nosql, type Schema struct
 pkg nosql, type Schema struct, Metadata wire.SchemaMetadata
 pkg nosql, type Schema struct, Resolver ConflictResolver
-pkg nosql, type Schema struct, Upgrader SchemaUpgrader
-pkg nosql, type SchemaUpgrader interface { Run }
-pkg nosql, type SchemaUpgrader interface, Run(Database, int32, int32) error
 pkg nosql, type Stream interface { Advance, Cancel, Err }
 pkg nosql, type Stream interface, Advance() bool
 pkg nosql, type Stream interface, Cancel()
diff --git a/syncbase/nosql/benchmark_test.go b/syncbase/nosql/benchmark_test.go
index a8d663a..c0bc6a9 100644
--- a/syncbase/nosql/benchmark_test.go
+++ b/syncbase/nosql/benchmark_test.go
@@ -187,6 +187,7 @@
 
 // Measures how long it takes to put and get notified about 'numRows' rows.
 func runWatchPutsBenchmark(b *testing.B, value interface{}) {
+	b.Skip("Hangs on occasion, for unknown reasons - v.io/i/1134")
 	ctx, d, tb, cleanup := prepare(b)
 	defer cleanup()
 	b.ResetTimer()
@@ -215,6 +216,7 @@
 
 // Measures how long it takes to put and get notified about a single value.
 func runWatchOnePutBenchmark(b *testing.B, value interface{}) {
+	b.Skip("Hangs on occasion, for unknown reasons - v.io/i/1134")
 	ctx, d, tb, cleanup := prepare(b)
 	defer cleanup()
 	w, err := d.Watch(ctx, "tb", "", watch.ResumeMarker("now"))
diff --git a/syncbase/nosql/cr_connection_test.go b/syncbase/nosql/cr_connection_test.go
index 774fff2..27acb3a 100644
--- a/syncbase/nosql/cr_connection_test.go
+++ b/syncbase/nosql/cr_connection_test.go
@@ -127,7 +127,6 @@
 func getSchema(cr ConflictResolver) *Schema {
 	return &Schema{
 		Metadata: wire.SchemaMetadata{},
-		Upgrader: nil,
 		Resolver: cr,
 	}
 }
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index 9deef44..d4b26b7 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -211,7 +211,7 @@
 func (d *database) EnforceSchema(ctx *context.T) error {
 	var schema *Schema = d.schema
 	if schema == nil {
-		return verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+		return verror.New(verror.ErrBadState, ctx, "EnforceSchema cannot be used since a nil *Schema was provided at Database handle creation time.")
 	}
 
 	if schema.Metadata.Version < 0 {
@@ -222,7 +222,7 @@
 		return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
 	}
 
-	if _, err := d.upgradeIfOutdated(ctx); err != nil {
+	if _, err := d.updateSchemaMetadata(ctx); err != nil {
 		return err
 	}
 
@@ -243,7 +243,10 @@
 	d.crState.disconnect()
 }
 
-func (d *database) upgradeIfOutdated(ctx *context.T) (bool, error) {
+// updateSchemaMetadata reads the current SchemaMetadata from db and checks
+// if the SchemaMetadata provided by the app is newer. If so, it updates the
+// db with the new SchemaMetadata.
+func (d *database) updateSchemaMetadata(ctx *context.T) (bool, error) {
 	var schema *Schema = d.schema
 	schemaMgr := d.getSchemaManager()
 	currMeta, err := schemaMgr.getSchemaMetadata(ctx)
@@ -267,21 +270,9 @@
 	if currMeta.Version >= schema.Metadata.Version {
 		return false, nil
 	}
-	// Call the Upgrader provided by the app to upgrade the schema.
-	//
-	// TODO(jlodhia): disable sync before running Upgrader and reenable
-	// once Upgrader is finished.
-	//
-	// TODO(jlodhia): prevent other processes (local/remote) from accessing
-	// the database while upgrade is in progress.
-	upgradeErr := schema.Upgrader.Run(d, currMeta.Version, schema.Metadata.Version)
-	if upgradeErr != nil {
-		vlog.Error(upgradeErr)
-		return false, upgradeErr
-	}
+
 	// Update the schema metadata in db to the latest version.
-	metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
-	if metadataErr != nil {
+	if metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata); metadataErr != nil {
 		vlog.Error(metadataErr)
 		return false, metadataErr
 	}
diff --git a/syncbase/nosql/model.go b/syncbase/nosql/model.go
index 0c475af..4d76141 100644
--- a/syncbase/nosql/model.go
+++ b/syncbase/nosql/model.go
@@ -140,13 +140,11 @@
 	Blob(br wire.BlobRef) Blob
 
 	// EnforceSchema compares the current schema version of the database
-	// with the schema version provided while creating this database handle. If
-	// the current database schema version is lower, then the SchemaUpdater is
-	// called. If SchemaUpdater is successful this method stores the new schema
-	// metadata in database.
+	// with the schema version provided while creating this database handle and
+	// updates the schema metadata if required.
 	// This method also registers a conflict resolver with syncbase to receive
-	// conflicts. Note: schema can be nil, in which case this method skips
-	// schema check and the caller is responsible for maintaining schema sanity.
+	// conflicts. Note: schema can be nil, in which case this method should not
+	// be called and the caller is responsible for maintaining schema sanity.
 	EnforceSchema(ctx *context.T) error
 }
 
@@ -558,27 +556,18 @@
 	Cancel()
 }
 
-// SchemaUpgrader interface must be implemented by the App in order to upgrade
-// the database schema from a lower version to a higher version.
-type SchemaUpgrader interface {
-	// Takes an instance of database and upgrades data from old
-	// schema to new schema. This method must be idempotent.
-	Run(db Database, oldVersion, newVersion int32) error
-}
-
 // Each database has a Schema associated with it which defines the current
 // version of the database. When a new version of app wishes to change
 // its data in a way that it is not compatible with the old app's data,
-// the app must change the schema version and provide relevant upgrade logic
-// in the Upgrader. The conflict resolution rules are also associated with the
+// the app must change the schema version and perform relevant upgrade logic.
+// The conflict resolution rules are also associated with the
 // schema version. Hence if the conflict resolution rules change then the schema
 // version also must be bumped.
 //
-// Schema provides metadata and a SchemaUpgrader for a given database.
-// SchemaUpgrader is purely local and not persisted.
+// Schema provides metadata and a ConflictResolver for a given database.
+// ConflictResolver is purely local and not persisted.
 type Schema struct {
 	Metadata wire.SchemaMetadata
-	Upgrader SchemaUpgrader
 	Resolver ConflictResolver
 }
 
diff --git a/syncbase/nosql/schema.go b/syncbase/nosql/schema.go
index 1e82324..10f54d8 100644
--- a/syncbase/nosql/schema.go
+++ b/syncbase/nosql/schema.go
@@ -9,14 +9,6 @@
 	wire "v.io/v23/services/syncbase/nosql"
 )
 
-// NewSchema creates a new Schema object.
-func NewSchema(metadata wire.SchemaMetadata, upgrader SchemaUpgrader) *Schema {
-	return &Schema{
-		Metadata: metadata,
-		Upgrader: upgrader,
-	}
-}
-
 //////////////////////////////////////////
 // Implementation of SchemaManager (Not part of public client API)
 
diff --git a/syncbase/nosql/schema_test.go b/syncbase/nosql/schema_test.go
index 1d0beb8..740a5f8 100644
--- a/syncbase/nosql/schema_test.go
+++ b/syncbase/nosql/schema_test.go
@@ -19,29 +19,20 @@
 // This test as following steps:
 // 1) Call NoSQLDatabase() for a non existent db.
 // 2) Create the database, and verify if Schema got stored properly.
-// 3) Call EnforceSchema() to make sure that the method is no-op and is
-//    able to read the schema from db.
-// 4) Call NoSQLDatabase() on the same db to create a new handle with an
-//    upgraded schema, call EnforceSchema() and check if SchemaUpgrader
-//    is called and if the new schema is stored appropriately.
+// 3) Call NoSQLDatabase() on the same db to create a new handle with new
+//    schema metadata, call EnforceSchema() and check if the new metadata was
+//    stored appropriately.
 func TestSchemaCheck(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
 	defer cleanup()
 	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
 	schema := tu.DefaultSchema(0)
-	mockUpgrader := schema.Upgrader.(*tu.MockSchemaUpgrader)
 
 	db1 := a.NoSQLDatabase("db1", schema)
 
-	// Verify that calling Upgrade on a non existing database does not throw
-	// errors.
-	err := db1.EnforceSchema(ctx)
-	if err != nil {
+	if err := db1.EnforceSchema(ctx); err != nil {
 		t.Fatalf("db1.EnforceSchema() failed: %v", err)
 	}
-	if mockUpgrader.CallCount > 0 {
-		t.Fatal("Call to upgrader was not expected.")
-	}
 
 	// Create db1, this step also stores the schema provided above
 	if err := db1.Create(ctx, nil); err != nil {
@@ -52,14 +43,6 @@
 		t.Fatalf("Failed to lookup schema after create: %v", err)
 	}
 
-	// Make redundant call to Upgrade to verify that it is a no-op
-	if err := db1.EnforceSchema(ctx); err != nil {
-		t.Fatalf("db1.EnforceSchema() failed: %v", err)
-	}
-	if mockUpgrader.CallCount > 0 {
-		t.Fatal("Call to upgrader was not expected.")
-	}
-
 	// try to make a new database object for the same database but this time
 	// with a new schema version
 	schema.Metadata.Version = 1
@@ -72,14 +55,11 @@
 	if err := otherdb1.EnforceSchema(ctx); err != nil {
 		t.Fatalf("otherdb1.EnforceSchema() failed: %v", err)
 	}
-	if mockUpgrader.CallCount != 1 {
-		t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
-	}
 
 	// check if the contents of SchemaMetadata are correctly stored in the db.
-	metadata, err3 := getSchemaMetadata(ctx, otherdb1.FullName())
-	if err3 != nil {
-		t.Fatalf("GetSchemaMetadata failed: %v", err3)
+	metadata, err := getSchemaMetadata(ctx, otherdb1.FullName())
+	if err != nil {
+		t.Fatalf("GetSchemaMetadata failed: %v", err)
 	}
 	if metadata.Version != 1 {
 		t.Fatalf("Unexpected version number: %d", metadata.Version)
diff --git a/syncbase/nosql/syncgroup_v23_test.go b/syncbase/nosql/syncgroup_v23_test.go
index 459e394..65fa6dd 100644
--- a/syncbase/nosql/syncgroup_v23_test.go
+++ b/syncbase/nosql/syncgroup_v23_test.go
@@ -10,6 +10,7 @@
 	"os"
 	"reflect"
 	"runtime/debug"
+	"sort"
 	"strings"
 	"testing"
 	"time"
@@ -18,9 +19,11 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/services/watch"
 	"v.io/v23/syncbase"
 	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 	_ "v.io/x/ref/runtime/factories/generic"
 	constants "v.io/x/ref/services/syncbase/server/util"
 	"v.io/x/ref/test/v23test"
@@ -83,8 +86,10 @@
 
 	server1Creds := sh.ForkCredentials("s1")
 	client1Ctx := sh.ForkContext("c1")
+	// TODO(aghassemi): Resolve permission is currently needed for Watch.
+	// See https://github.com/vanadium/issues/issues/1110
 	sh.StartSyncbase(server1Creds, "sync1", "",
-		`{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
+		`{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
 
 	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
 
@@ -94,8 +99,11 @@
 	ok(t, runPopulateNonVomData(client0Ctx, "sync0", "bar", 0))
 
 	ok(t, runSetupAppA(client1Ctx, "sync1"))
+	beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
+	ok(t, err)
 	ok(t, runJoinSyncgroup(client1Ctx, "sync1", sgName))
 	ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
+	ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
 	ok(t, runVerifySyncgroupNonVomData(client1Ctx, "sync1", "bar", 0, 10))
 }
 
@@ -120,8 +128,10 @@
 
 	server1Creds := sh.ForkCredentials("s1")
 	client1Ctx := sh.ForkContext("c1")
+	// TODO(aghassemi): Resolve permission is currently needed for Watch.
+	// See https://github.com/vanadium/issues/issues/1110
 	sh.StartSyncbase(server1Creds, "sync1", "",
-		`{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
+		`{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
 
 	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
 
@@ -130,12 +140,18 @@
 	ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
 
 	ok(t, runSetupAppA(client1Ctx, "sync1"))
+	beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
+	ok(t, err)
 	ok(t, runJoinSyncgroup(client1Ctx, "sync1", sgName))
 	ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
+	ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
 
+	beforeDeleteMarker, err := getResumeMarker(client1Ctx, "sync1")
+	ok(t, err)
 	ok(t, runDeleteData(client0Ctx, "sync0", 0))
 	ok(t, runVerifyDeletedData(client0Ctx, "sync0", "foo"))
 	ok(t, runVerifyDeletedData(client1Ctx, "sync1", "foo"))
+	ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 5, true, beforeDeleteMarker))
 
 	ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
 	ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "bar", 0, 10, false))
@@ -607,7 +623,6 @@
 func runJoinSyncgroup(ctx *context.T, serviceName, sgName string) error {
 	a := syncbase.NewService(serviceName).App("a")
 	d := a.NoSQLDatabase("d", nil)
-
 	sg := d.Syncgroup(sgName)
 	info := wire.SyncgroupMemberInfo{SyncPriority: 10}
 	if _, err := sg.Join(ctx, info); err != nil {
@@ -831,6 +846,64 @@
 	return nil
 }
 
+func runVerifySyncgroupDataWithWatch(ctx *context.T, serviceName, keyPrefix string, count int, expectDelete bool, beforeSyncMarker watch.ResumeMarker) error {
+	if count == 0 {
+		return fmt.Errorf("count cannot be 0: got %d", count)
+	}
+	a := syncbase.NewService(serviceName).App("a")
+	d := a.NoSQLDatabase("d", nil)
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	stream, err := d.Watch(ctxWithTimeout, testTable, keyPrefix, beforeSyncMarker)
+	if err != nil {
+		return fmt.Errorf("watch error: %v\n", err)
+	}
+
+	var changes []nosql.WatchChange
+	for i := 0; stream.Advance() && i < count; i++ {
+		if err := stream.Err(); err != nil {
+			return fmt.Errorf("watch stream error: %v\n", err)
+		}
+		changes = append(changes, stream.Change())
+	}
+
+	sort.Sort(ByRow(changes))
+
+	if got, want := len(changes), count; got != want {
+		return fmt.Errorf("unexpected number of changes: got %d, want %d", got, want)
+	}
+
+	for i, change := range changes {
+		if got, want := change.Table, "tb"; got != want {
+			return fmt.Errorf("unexpected watch table: got %q, want %q", got, want)
+		}
+		if got, want := change.Row, fmt.Sprintf("%s%d", keyPrefix, i); got != want {
+			return fmt.Errorf("unexpected watch row: got %q, want %q", got, want)
+		}
+		if got, want := change.FromSync, true; got != want {
+			return fmt.Errorf("unexpected FromSync value: got %t, want %t", got, want)
+		}
+		if expectDelete {
+			if got, want := change.ChangeType, nosql.DeleteChange; got != want {
+				return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+			}
+			return nil
+		}
+		var result string
+		if got, want := change.ChangeType, nosql.PutChange; got != want {
+			return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+		}
+		if err := vom.Decode(change.ValueBytes, &result); err != nil {
+			return fmt.Errorf("couldn't decode watch value: %v", err)
+		}
+		if got, want := result, fmt.Sprintf("testkey%s%d", keyPrefix, i); got != want {
+			return fmt.Errorf("unexpected watch value: got %q, want %q", got, want)
+		}
+	}
+	return nil
+}
+
 func runVerifySyncgroupNonVomData(ctx *context.T, serviceName, keyPrefix string, start, count uint64) error {
 	a := syncbase.NewService(serviceName).App("a")
 	d := a.NoSQLDatabase("d", nil)
@@ -1232,6 +1305,12 @@
 	return nil
 }
 
+func getResumeMarker(ctx *context.T, serviceName string) (watch.ResumeMarker, error) {
+	a := syncbase.NewService(serviceName).App("a")
+	d := a.NoSQLDatabase("d", nil)
+	return d.GetResumeMarker(ctx)
+}
+
 func ok(t *testing.T, err error) {
 	if err != nil {
 		debug.PrintStack()
@@ -1242,3 +1321,10 @@
 func TestMain(m *testing.M) {
 	v23test.TestMain(m)
 }
+
+// ByRow implements sort.Interface for []nosql.WatchChange based on the Row field.
+type ByRow []nosql.WatchChange
+
+func (c ByRow) Len() int           { return len(c) }
+func (c ByRow) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }
+func (c ByRow) Less(i, j int) bool { return c[i].Row < c[j].Row }
diff --git a/vdl/.api b/vdl/.api
index bdddd3e..202f666 100644
--- a/vdl/.api
+++ b/vdl/.api
@@ -243,6 +243,7 @@
 pkg vdl, var Int32Type *Type
 pkg vdl, var Int64Type *Type
 pkg vdl, var Int8Type *Type
+pkg vdl, var RawBytesTargetFunc func(rv reflect.Value) Target
 pkg vdl, var StringType *Type
 pkg vdl, var TypeObjectType *Type
 pkg vdl, var Uint16Type *Type
diff --git a/vdl/convert.go b/vdl/convert.go
index 858dc68..3da523c 100644
--- a/vdl/convert.go
+++ b/vdl/convert.go
@@ -533,6 +533,11 @@
 
 // FromNil implements the Target interface method.
 func (c convTarget) FromNil(tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromNil(tt)
+		}
+	}
 	if !compatible(c.tt, tt) {
 		return fmt.Errorf("types %q and %q aren't compatible", c.tt, tt)
 	}
@@ -581,6 +586,11 @@
 
 // FromBool implements the Target interface method.
 func (c convTarget) FromBool(src bool, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromBool(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -593,6 +603,11 @@
 
 // FromUint implements the Target interface method.
 func (c convTarget) FromUint(src uint64, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromUint(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -605,6 +620,11 @@
 
 // FromInt implements the Target interface method.
 func (c convTarget) FromInt(src int64, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromInt(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -617,6 +637,11 @@
 
 // FromFloat implements the Target interface method.
 func (c convTarget) FromFloat(src float64, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromFloat(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -629,6 +654,11 @@
 
 // FromComplex implements the Target interface method.
 func (c convTarget) FromComplex(src complex128, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromComplex(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -641,6 +671,11 @@
 
 // FromBytes implements the Target interface method.
 func (c convTarget) FromBytes(src []byte, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromBytes(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -653,6 +688,11 @@
 
 // FromString implements the Target interface method.
 func (c convTarget) FromString(src string, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromString(src, tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	if err != nil {
 		return err
@@ -665,11 +705,21 @@
 
 // FromEnumLabel implements the Target interface method.
 func (c convTarget) FromEnumLabel(src string, tt *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromEnumLabel(src, tt)
+		}
+	}
 	return c.FromString(src, tt)
 }
 
 // FromTypeObject implements the Target interface method.
 func (c convTarget) FromTypeObject(src *Type) error {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.FromTypeObject(src)
+		}
+	}
 	fin, fill, err := startConvert(c, TypeObjectType)
 	if err != nil {
 		return err
@@ -1001,8 +1051,20 @@
 	return fmt.Errorf("invalid conversion from typeobject to %v", c.tt)
 }
 
+// An interface used to identify that a given target is a raw bytes target.
+// This is used to special case conversion to RawBytes without having the
+// vdl package have a dependency on vom.
+type rbTarget interface {
+	RawBytesTargetHack()
+}
+
 // StartList implements the Target interface method.
 func (c convTarget) StartList(tt *Type, len int) (ListTarget, error) {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.StartList(tt, len)
+		}
+	}
 	// TODO(bprosnitz) Re-think allocation strategy and possibly use len (currently unused).
 	fin, fill, err := startConvert(c, tt)
 	return compConvTarget{fin, fill}, err
@@ -1010,42 +1072,69 @@
 
 // FinishList implements the Target interface method.
 func (c convTarget) FinishList(x ListTarget) error {
+	if rbTarg, ok := x.(rbTarget); ok {
+		return rbTarg.(Target).FinishList(x)
+	}
 	cc := x.(compConvTarget)
 	return finishConvert(cc.fin, cc.fill)
 }
 
 // StartSet implements the Target interface method.
 func (c convTarget) StartSet(tt *Type, len int) (SetTarget, error) {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.StartSet(tt, len)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	return compConvTarget{fin, fill}, err
 }
 
 // FinishSet implements the Target interface method.
 func (c convTarget) FinishSet(x SetTarget) error {
+	if rbTarg, ok := x.(rbTarget); ok {
+		return rbTarg.(Target).FinishSet(x)
+	}
 	cc := x.(compConvTarget)
 	return finishConvert(cc.fin, cc.fill)
 }
 
 // StartMap implements the Target interface method.
 func (c convTarget) StartMap(tt *Type, len int) (MapTarget, error) {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.StartMap(tt, len)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	return compConvTarget{fin, fill}, err
 }
 
 // FinishMap implements the Target interface method.
 func (c convTarget) FinishMap(x MapTarget) error {
+	if rbTarg, ok := x.(rbTarget); ok {
+		return rbTarg.(Target).FinishMap(x)
+	}
 	cc := x.(compConvTarget)
 	return finishConvert(cc.fin, cc.fill)
 }
 
 // StartFields implements the Target interface method.
 func (c convTarget) StartFields(tt *Type) (FieldsTarget, error) {
+	if RawBytesTargetFunc != nil && c.vv == nil {
+		if target := RawBytesTargetFunc(c.rv); target != nil {
+			return target.StartFields(tt)
+		}
+	}
 	fin, fill, err := startConvert(c, tt)
 	return compConvTarget{fin, fill}, err
 }
 
 // FinishFields implements the Target interface method.
 func (c convTarget) FinishFields(x FieldsTarget) error {
+	if rbTarg, ok := x.(rbTarget); ok {
+		return rbTarg.(Target).FinishFields(x)
+	}
 	cc := x.(compConvTarget)
 	return finishConvert(cc.fin, cc.fill)
 }
diff --git a/vdl/reflect_type.go b/vdl/reflect_type.go
index 5e40f86..3371895 100644
--- a/vdl/reflect_type.go
+++ b/vdl/reflect_type.go
@@ -127,6 +127,10 @@
 
 var rtTargetConvertible = reflect.TypeOf([]rawBytesTargetConvertible{}).Elem()
 
+// Hook for function that returns a Target that writes to a vom.RawBytes.
+// See vom.rbTarget.
+var RawBytesTargetFunc func(rv reflect.Value) Target
+
 // basicType returns the *Type corresponding to rt for basic types that cannot
 // be named by the user, and have a well-known conversion.
 func basicType(rt reflect.Type) *Type {
diff --git a/vdl/target.go b/vdl/target.go
index 555ec4b..dbc682b 100644
--- a/vdl/target.go
+++ b/vdl/target.go
@@ -216,6 +216,13 @@
 		}
 		rv = rv.Elem()
 	}
+
+	if reflect.PtrTo(rv.Type()).Implements(rtTargetConvertible) {
+		rvPtr := reflect.New(rv.Type())
+		rvPtr.Elem().Set(rv)
+		return rvPtr.Interface().(rawBytesTargetConvertible).ToTarget(target)
+	}
+
 	// Handle special-case for errors.
 	if rv.Type().ConvertibleTo(rtError) {
 		return fromError(target, rv)
diff --git a/vom/.api b/vom/.api
index c61c5c0..34893b5 100644
--- a/vom/.api
+++ b/vom/.api
@@ -2,7 +2,6 @@
 pkg vom, const ControlKindIncompleteType ControlKind
 pkg vom, const ControlKindNil ControlKind
 pkg vom, const DefaultVersion Version
-pkg vom, const DefaultVersionWithRawBytesSupport Version
 pkg vom, const DumpKindAnyLensLen DumpKind
 pkg vom, const DumpKindAnyMsgLen DumpKind
 pkg vom, const DumpKindByteLen DumpKind
diff --git a/vom/decoder_test.go b/vom/decoder_test.go
index 2c1d360..a81339e 100644
--- a/vom/decoder_test.go
+++ b/vom/decoder_test.go
@@ -250,6 +250,8 @@
 		{types.YzStruct{}, types.YzStruct{}},
 		{types.MBool(false), types.MBool(false)},
 		{types.NString(""), types.NString("")},
+		{vdl.ValueOf(uint16(5)), vdl.ValueOf(uint16(5))},
+		{vdl.ValueOf([]interface{}{uint16(5)}).Index(0), vdl.ValueOf(uint16(5))},
 	}
 
 	var (
diff --git a/vom/dump.go b/vom/dump.go
index 7e5647e..51f0882 100644
--- a/vom/dump.go
+++ b/vom/dump.go
@@ -868,6 +868,7 @@
 				d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", err)
 				return err
 			}
+			d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", elemType)
 			if d.version >= Version81 {
 				d.prepareAtom("waiting for any message length index")
 				switch index, ctrl, err := binaryDecodeUintWithControl(d.buf); {
@@ -883,7 +884,6 @@
 					return d.decodeValue(elemType, target)
 				}
 			}
-			d.writeAtom(DumpKindTypeId, PrimitivePUint{id}, "%v", elemType)
 			return d.decodeValue(elemType, target)
 		}
 	default:
diff --git a/vom/encoder.go b/vom/encoder.go
index 2b17ba4..aa7b538 100644
--- a/vom/encoder.go
+++ b/vom/encoder.go
@@ -19,8 +19,7 @@
 	Version80 = Version(0x80)
 	Version81 = Version(0x81)
 
-	DefaultVersion                    = Version80
-	DefaultVersionWithRawBytesSupport = Version81 // TODO(bprosnitz) Remove once switch to 81 being default
+	DefaultVersion = Version81
 )
 
 func (v Version) String() string {
@@ -103,7 +102,7 @@
 // NewEncoder returns a new Encoder that writes to the given writer in the
 // binary format. The binary format is compact and fast.
 func NewEncoder(w io.Writer) *Encoder {
-	return NewVersionedEncoder(Version80, w)
+	return NewVersionedEncoder(DefaultVersion, w)
 }
 
 // NewVersionedEncoder returns a new Encoder that writes to the given writer with
@@ -126,7 +125,7 @@
 // writer in the binary format. Types will be encoded separately through the
 // given typeEncoder.
 func NewEncoderWithTypeEncoder(w io.Writer, typeEnc *TypeEncoder) *Encoder {
-	return NewVersionedEncoderWithTypeEncoder(Version80, w, typeEnc)
+	return NewVersionedEncoderWithTypeEncoder(DefaultVersion, w, typeEnc)
 }
 
 // NewVersionedEncoderWithTypeEncoder returns a new Encoder that writes to the given
@@ -205,7 +204,11 @@
 	rv := reflect.ValueOf(v)
 	for rv.Kind() == reflect.Ptr && !rv.IsNil() {
 		if rv.Type().ConvertibleTo(rtPtrToValue) {
-			return rv.Convert(rtPtrToValue).Interface().(*vdl.Value).Type()
+			vv := rv.Convert(rtPtrToValue).Interface().(*vdl.Value)
+			if vv.Kind() == vdl.Any && vv.Elem().IsValid() {
+				vv = vv.Elem()
+			}
+			return vv.Type()
 		}
 		if rv.Type().ConvertibleTo(rtPtrToRawBytes) {
 			return rv.Convert(rtPtrToRawBytes).Interface().(*RawBytes).Type
@@ -897,16 +900,15 @@
 
 func (l *anyLenList) StartAny(startMarker uint64) *anyStartRef {
 	l.lens = append(l.lens, 0)
+	index := uint64(len(l.lens) - 1)
 	return &anyStartRef{
-		index:  uint64(len(l.lens) - 1),
-		marker: startMarker,
+		index:  index,
+		marker: startMarker + lenUint(index),
 	}
 }
 
 func (l *anyLenList) FinishAny(start *anyStartRef, endMarker uint64) {
-	lenIncLenBytes := endMarker - start.marker
-	len := lenIncLenBytes - lenUint(lenIncLenBytes)
-	l.lens[start.index] = len
+	l.lens[start.index] = endMarker - start.marker
 }
 
 func (l *anyLenList) Reset() error {
diff --git a/vom/internal/perf_test.go b/vom/internal/perf_test.go
index c6f00b2..8512cf2 100644
--- a/vom/internal/perf_test.go
+++ b/vom/internal/perf_test.go
@@ -153,7 +153,7 @@
 // vom.Encoder.Encode RawBytes benchmarking
 func vomEncoderEncodeRawBytes(t testing.TB, n int, rv *vom.RawBytes) {
 	var buf bytes.Buffer
-	encoder := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf)
+	encoder := vom.NewEncoder(&buf)
 	for j := 0; j != n; j++ {
 		err := encoder.Encode(rv)
 		if err != nil {
@@ -163,7 +163,7 @@
 }
 
 func makeCustomerRawBytes(tb testing.TB) *vom.RawBytes {
-	bytes, err := vom.VersionedEncode(vom.DefaultVersionWithRawBytesSupport, customer)
+	bytes, err := vom.Encode(customer)
 	if err != nil {
 		tb.Fatalf("vom.Encode failed: %v", err)
 	}
@@ -208,7 +208,7 @@
 	data := buf.Bytes()
 	// TODO(bprosnitz) remove the encoder below when we switch to version 81
 	var buf81 bytes.Buffer
-	encoder81 := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf81)
+	encoder81 := vom.NewEncoder(&buf81)
 	for j := 0; j != 1000; j++ {
 		err := encoder81.Encode(customer)
 		if err != nil {
@@ -268,7 +268,7 @@
 
 func BenchmarkVomDecoder1000RawBytes(b *testing.B) {
 	var buf bytes.Buffer
-	encoder := vom.NewVersionedEncoder(vom.DefaultVersionWithRawBytesSupport, &buf)
+	encoder := vom.NewEncoder(&buf)
 	for j := 0; j != 1000; j++ {
 		err := encoder.Encode(customer)
 		if err != nil {
diff --git a/vom/raw_bytes.go b/vom/raw_bytes.go
index 7f95643..5d510c0 100644
--- a/vom/raw_bytes.go
+++ b/vom/raw_bytes.go
@@ -29,7 +29,7 @@
 
 func RawBytesFromValue(value interface{}) (*RawBytes, error) {
 	// TODO(bprosnitz) This implementation is temporary - we should make it faster
-	dat, err := VersionedEncode(DefaultVersionWithRawBytesSupport, value)
+	dat, err := Encode(value)
 	if err != nil {
 		return nil, err
 	}
@@ -40,7 +40,7 @@
 
 func (rb *RawBytes) ToValue(value interface{}) error {
 	// TODO(bprosnitz) This implementation is temporary - we should make it faster
-	dat, err := VersionedEncode(DefaultVersionWithRawBytesSupport, rb)
+	dat, err := Encode(rb)
 	if err != nil {
 		return err
 	}
@@ -60,3 +60,275 @@
 type rvHackInterface interface {
 	HackGetRv() reflect.Value
 }
+
+var rbType reflect.Type = reflect.TypeOf((*RawBytes)(nil))
+
+// If rv is a raw bytes pointer, return a vdl target that
+// modifies it. Otherwise return nil.
+func makeRawBytesTarget(rv reflect.Value) vdl.Target {
+	if rv.Type() == rbType {
+		rv.Set(reflect.ValueOf(&RawBytes{}))
+		return &rbTarget{rb: rv.Interface().(*RawBytes)}
+	}
+	return nil
+}
+
+func init() {
+	vdl.RawBytesTargetFunc = makeRawBytesTarget
+}
+
+// vdl.Target that writes to a vom.RawBytes.
+// This structure is intended to be one-time-use and
+// created from makeRawBytesTarget.
+type rbTarget struct {
+	rb         *RawBytes     // RawBytes to write to
+	enc        *encoder      // encoder to act as the underlying target
+	buf        *bytes.Buffer // buffer of bytes written
+	startCount int           // count the depth level of open calls, to determine when to write to rb
+}
+
+func (r *rbTarget) start(tt *vdl.Type) error {
+	r.startCount++
+	if r.startCount > 1 {
+		return nil
+	}
+
+	r.buf = bytes.NewBuffer(nil)
+	e := NewEncoder(r.buf)
+	r.enc = &e.enc
+	if _, err := r.enc.writer.Write([]byte{byte(r.enc.version)}); err != nil {
+		return err
+	}
+	r.enc.sentVersionByte = true
+	tid, err := r.enc.typeEnc.encode(tt)
+	if err != nil {
+		return err
+	}
+	if err := r.enc.startEncode(containsAny(tt), containsTypeObject(tt), hasChunkLen(tt), false, int64(tid)); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (r *rbTarget) finish() error {
+	r.startCount--
+	if r.startCount > 0 {
+		return nil
+	}
+
+	if err := r.enc.finishEncode(); err != nil {
+		return err
+	}
+	err := Decode(r.buf.Bytes(), r.rb)
+	return err
+}
+
+func (r *rbTarget) FromBool(src bool, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromBool(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromUint(src uint64, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromUint(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromInt(src int64, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromInt(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromFloat(src float64, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromFloat(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromComplex(src complex128, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromComplex(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromBytes(src []byte, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromBytes(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromString(src string, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromString(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromEnumLabel(src string, tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromEnumLabel(src, tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromTypeObject(src *vdl.Type) error {
+	if err := r.start(vdl.TypeObjectType); err != nil {
+		return err
+	}
+	if err := r.enc.FromTypeObject(src); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) FromNil(tt *vdl.Type) error {
+	if err := r.start(tt); err != nil {
+		return err
+	}
+	if err := r.enc.FromNil(tt); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) StartList(tt *vdl.Type, len int) (vdl.ListTarget, error) {
+	if err := r.start(tt); err != nil {
+		return nil, err
+	}
+	if _, err := r.enc.StartList(tt, len); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishList(_ vdl.ListTarget) error {
+	if err := r.enc.FinishList(nil); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) StartSet(tt *vdl.Type, len int) (vdl.SetTarget, error) {
+	if err := r.start(tt); err != nil {
+		return nil, err
+	}
+	if _, err := r.enc.StartSet(tt, len); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishSet(_ vdl.SetTarget) error {
+	if err := r.enc.FinishSet(nil); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) StartMap(tt *vdl.Type, len int) (vdl.MapTarget, error) {
+	if err := r.start(tt); err != nil {
+		return nil, err
+	}
+	if _, err := r.enc.StartMap(tt, len); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishMap(_ vdl.MapTarget) error {
+	if err := r.enc.FinishMap(nil); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error) {
+	if err := r.start(tt); err != nil {
+		return nil, err
+	}
+	if _, err := r.enc.StartFields(tt); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishFields(_ vdl.FieldsTarget) error {
+	if err := r.enc.FinishFields(nil); err != nil {
+		return err
+	}
+	return r.finish()
+}
+
+func (r *rbTarget) StartElem(index int) (vdl.Target, error) {
+	if _, err := r.enc.StartElem(index); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishElem(_ vdl.Target) error {
+	return r.enc.FinishElem(nil)
+}
+
+func (r *rbTarget) StartKey() (vdl.Target, error) {
+	if _, err := r.enc.StartKey(); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) FinishKey(_ vdl.Target) error {
+	return r.enc.FinishKey(nil)
+}
+
+func (r *rbTarget) FinishKeyStartField(_ vdl.Target) (vdl.Target, error) {
+	if _, err := r.enc.FinishKeyStartField(nil); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func (r *rbTarget) StartField(key string) (vdl.Target, vdl.Target, error) {
+	if _, _, err := r.enc.StartField(key); err != nil {
+		return nil, nil, err
+	}
+	return r, r, nil
+}
+
+func (r *rbTarget) FinishField(_, _ vdl.Target) error {
+	return r.enc.FinishField(nil, nil)
+}
+
+func (r *rbTarget) RawBytesTargetHack() {}
diff --git a/vom/raw_bytes_test.go b/vom/raw_bytes_test.go
index 7f63898..92916e9 100644
--- a/vom/raw_bytes_test.go
+++ b/vom/raw_bytes_test.go
@@ -15,12 +15,6 @@
 	"v.io/v23/vom/testdata/types"
 )
 
-func TestVdlTypeOfRawBytes(t *testing.T) {
-	if got, want := vdl.TypeOf(&RawBytes{}), vdl.AnyType; got != want {
-		t.Errorf("got %v, want %v", got, want)
-	}
-}
-
 type testUint64 uint64
 
 type structTypeObject struct {
@@ -48,7 +42,7 @@
 		name:    "testUint64(99)",
 		goValue: testUint64(99),
 		rawBytes: RawBytes{
-			Version: DefaultVersionWithRawBytesSupport,
+			Version: DefaultVersion,
 			Type:    vdl.TypeOf(testUint64(0)),
 			Data:    []byte{0x63},
 		},
@@ -57,7 +51,7 @@
 		name:    "typeobject(int32)",
 		goValue: vdl.Int32Type,
 		rawBytes: RawBytes{
-			Version:  DefaultVersionWithRawBytesSupport,
+			Version:  DefaultVersion,
 			Type:     vdl.TypeOf(vdl.Int32Type),
 			RefTypes: []*vdl.Type{vdl.Int32Type},
 			Data:     []byte{0x00},
@@ -67,7 +61,7 @@
 		name:    "structTypeObject{typeobject(int32)}",
 		goValue: structTypeObject{vdl.Int32Type},
 		rawBytes: RawBytes{
-			Version:  DefaultVersionWithRawBytesSupport,
+			Version:  DefaultVersion,
 			Type:     vdl.TypeOf(structTypeObject{}),
 			RefTypes: []*vdl.Type{vdl.Int32Type},
 			Data:     []byte{0x00, 0x00, WireCtrlEnd},
@@ -78,19 +72,19 @@
 		goValue: structAnyAndTypes{
 			vdl.Int32Type,
 			&RawBytes{
-				Version: DefaultVersionWithRawBytesSupport,
+				Version: DefaultVersion,
 				Type:    vdl.BoolType,
 				Data:    []byte{0x01},
 			},
 			vdl.BoolType,
 			&RawBytes{
-				Version: DefaultVersionWithRawBytesSupport,
+				Version: DefaultVersion,
 				Type:    vdl.TypeOf(""),
 				Data:    []byte{0x03, 0x61, 0x62, 0x63},
 			},
 		},
 		rawBytes: RawBytes{
-			Version:    DefaultVersionWithRawBytesSupport,
+			Version:    DefaultVersion,
 			Type:       vdl.TypeOf(structAnyAndTypes{}),
 			RefTypes:   []*vdl.Type{vdl.Int32Type, vdl.BoolType, vdl.StringType},
 			AnyLengths: []uint64{1, 4},
@@ -104,23 +98,58 @@
 		},
 	},
 	{
+		name:    "large message", // to test that multibyte length is encoded properly
+		goValue: makeLargeBytes(1000),
+		rawBytes: RawBytes{
+			Version: DefaultVersion,
+			Type:    vdl.ListType(vdl.ByteType),
+			Data:    append([]byte{0xfe, 0x03, 0xe8}, makeLargeBytes(1000)...),
+		},
+	},
+	{
+		name:    "*vdl.Value",
+		goValue: vdl.ValueOf(uint16(5)),
+		rawBytes: RawBytes{
+			Version: DefaultVersion,
+			Type:    vdl.Uint16Type,
+			Data:    []byte{0x05},
+		},
+	},
+	{
+		name:    "*vdl.Value - top level any",
+		goValue: vdl.ValueOf([]interface{}{uint16(5)}).Index(0),
+		rawBytes: RawBytes{
+			Version: DefaultVersion,
+			Type:    vdl.Uint16Type,
+			Data:    []byte{0x05},
+		},
+	},
+	{
 		name: "any(nil)",
 		goValue: &RawBytes{
-			Version: DefaultVersionWithRawBytesSupport,
+			Version: DefaultVersion,
 			Type:    vdl.AnyType,
 			Data:    []byte{WireCtrlNil},
 		},
 		rawBytes: RawBytes{
-			Version: DefaultVersionWithRawBytesSupport,
+			Version: DefaultVersion,
 			Type:    vdl.AnyType,
 			Data:    []byte{WireCtrlNil},
 		},
 	},
 }
 
+func makeLargeBytes(size int) []byte {
+	b := make([]byte, size)
+	for i := range b {
+		b[i] = byte(i % 10)
+	}
+	return b
+}
+
 func TestDecodeToRawBytes(t *testing.T) {
 	for _, test := range rawBytesTestCases {
-		bytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, test.goValue)
+		bytes, err := Encode(test.goValue)
 		if err != nil {
 			t.Fatalf("%s: error in encode %v", test.name, err)
 		}
@@ -136,11 +165,11 @@
 
 func TestEncodeFromRawBytes(t *testing.T) {
 	for _, test := range rawBytesTestCases {
-		fullBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, test.goValue)
+		fullBytes, err := Encode(test.goValue)
 		if err != nil {
 			t.Fatalf("%s: error in encode %v", test.name, err)
 		}
-		fullBytesFromRaw, err := VersionedEncode(DefaultVersionWithRawBytesSupport, &test.rawBytes)
+		fullBytesFromRaw, err := Encode(&test.rawBytes)
 		if err != nil {
 			t.Fatalf("%s: error in encode %v", test.name, err)
 		}
@@ -160,7 +189,7 @@
 		name:    "testUint64(99)",
 		goValue: testUint64(99),
 		rawBytes: RawBytes{
-			Version:    DefaultVersionWithRawBytesSupport,
+			Version:    DefaultVersion,
 			Type:       vdl.TypeOf(testUint64(0)),
 			RefTypes:   []*vdl.Type{vdl.TypeOf(testUint64(0))},
 			AnyLengths: []uint64{1},
@@ -171,7 +200,7 @@
 		name:    "typeobject(int32)",
 		goValue: vdl.Int32Type,
 		rawBytes: RawBytes{
-			Version:    DefaultVersionWithRawBytesSupport,
+			Version:    DefaultVersion,
 			Type:       vdl.TypeOf(vdl.Int32Type),
 			RefTypes:   []*vdl.Type{vdl.TypeObjectType, vdl.Int32Type},
 			AnyLengths: []uint64{1},
@@ -182,7 +211,7 @@
 		name:    "structTypeObject{typeobject(int32)}",
 		goValue: structTypeObject{vdl.Int32Type},
 		rawBytes: RawBytes{
-			Version:    DefaultVersionWithRawBytesSupport,
+			Version:    DefaultVersion,
 			Type:       vdl.TypeOf(structTypeObject{}),
 			RefTypes:   []*vdl.Type{vdl.TypeOf(structTypeObject{}), vdl.Int32Type},
 			AnyLengths: []uint64{3},
@@ -194,19 +223,19 @@
 		goValue: structAnyAndTypes{
 			vdl.Int32Type,
 			&RawBytes{
-				Version: DefaultVersionWithRawBytesSupport,
+				Version: DefaultVersion,
 				Type:    vdl.BoolType,
 				Data:    []byte{0x01},
 			},
 			vdl.BoolType,
 			&RawBytes{
-				Version: DefaultVersionWithRawBytesSupport,
+				Version: DefaultVersion,
 				Type:    vdl.TypeOf(""),
 				Data:    []byte{0x03, 0x61, 0x62, 0x63},
 			},
 		},
 		rawBytes: RawBytes{
-			Version:    DefaultVersionWithRawBytesSupport,
+			Version:    DefaultVersion,
 			Type:       vdl.TypeOf(structAnyAndTypes{}),
 			RefTypes:   []*vdl.Type{vdl.TypeOf(structAnyAndTypes{}), vdl.Int32Type, vdl.BoolType, vdl.StringType},
 			AnyLengths: []uint64{16, 1, 4},
@@ -219,13 +248,45 @@
 			},
 		},
 	},
+	{
+		name:    "large message", // to test that multibyte length is encoded properly
+		goValue: makeLargeBytes(1000),
+		rawBytes: RawBytes{
+			Version:    DefaultVersion,
+			Type:       vdl.ListType(vdl.ByteType),
+			RefTypes:   []*vdl.Type{vdl.ListType(vdl.ByteType)},
+			AnyLengths: []uint64{0x3eb},
+			Data:       append([]byte{0xfe, 0x03, 0xe8}, makeLargeBytes(1000)...),
+		},
+	},
+	{
+		name:    "*vdl.Value",
+		goValue: vdl.ValueOf(uint16(5)),
+		rawBytes: RawBytes{
+			Version:    DefaultVersion,
+			Type:       vdl.Uint16Type,
+			RefTypes:   []*vdl.Type{vdl.Uint16Type},
+			AnyLengths: []uint64{1},
+			Data:       []byte{0x05},
+		},
+	},
+	{
+		name:    "*vdl.Value - top level any",
+		goValue: vdl.ValueOf([]interface{}{uint16(5)}).Index(0),
+		rawBytes: RawBytes{
+			Version:    DefaultVersion,
+			Type:       vdl.Uint16Type,
+			RefTypes:   []*vdl.Type{vdl.Uint16Type},
+			AnyLengths: []uint64{1},
+			Data:       []byte{0x05},
+		},
+	},
 }
 
 func TestWrappedRawBytes(t *testing.T) {
 	for i, test := range rawBytesWrappedTestCases {
 		unwrapped := rawBytesTestCases[i]
-
-		wrappedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, structAny{&unwrapped.rawBytes})
+		wrappedBytes, err := Encode(structAny{&unwrapped.rawBytes})
 		if err != nil {
 			t.Fatalf("%s: error in encode %v", test.name, err)
 		}
@@ -233,9 +294,6 @@
 		if err := Decode(wrappedBytes, &any); err != nil {
 			t.Fatalf("%s: error in decode %v", test.name, err)
 		}
-		if any.X.RefTypes[0] != vdl.TypeOf(test.goValue) {
-			t.Errorf("expected type %v to be first in ref list, but was %v", test.goValue, any.X.RefTypes[0])
-		}
 		if !reflect.DeepEqual(any.X, &test.rawBytes) {
 			t.Errorf("%s: got %#v, want %#v", test.name, any.X, &test.rawBytes)
 		}
@@ -244,11 +302,11 @@
 
 func TestEncodeNilRawBytes(t *testing.T) {
 	// Top-level
-	expectedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, vdl.ZeroValue(vdl.AnyType))
+	expectedBytes, err := Encode(vdl.ZeroValue(vdl.AnyType))
 	if err != nil {
 		t.Fatal(err)
 	}
-	encodedBytes, err := VersionedEncode(DefaultVersionWithRawBytesSupport, (*RawBytes)(nil))
+	encodedBytes, err := Encode((*RawBytes)(nil))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -257,11 +315,11 @@
 	}
 
 	// Within an object.
-	expectedBytes, err = VersionedEncode(DefaultVersionWithRawBytesSupport, []*vdl.Value{vdl.ZeroValue(vdl.AnyType)})
+	expectedBytes, err = Encode([]*vdl.Value{vdl.ZeroValue(vdl.AnyType)})
 	if err != nil {
 		t.Fatal(err)
 	}
-	encodedBytes, err = VersionedEncode(DefaultVersionWithRawBytesSupport, []*RawBytes{nil})
+	encodedBytes, err = Encode([]*RawBytes{nil})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -373,3 +431,48 @@
 		}
 	}
 }
+
+func TestVdlTypeOfRawBytes(t *testing.T) {
+	if got, want := vdl.TypeOf(&RawBytes{}), vdl.AnyType; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func TestVdlValueOfRawBytes(t *testing.T) {
+	for _, test := range rawBytesTestCases {
+		want := vdl.ValueOf(test.goValue)
+		got := vdl.ValueOf(test.rawBytes)
+		if !vdl.EqualValue(got, want) {
+			t.Errorf("vdl.ValueOf(RawBytes) %s: got %v, want %v", test.name, got, want)
+		}
+	}
+}
+
+func TestConvertRawBytes(t *testing.T) {
+	for _, test := range rawBytesTestCases {
+		var rb *RawBytes
+		if err := vdl.Convert(&rb, test.goValue); err != nil {
+			t.Errorf("vdl.Convert %#v to RawBytes: %v", test.goValue, err)
+		}
+		if !reflect.DeepEqual(rb, &test.rawBytes) {
+			t.Errorf("vdl.Convert to RawBytes %s: got %v, want %v", test.name, rb, test.rawBytes)
+		}
+	}
+}
+
+type structAnyInterface struct {
+	X interface{}
+}
+
+func TestConvertRawBytesWrapped(t *testing.T) {
+	for _, test := range rawBytesTestCases {
+		var any structAny
+		if err := vdl.Convert(&any, structAnyInterface{test.goValue}); err != nil {
+			t.Errorf("vdl.Convert %#v to RawBytes: %v", structAnyInterface{test.goValue}, err)
+		}
+		got, want := any, structAny{&test.rawBytes}
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("vdl.Convert to RawBytes %s: got %v, want %v", test.name, got, want)
+		}
+	}
+}
diff --git a/vom/single_shot.go b/vom/single_shot.go
index 1a58d70..f3a1a08 100644
--- a/vom/single_shot.go
+++ b/vom/single_shot.go
@@ -18,7 +18,7 @@
 // This is a "single-shot" encoding; full type information is always included in
 // the returned encoding, as if a new encoder were used for each call.
 func Encode(v interface{}) ([]byte, error) {
-	return VersionedEncode(Version80, v)
+	return VersionedEncode(DefaultVersion, v)
 }
 
 // VersionedEncode performs single-shot encoding to a specific version of VOM
diff --git a/vom/type_encoder.go b/vom/type_encoder.go
index f8964de..faa305c 100644
--- a/vom/type_encoder.go
+++ b/vom/type_encoder.go
@@ -33,7 +33,7 @@
 // NewTypeEncoder returns a new TypeEncoder that writes types to the given
 // writer in the binary format.
 func NewTypeEncoder(w io.Writer) *TypeEncoder {
-	return NewVersionedTypeEncoder(Version80, w)
+	return NewVersionedTypeEncoder(DefaultVersion, w)
 }
 
 // NewTypeEncoderVersion returns a new TypeEncoder that writes types to the given