v.io/x/ref/services/syncbase: use vom.RawBytes in LogRec struct

The LogRec struct in services/syncbase/server/interfaces/sync_types.vdl
was still using []byte to transfer client data.

This change makes it use "any", which turns into vom.RawBytes, at least in the
Go world.

blob.go:
processBlobRefs()  now takes a *vom.RawBytes and uses ToValue(),
rather than using vom.Decode() on a []byte.

initiator.go:
insertSgRecInDb() and insertRecInDb() now both take a *vom.RawBytes.
The latter has to use vom.Encode() to turn it into a []byte
to put in the database.

replay_test.go:
createReplayStream() now uses vom.RawBytesFromValue() instead of
vom.Encode().

responder.go:
makeWireLogRec() now needs vom.Decode() to make a vom.RawBytes
from the []byte value it got from the database.

responder_test.go:
TestSendDataDeltas() was putting strings into the database without encoding.
Now it uses vom.Encode() to encode them.

watcher.go:
processWatchBlobRefs() now uses vom.Decode() to get vom.RawBytes values
to give to processBlobRefs().

Change-Id: Iaa97db0dd27a2e45f608ebb7474cc845d86a4ea6
diff --git a/services/syncbase/server/interfaces/interfaces.vdl.go b/services/syncbase/server/interfaces/interfaces.vdl.go
index 698a7ec..c1f7cb3 100644
--- a/services/syncbase/server/interfaces/interfaces.vdl.go
+++ b/services/syncbase/server/interfaces/interfaces.vdl.go
@@ -20,6 +20,7 @@
 	"v.io/v23/vdl"
 	vdltime "v.io/v23/vdlroot/time"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 )
 
 var _ = __VDLInit() // Must be first; see __VDLInit comments for details.
@@ -524,7 +525,7 @@
 // metadata and data. Value is the actual value of a store object.
 type LogRec struct {
 	Metadata LogRecMetadata
-	Value    []byte
+	Value    *vom.RawBytes
 }
 
 func (LogRec) __VDLReflect(struct {
@@ -536,7 +537,7 @@
 	if !x.Metadata.VDLIsZero() {
 		return false
 	}
-	if len(x.Value) != 0 {
+	if x.Value != nil && !x.Value.VDLIsZero() {
 		return false
 	}
 	return true
@@ -554,17 +555,11 @@
 			return err
 		}
 	}
-	if len(x.Value) != 0 {
+	if x.Value != nil && !x.Value.VDLIsZero() {
 		if err := enc.NextField("Value"); err != nil {
 			return err
 		}
-		if err := enc.StartValue(__VDLType_list_7); err != nil {
-			return err
-		}
-		if err := enc.EncodeBytes(x.Value); err != nil {
-			return err
-		}
-		if err := enc.FinishValue(); err != nil {
+		if err := x.Value.VDLWrite(enc); err != nil {
 			return err
 		}
 	}
@@ -575,7 +570,9 @@
 }
 
 func (x *LogRec) VDLRead(dec vdl.Decoder) error {
-	*x = LogRec{}
+	*x = LogRec{
+		Value: vom.RawBytesOf(vdl.ZeroValue(vdl.AnyType)),
+	}
 	if err := dec.StartValue(__VDLType_struct_6); err != nil {
 		return err
 	}
@@ -592,7 +589,8 @@
 				return err
 			}
 		case "Value":
-			if err := dec.ReadValueBytes(-1, &x.Value); err != nil {
+			x.Value = new(vom.RawBytes)
+			if err := x.Value.VDLRead(dec); err != nil {
 				return err
 			}
 		default:
@@ -617,7 +615,7 @@
 }
 
 func (x GroupId) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_string_8); err != nil {
+	if err := enc.StartValue(__VDLType_string_7); err != nil {
 		return err
 	}
 	if err := enc.EncodeString(string(x)); err != nil {
@@ -695,7 +693,7 @@
 }
 
 func (x SyncgroupStatus) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_enum_9); err != nil {
+	if err := enc.StartValue(__VDLType_enum_8); err != nil {
 		return err
 	}
 	if err := enc.EncodeString(x.String()); err != nil {
@@ -758,7 +756,7 @@
 }
 
 func (x Syncgroup) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_10); err != nil {
+	if err := enc.StartValue(__VDLType_struct_9); err != nil {
 		return err
 	}
 	if x.Id != (syncbase.Id{}) {
@@ -836,7 +834,7 @@
 }
 
 func __VDLWriteAnon_map_2(enc vdl.Encoder, x map[string]syncbase.SyncgroupMemberInfo) error {
-	if err := enc.StartValue(__VDLType_map_13); err != nil {
+	if err := enc.StartValue(__VDLType_map_12); err != nil {
 		return err
 	}
 	if err := enc.SetLenHint(len(x)); err != nil {
@@ -867,7 +865,7 @@
 
 func (x *Syncgroup) VDLRead(dec vdl.Decoder) error {
 	*x = Syncgroup{}
-	if err := dec.StartValue(__VDLType_struct_10); err != nil {
+	if err := dec.StartValue(__VDLType_struct_9); err != nil {
 		return err
 	}
 	for {
@@ -926,7 +924,7 @@
 }
 
 func __VDLReadAnon_map_2(dec vdl.Decoder, x *map[string]syncbase.SyncgroupMemberInfo) error {
-	if err := dec.StartValue(__VDLType_map_13); err != nil {
+	if err := dec.StartValue(__VDLType_map_12); err != nil {
 		return err
 	}
 	var tmpMap map[string]syncbase.SyncgroupMemberInfo
@@ -974,7 +972,7 @@
 }
 
 func (x CollectionPerms) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_map_15); err != nil {
+	if err := enc.StartValue(__VDLType_map_14); err != nil {
 		return err
 	}
 	if err := enc.SetLenHint(len(x)); err != nil {
@@ -1004,7 +1002,7 @@
 }
 
 func (x *CollectionPerms) VDLRead(dec vdl.Decoder) error {
-	if err := dec.StartValue(__VDLType_map_15); err != nil {
+	if err := dec.StartValue(__VDLType_map_14); err != nil {
 		return err
 	}
 	var tmpMap CollectionPerms
@@ -1054,7 +1052,7 @@
 }
 
 func (x SgDeltaReq) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_17); err != nil {
+	if err := enc.StartValue(__VDLType_struct_16); err != nil {
 		return err
 	}
 	if x.DbId != (syncbase.Id{}) {
@@ -1081,7 +1079,7 @@
 
 func (x *SgDeltaReq) VDLRead(dec vdl.Decoder) error {
 	*x = SgDeltaReq{}
-	if err := dec.StartValue(__VDLType_struct_17); err != nil {
+	if err := dec.StartValue(__VDLType_struct_16); err != nil {
 		return err
 	}
 	for {
@@ -1135,7 +1133,7 @@
 }
 
 func (x DataDeltaReq) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_18); err != nil {
+	if err := enc.StartValue(__VDLType_struct_17); err != nil {
 		return err
 	}
 	if x.DbId != (syncbase.Id{}) {
@@ -1169,7 +1167,7 @@
 }
 
 func __VDLWriteAnon_set_3(enc vdl.Encoder, x map[GroupId]struct{}) error {
-	if err := enc.StartValue(__VDLType_set_19); err != nil {
+	if err := enc.StartValue(__VDLType_set_18); err != nil {
 		return err
 	}
 	if err := enc.SetLenHint(len(x)); err != nil {
@@ -1191,7 +1189,7 @@
 
 func (x *DataDeltaReq) VDLRead(dec vdl.Decoder) error {
 	*x = DataDeltaReq{}
-	if err := dec.StartValue(__VDLType_struct_18); err != nil {
+	if err := dec.StartValue(__VDLType_struct_17); err != nil {
 		return err
 	}
 	for {
@@ -1223,7 +1221,7 @@
 }
 
 func __VDLReadAnon_set_3(dec vdl.Decoder, x *map[GroupId]struct{}) error {
-	if err := dec.StartValue(__VDLType_set_19); err != nil {
+	if err := dec.StartValue(__VDLType_set_18); err != nil {
 		return err
 	}
 	var tmpMap map[GroupId]struct{}
@@ -1297,7 +1295,7 @@
 }
 
 func (x DeltaReqSgs) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_union_20); err != nil {
+	if err := enc.StartValue(__VDLType_union_19); err != nil {
 		return err
 	}
 	if err := enc.NextField("Sgs"); err != nil {
@@ -1313,7 +1311,7 @@
 }
 
 func (x DeltaReqData) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_union_20); err != nil {
+	if err := enc.StartValue(__VDLType_union_19); err != nil {
 		return err
 	}
 	if err := enc.NextField("Data"); err != nil {
@@ -1329,7 +1327,7 @@
 }
 
 func VDLReadDeltaReq(dec vdl.Decoder, x *DeltaReq) error {
-	if err := dec.StartValue(__VDLType_union_20); err != nil {
+	if err := dec.StartValue(__VDLType_union_19); err != nil {
 		return err
 	}
 	f, err := dec.NextField()
@@ -1414,7 +1412,7 @@
 }
 
 func (x DeltaRespRec) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_union_21); err != nil {
+	if err := enc.StartValue(__VDLType_union_20); err != nil {
 		return err
 	}
 	if err := enc.NextField("Rec"); err != nil {
@@ -1430,7 +1428,7 @@
 }
 
 func (x DeltaRespGvs) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_union_21); err != nil {
+	if err := enc.StartValue(__VDLType_union_20); err != nil {
 		return err
 	}
 	if err := enc.NextField("Gvs"); err != nil {
@@ -1446,7 +1444,7 @@
 }
 
 func VDLReadDeltaResp(dec vdl.Decoder, x *DeltaResp) error {
-	if err := dec.StartValue(__VDLType_union_21); err != nil {
+	if err := dec.StartValue(__VDLType_union_20); err != nil {
 		return err
 	}
 	f, err := dec.NextField()
@@ -1507,7 +1505,7 @@
 }
 
 func (x SgPriority) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_22); err != nil {
+	if err := enc.StartValue(__VDLType_struct_21); err != nil {
 		return err
 	}
 	if x.DevType != 0 {
@@ -1558,7 +1556,7 @@
 
 func (x *SgPriority) VDLRead(dec vdl.Decoder) error {
 	*x = SgPriority{}
-	if err := dec.StartValue(__VDLType_struct_22); err != nil {
+	if err := dec.StartValue(__VDLType_struct_21); err != nil {
 		return err
 	}
 	for {
@@ -1614,7 +1612,7 @@
 }
 
 func (x SgPriorities) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_map_23); err != nil {
+	if err := enc.StartValue(__VDLType_map_22); err != nil {
 		return err
 	}
 	if err := enc.SetLenHint(len(x)); err != nil {
@@ -1638,7 +1636,7 @@
 }
 
 func (x *SgPriorities) VDLRead(dec vdl.Decoder) error {
-	if err := dec.StartValue(__VDLType_map_23); err != nil {
+	if err := dec.StartValue(__VDLType_map_22); err != nil {
 		return err
 	}
 	var tmpMap SgPriorities
@@ -1683,7 +1681,7 @@
 }
 
 func (x DeltaFinalResp) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_24); err != nil {
+	if err := enc.StartValue(__VDLType_struct_23); err != nil {
 		return err
 	}
 	if len(x.SgPriorities) != 0 {
@@ -1702,7 +1700,7 @@
 
 func (x *DeltaFinalResp) VDLRead(dec vdl.Decoder) error {
 	*x = DeltaFinalResp{}
-	if err := dec.StartValue(__VDLType_struct_24); err != nil {
+	if err := dec.StartValue(__VDLType_struct_23); err != nil {
 		return err
 	}
 	for {
@@ -1743,14 +1741,14 @@
 }
 
 func (x ChunkHash) VDLWrite(enc vdl.Encoder) error {
-	if err := enc.StartValue(__VDLType_struct_25); err != nil {
+	if err := enc.StartValue(__VDLType_struct_24); err != nil {
 		return err
 	}
 	if len(x.Hash) != 0 {
 		if err := enc.NextField("Hash"); err != nil {
 			return err
 		}
-		if err := enc.StartValue(__VDLType_list_7); err != nil {
+		if err := enc.StartValue(__VDLType_list_25); err != nil {
 			return err
 		}
 		if err := enc.EncodeBytes(x.Hash); err != nil {
@@ -1768,7 +1766,7 @@
 
 func (x *ChunkHash) VDLRead(dec vdl.Decoder) error {
 	*x = ChunkHash{}
-	if err := dec.StartValue(__VDLType_struct_25); err != nil {
+	if err := dec.StartValue(__VDLType_struct_24); err != nil {
 		return err
 	}
 	for {
@@ -1816,7 +1814,7 @@
 		if err := enc.NextField("Data"); err != nil {
 			return err
 		}
-		if err := enc.StartValue(__VDLType_list_7); err != nil {
+		if err := enc.StartValue(__VDLType_list_25); err != nil {
 			return err
 		}
 		if err := enc.EncodeBytes(x.Data); err != nil {
@@ -3636,25 +3634,25 @@
 	__VDLType_list_4    *vdl.Type
 	__VDLType_struct_5  *vdl.Type
 	__VDLType_struct_6  *vdl.Type
-	__VDLType_list_7    *vdl.Type
-	__VDLType_string_8  *vdl.Type
-	__VDLType_enum_9    *vdl.Type
+	__VDLType_string_7  *vdl.Type
+	__VDLType_enum_8    *vdl.Type
+	__VDLType_struct_9  *vdl.Type
 	__VDLType_struct_10 *vdl.Type
 	__VDLType_struct_11 *vdl.Type
-	__VDLType_struct_12 *vdl.Type
-	__VDLType_map_13    *vdl.Type
-	__VDLType_struct_14 *vdl.Type
-	__VDLType_map_15    *vdl.Type
+	__VDLType_map_12    *vdl.Type
+	__VDLType_struct_13 *vdl.Type
+	__VDLType_map_14    *vdl.Type
+	__VDLType_struct_15 *vdl.Type
 	__VDLType_struct_16 *vdl.Type
 	__VDLType_struct_17 *vdl.Type
-	__VDLType_struct_18 *vdl.Type
-	__VDLType_set_19    *vdl.Type
+	__VDLType_set_18    *vdl.Type
+	__VDLType_union_19  *vdl.Type
 	__VDLType_union_20  *vdl.Type
-	__VDLType_union_21  *vdl.Type
-	__VDLType_struct_22 *vdl.Type
-	__VDLType_map_23    *vdl.Type
+	__VDLType_struct_21 *vdl.Type
+	__VDLType_map_22    *vdl.Type
+	__VDLType_struct_23 *vdl.Type
 	__VDLType_struct_24 *vdl.Type
-	__VDLType_struct_25 *vdl.Type
+	__VDLType_list_25   *vdl.Type
 	__VDLType_struct_26 *vdl.Type
 	__VDLType_struct_27 *vdl.Type
 	__VDLType_struct_28 *vdl.Type
@@ -3717,25 +3715,25 @@
 	__VDLType_list_4 = vdl.TypeOf((*[]string)(nil))
 	__VDLType_struct_5 = vdl.TypeOf((*vdltime.Time)(nil)).Elem()
 	__VDLType_struct_6 = vdl.TypeOf((*LogRec)(nil)).Elem()
-	__VDLType_list_7 = vdl.TypeOf((*[]byte)(nil))
-	__VDLType_string_8 = vdl.TypeOf((*GroupId)(nil))
-	__VDLType_enum_9 = vdl.TypeOf((*SyncgroupStatus)(nil))
-	__VDLType_struct_10 = vdl.TypeOf((*Syncgroup)(nil)).Elem()
-	__VDLType_struct_11 = vdl.TypeOf((*syncbase.Id)(nil)).Elem()
-	__VDLType_struct_12 = vdl.TypeOf((*syncbase.SyncgroupSpec)(nil)).Elem()
-	__VDLType_map_13 = vdl.TypeOf((*map[string]syncbase.SyncgroupMemberInfo)(nil))
-	__VDLType_struct_14 = vdl.TypeOf((*syncbase.SyncgroupMemberInfo)(nil)).Elem()
-	__VDLType_map_15 = vdl.TypeOf((*CollectionPerms)(nil))
-	__VDLType_struct_16 = vdl.TypeOf((*access.AccessList)(nil)).Elem()
-	__VDLType_struct_17 = vdl.TypeOf((*SgDeltaReq)(nil)).Elem()
-	__VDLType_struct_18 = vdl.TypeOf((*DataDeltaReq)(nil)).Elem()
-	__VDLType_set_19 = vdl.TypeOf((*map[GroupId]struct{})(nil))
-	__VDLType_union_20 = vdl.TypeOf((*DeltaReq)(nil))
-	__VDLType_union_21 = vdl.TypeOf((*DeltaResp)(nil))
-	__VDLType_struct_22 = vdl.TypeOf((*SgPriority)(nil)).Elem()
-	__VDLType_map_23 = vdl.TypeOf((*SgPriorities)(nil))
-	__VDLType_struct_24 = vdl.TypeOf((*DeltaFinalResp)(nil)).Elem()
-	__VDLType_struct_25 = vdl.TypeOf((*ChunkHash)(nil)).Elem()
+	__VDLType_string_7 = vdl.TypeOf((*GroupId)(nil))
+	__VDLType_enum_8 = vdl.TypeOf((*SyncgroupStatus)(nil))
+	__VDLType_struct_9 = vdl.TypeOf((*Syncgroup)(nil)).Elem()
+	__VDLType_struct_10 = vdl.TypeOf((*syncbase.Id)(nil)).Elem()
+	__VDLType_struct_11 = vdl.TypeOf((*syncbase.SyncgroupSpec)(nil)).Elem()
+	__VDLType_map_12 = vdl.TypeOf((*map[string]syncbase.SyncgroupMemberInfo)(nil))
+	__VDLType_struct_13 = vdl.TypeOf((*syncbase.SyncgroupMemberInfo)(nil)).Elem()
+	__VDLType_map_14 = vdl.TypeOf((*CollectionPerms)(nil))
+	__VDLType_struct_15 = vdl.TypeOf((*access.AccessList)(nil)).Elem()
+	__VDLType_struct_16 = vdl.TypeOf((*SgDeltaReq)(nil)).Elem()
+	__VDLType_struct_17 = vdl.TypeOf((*DataDeltaReq)(nil)).Elem()
+	__VDLType_set_18 = vdl.TypeOf((*map[GroupId]struct{})(nil))
+	__VDLType_union_19 = vdl.TypeOf((*DeltaReq)(nil))
+	__VDLType_union_20 = vdl.TypeOf((*DeltaResp)(nil))
+	__VDLType_struct_21 = vdl.TypeOf((*SgPriority)(nil)).Elem()
+	__VDLType_map_22 = vdl.TypeOf((*SgPriorities)(nil))
+	__VDLType_struct_23 = vdl.TypeOf((*DeltaFinalResp)(nil)).Elem()
+	__VDLType_struct_24 = vdl.TypeOf((*ChunkHash)(nil)).Elem()
+	__VDLType_list_25 = vdl.TypeOf((*[]byte)(nil))
 	__VDLType_struct_26 = vdl.TypeOf((*ChunkData)(nil)).Elem()
 	__VDLType_struct_27 = vdl.TypeOf((*TimeReq)(nil)).Elem()
 	__VDLType_struct_28 = vdl.TypeOf((*TimeResp)(nil)).Elem()
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index b8c9748..355d937 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -67,7 +67,7 @@
 // metadata and data. Value is the actual value of a store object.
 type LogRec struct {
 	Metadata LogRecMetadata
-	Value    []byte
+	Value    any
 }
 
 // GroupId is a globally unique syncgroup ID.
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index 248dc54..c21b088 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -962,7 +962,7 @@
 // sharedSgPrefixes contains only those shared with a peer device that provided this data;
 // it is nil if the data was created locally.
 func (s *syncService) processBlobRefs(ctx *context.T, dbId wire.Id, st store.StoreReader, peer string, isCreator bool,
-	allSgPrefixes map[string]sgSet, sharedSgPrefixes map[string]sgSet, m *interfaces.LogRecMetadata, valbuf []byte) error {
+	allSgPrefixes map[string]sgSet, sharedSgPrefixes map[string]sgSet, m *interfaces.LogRecMetadata, rawValue *vom.RawBytes) error {
 
 	objid := m.ObjId
 	srcPeer := syncbaseIdToName(m.Id)
@@ -970,12 +970,12 @@
 	vlog.VI(4).Infof("sync: processBlobRefs: begin: objid %s, peer %s, src %s", objid, peer, srcPeer)
 	defer vlog.VI(4).Infof("sync: processBlobRefs: end: objid %s, peer %s, src %s", objid, peer, srcPeer)
 
-	if valbuf == nil {
+	if rawValue == nil {
 		return nil
 	}
 
 	var val *vdl.Value
-	if err := vom.Decode(valbuf, &val); err != nil {
+	if err := rawValue.ToValue(&val); err != nil {
 		// If we cannot decode the value, ignore blob processing and
 		// continue. This is fine since all stored values need not be
 		// vom encoded.
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 40850dd..6c076be 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -714,23 +714,28 @@
 }
 
 // insertSgRecInDb inserts the versioned value of a syncgroup in the Database.
-func (iSt *initiationState) insertSgRecInDb(ctx *context.T, rec *LocalLogRec, valbuf []byte, tx store.Transaction) error {
+func (iSt *initiationState) insertSgRecInDb(ctx *context.T, rec *LocalLogRec, rawValue *vom.RawBytes, tx store.Transaction) error {
 	m := rec.Metadata
 	var sg interfaces.Syncgroup
-	if err := vom.Decode(valbuf, &sg); err != nil {
+	if err := rawValue.ToValue(&sg); err != nil {
 		return err
 	}
 	return setSGDataEntryByOID(ctx, tx, m.ObjId, m.CurVers, &sg)
 }
 
 // insertRecInDb inserts the versioned value in the Database.
-func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *LocalLogRec, valbuf []byte, tx *watchable.Transaction) error {
+func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *LocalLogRec, rawValue *vom.RawBytes, tx *watchable.Transaction) error {
 	m := rec.Metadata
 	// TODO(hpucha): Hack right now. Need to change Database's handling of
 	// deleted objects. Currently, the initiator needs to treat deletions
 	// specially since deletions do not get a version number or a special
 	// value in the Database.
 	if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+		var valbuf []byte
+		var err error
+		if valbuf, err = vom.Encode(rawValue); err != nil {
+			return err
+		}
 		return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
 	}
 	return nil
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index a751a48..e88ea9c 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -354,14 +354,14 @@
 		}
 
 		var val string = "abc"
-		valbuf, err := vom.Encode(val)
+		rawValue, err := vom.RawBytesFromValue(val)
 		if err != nil {
 			t.Fatalf("createReplayStream encode failed, err %v", err)
 		}
 
 		rec := interfaces.DeltaRespRec{interfaces.LogRec{
 			Metadata: createMetadata(t, ty, cmd),
-			Value:    valbuf,
+			Value:    rawValue,
 		}}
 
 		stream.add(rec)
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0861009..1e9add3 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -459,26 +459,31 @@
 func (rSt *responderState) makeWireLogRec(ctx *context.T, rec *LocalLogRec) (*interfaces.LogRec, error) {
 	// Get the object value at the required version.
 	key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
-	var value []byte
+	var rawValue *vom.RawBytes
 
 	if rSt.sg {
 		sg, err := getSGDataEntryByOID(ctx, rSt.st, key, version)
 		if err != nil {
 			return nil, err
 		}
-		value, err = vom.Encode(sg)
+		rawValue, err = vom.RawBytesFromValue(sg)
 		if err != nil {
 			return nil, err
 		}
 	} else if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+		var value []byte
 		var err error
 		value, err = watchable.GetAtVersion(ctx, rSt.st, []byte(key), nil, []byte(version))
 		if err != nil {
 			return nil, err
 		}
+		err = vom.Decode(value, &rawValue)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}
+	wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: rawValue}
 	return wireRec, nil
 }
 
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index f631a76..37d2d1f 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -14,6 +14,7 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
+	"v.io/v23/vom"
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/store/watchable"
@@ -385,7 +386,12 @@
 					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
 				}
 				value := fmt.Sprintf("value_%s", okey)
-				if err := watchable.PutAtVersion(nil, tx, []byte(okey), []byte(value), []byte(vers)); err != nil {
+				var encodedValue []byte
+				var err error
+				if encodedValue, err = vom.Encode(value); err != nil {
+					t.Fatalf("vom.Encode(%q) failed err %v", value, err)
+				}
+				if err := watchable.PutAtVersion(nil, tx, []byte(okey), encodedValue, []byte(vers)); err != nil {
 					t.Fatalf("PutAtVersion(%d:%d) failed rec %v value %s: err %v", id, k, rec, value, err)
 				}
 
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index e87869f..8dc34d3 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -21,6 +21,7 @@
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/services/watch"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/server/interfaces"
@@ -322,8 +323,12 @@
 		if err != nil {
 			return err
 		}
+		var rawValue *vom.RawBytes
+		if err = vom.Decode(buf, &rawValue); err != nil {
+			return err
+		}
 
-		if err = s.processBlobRefs(ctx, dbId, st, s.name, true, sgPfxs, nil, m, buf); err != nil {
+		if err = s.processBlobRefs(ctx, dbId, st, s.name, true, sgPfxs, nil, m, rawValue); err != nil {
 			return err
 		}
 	}