syncbase/vsync: Addressing Adam's review comments from go/vcl/12873 and a few other cleanups.

Change-Id: I876d7685cc5e34c9575c0dfe4c41271826a97dc1
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index 410371e..3bb3667 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -21,7 +21,7 @@
 	// records, the responder's genvector, and a "Finish" DeltaResp
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
-	// next Database, in common with this responder.
+	// next Database in common with this responder.
 	GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index 0ca399f..b87397b 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -35,7 +35,7 @@
 	// records, the responder's genvector, and a "Finish" DeltaResp
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
-	// next Database, in common with this responder.
+	// next Database in common with this responder.
 	GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
@@ -206,7 +206,7 @@
 	// records, the responder's genvector, and a "Finish" DeltaResp
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
-	// next Database, in common with this responder.
+	// next Database in common with this responder.
 	GetDeltas(*context.T, SyncGetDeltasServerCall) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
@@ -235,7 +235,7 @@
 	// records, the responder's genvector, and a "Finish" DeltaResp
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
-	// next Database, in common with this responder.
+	// next Database in common with this responder.
 	GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
@@ -314,7 +314,7 @@
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "GetDeltas",
-			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database, in common with this responder.",
+			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
index 5ab18b0..6d608a3 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
@@ -49,7 +49,11 @@
 	RecType   byte   // type of log record.
 
         // Object related information.
-        ObjId       string      // id of the object that was updated. This id is relative to Application and Database names.
+
+	// Id of the object that was updated. This id is relative to Application
+	// and Database names and is the store key for a particular row in a
+	// table.
+        ObjId       string
         CurVers     string      // current version number of the object.
         Parents     []string    // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime     time.Time   // timestamp when the update is generated.
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
index aa0872a..aeb4e31 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -48,8 +48,10 @@
 	Id      uint64 // device id that created the log record.
 	Gen     uint64 // generation number for the log record.
 	RecType byte   // type of log record.
-	// Object related information.
-	ObjId      string    // id of the object that was updated. This id is relative to Application and Database names.
+	// Id of the object that was updated. This id is relative to Application
+	// and Database names and is the store key for a particular row in a
+	// table.
+	ObjId      string
 	CurVers    string    // current version number of the object.
 	Parents    []string  // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime    time.Time // timestamp when the update is generated.
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 3c4a054..963121e 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -208,7 +208,85 @@
 	wtx.fromSync = true
 }
 
-// Exported as a helper function for testing purposes
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key is used for optimistic concurrency
+// control. At minimum, an object implementing the StoreReader interface is
+// required since this is a Get operation.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+	switch w := st.(type) {
+	case *transaction:
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		if w.err != nil {
+			return nil, convertError(w.err)
+		}
+		return getVersion(w.itx, key)
+	case *wstore:
+		return getVersion(w.ist, key)
+	}
+	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire. At minimum, an object implementing the
+// StoreReader interface is required since this is a Get operation.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	switch w := st.(type) {
+	case *transaction:
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		if w.err != nil {
+			return valbuf, convertError(w.err)
+		}
+		return getAtVersion(w.itx, key, valbuf, version)
+	case *wstore:
+		return getAtVersion(w.ist, key, valbuf, version)
+	}
+	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// PutAtVersion puts a value for the managed key at the requested version. This
+// method is used by the Sync module exclusively when the initiator adds objects
+// with versions created on other Syncbases. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+	wtx := tx.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+
+	// Note that we do not enqueue a PutOp in the log since this Put is not
+	// updating the current version of a key.
+	return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion updates the version of a managed key to the requested
+// version. This method is used by the Sync module exclusively when the
+// initiator selects which of the already stored versions (via PutAtVersion
+// calls) becomes the current version. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+	wtx := tx.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+
+	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+		return err
+	}
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	return nil
+}
+
+// Exported as a helper function for testing purposes.
 func getLogEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 036d326..a21057f 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -20,7 +20,6 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/context"
 	"v.io/v23/verror"
 )
 
@@ -50,34 +49,6 @@
 	return []byte(join(string(key), string(version)))
 }
 
-// GetVersion returns the current version of a managed key. This method is used
-// by the Sync module when the initiator is attempting to add new versions of
-// objects. Reading the version key is used for optimistic concurrency control.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
-	wtx := st.(*transaction)
-
-	wtx.mu.Lock()
-	defer wtx.mu.Unlock()
-	if wtx.err != nil {
-		return nil, convertError(wtx.err)
-	}
-	return getVersion(wtx.itx, key)
-}
-
-// GetAtVersion returns the value of a managed key at the requested
-// version. This method is used by the Sync module when the responder needs to
-// send objects over the wire.
-func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
-	wtx := st.(*transaction)
-
-	wtx.mu.Lock()
-	defer wtx.mu.Unlock()
-	if wtx.err != nil {
-		return valbuf, convertError(wtx.err)
-	}
-	return getAtVersion(wtx.itx, key, valbuf, version)
-}
-
 func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
 	return st.Get(makeVersionKey(key), nil)
 }
@@ -95,41 +66,6 @@
 	return getAtVersion(st, key, valbuf, version)
 }
 
-// PutAtVersion puts a value for the managed key at the requested version. This
-// method is used by the Sync module exclusively when the initiator adds objects
-// with versions created on other Syncbases.
-func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
-	wtx := tx.(*transaction)
-
-	wtx.mu.Lock()
-	defer wtx.mu.Unlock()
-	if wtx.err != nil {
-		return convertError(wtx.err)
-	}
-
-	return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
-}
-
-// PutVersion updates the version of a managed key to the requested
-// version. This method is used by the Sync module exclusively when the
-// initiator selects which of the already stored versions (via PutAtVersion
-// calls) becomes the current version.
-func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
-	wtx := tx.(*transaction)
-
-	wtx.mu.Lock()
-	defer wtx.mu.Unlock()
-	if wtx.err != nil {
-		return convertError(wtx.err)
-	}
-
-	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
-		return err
-	}
-	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
-	return nil
-}
-
 func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
 	version := NewVersion()
 	if err := tx.Put(makeVersionKey(key), version); err != nil {
diff --git a/x/ref/services/syncbase/vsync/cr.go b/x/ref/services/syncbase/vsync/conflict_resolution.go
similarity index 83%
rename from x/ref/services/syncbase/vsync/cr.go
rename to x/ref/services/syncbase/vsync/conflict_resolution.go
index f7b1827..724e49f 100644
--- a/x/ref/services/syncbase/vsync/cr.go
+++ b/x/ref/services/syncbase/vsync/conflict_resolution.go
@@ -7,6 +7,7 @@
 import (
 	"v.io/v23/context"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 // Policies for conflict resolution.
@@ -64,9 +65,9 @@
 
 // resolveObjConflict resolves a conflict for an object given its ID and the 3
 // versions that express the conflict: the object's local version, its remote
-// version (from the device contacted), and the common ancestor from which both
-// versions branched away.  The function returns the new object value according
-// to the conflict resolution policy.
+// version (from the device contacted), and the closest common ancestor (see
+// dag.go on how the ancestor is chosen). The function returns the new object
+// value according to the conflict resolution policy.
 func (iSt *initiationState) resolveObjConflict(ctx *context.T, oid, local, remote, ancestor string) (*conflictResolution, error) {
 	// Fetch the log records of the 3 object versions.
 	versions := []string{local, remote, ancestor}
@@ -101,35 +102,29 @@
 		res.ty = pickLocal
 	case local.Metadata.CurVers < remote.Metadata.CurVers:
 		res.ty = pickRemote
+	default:
+		vlog.Fatalf("resolveObjConflictByTime:: local and remote update times and versions are the same, local %v remote %v", local, remote)
 	}
 
 	return &res, nil
 }
 
-// getLogRecsBatch gets the log records for an array of versions.
+// getLogRecsBatch gets the log records for an array of versions for a given object.
 func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
 	lrecs := make([]*localLogRec, len(versions))
-	var err error
 	for p, v := range versions {
-		lrecs[p], err = iSt.getLogRec(ctx, obj, v)
+		logKey, err := getLogRecKey(ctx, iSt.tx, obj, v)
+		if err != nil {
+			return nil, err
+		}
+		dev, gen, err := splitLogRecKey(ctx, logKey)
+		if err != nil {
+			return nil, err
+		}
+		lrecs[p], err = getLogRec(ctx, iSt.tx, dev, gen)
 		if err != nil {
 			return nil, err
 		}
 	}
 	return lrecs, nil
 }
-
-// getLogRec returns the log record corresponding to a given object and its version.
-func (iSt *initiationState) getLogRec(ctx *context.T, obj, vers string) (*localLogRec, error) {
-	// TODO(hpucha): May be the change the name in dag for getLogrec. We now
-	// have a few functions of this name.
-	logKey, err := getLogrec(ctx, iSt.tx, obj, vers)
-	if err != nil {
-		return nil, err
-	}
-	dev, gen, err := splitLogRecKey(ctx, logKey)
-	if err != nil {
-		return nil, err
-	}
-	return getLogRec(ctx, iSt.tx, dev, gen)
-}
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index c0d8efa..0f5adfd 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -688,8 +688,8 @@
 	return nil
 }
 
-// getLogrec returns the log record information for a given object version.
-func getLogrec(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
+// getLogRecKey returns the key of the log record for a given object version.
+func getLogRecKey(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
 	node, err := getNode(ctx, st, oid, version)
 	if err != nil {
 		return "", err
@@ -702,7 +702,7 @@
 
 // nodeKey returns the key used to access a DAG node (oid, version).
 func nodeKey(oid, version string) string {
-	return util.JoinKeyParts(util.SyncPrefix, "dag", "n", oid, version)
+	return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "n", oid, version)
 }
 
 // setNode stores the DAG node entry.
@@ -752,7 +752,7 @@
 
 // headKey returns the key used to access the DAG object head.
 func headKey(oid string) string {
-	return util.JoinKeyParts(util.SyncPrefix, "dag", "h", oid)
+	return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "h", oid)
 }
 
 // setHead stores version as the DAG object head.
@@ -784,7 +784,7 @@
 
 // batchKey returns the key used to access the DAG batch info.
 func batchKey(btid uint64) string {
-	return util.JoinKeyParts(util.SyncPrefix, "dag", "b", fmt.Sprintf("%d", btid))
+	return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "b", fmt.Sprintf("%d", btid))
 }
 
 // setBatch stores the DAG batch entry.
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 7dd7175..1aa75d5 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -35,7 +35,7 @@
 		t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+	if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
 		t.Errorf("non-existent object %s:%s has a logrec: %v", oid, version, logrec)
 	}
 
@@ -60,7 +60,7 @@
 		t.Errorf("object %s:%s has wrong data: %v instead of %v", oid, version, node2, node)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, version); err != nil || logrec != "logrec-23" {
+	if logrec, err := getLogRecKey(nil, st, oid, version); err != nil || logrec != "logrec-23" {
 		t.Errorf("object %s:%s has wrong logrec: %s", oid, version, logrec)
 	}
 }
@@ -96,7 +96,7 @@
 		t.Errorf("hasNode() found deleted object %s:%s", oid, version)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+	if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
 		t.Errorf("deleted object %s:%s has logrec: %s", oid, version, logrec)
 	}
 }
@@ -335,7 +335,7 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -411,10 +411,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -494,13 +494,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -587,13 +587,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -674,10 +674,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
 
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 1d447c5..600af4b 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -4,6 +4,12 @@
 
 package vsync
 
+// Initiator is a goroutine that periodically picks a peer from all the known
+// remote peers, and requests deltas from that peer for all the SyncGroups in
+// common across all apps/databases. It then modifies the sync metadata (DAG and
+// local log records) based on the deltas, detects and resolves conflicts if
+// any, and suitably updates the local Databases.
+
 import (
 	"sort"
 	"strings"
@@ -68,6 +74,9 @@
 		case <-ticker.C:
 		}
 
+		// TODO(hpucha): Cut a gen for the responder even if there is no
+		// one to initiate to?
+
 		// Do work.
 		peer, err := s.pickPeer(ctx)
 		if err != nil {
@@ -82,12 +91,21 @@
 // * Contacting the peer to receive all the deltas based on the local genvector.
 // * Processing those deltas to discover objects which have been updated.
 // * Processing updated objects to detect and resolve any conflicts if needed.
-// * Communicating relevant object updates to the Database.
-// The processing of the deltas is done one Database at a time.
+// * Communicating relevant object updates to the Database, and updating local
+// genvector to catch up to the received remote genvector.
+//
+// The processing of the deltas is done one Database at a time. If a local error
+// is encountered during the processing of a Database, that Database is skipped
+// and the initiator continues on to the next one. If the connection to the peer
+// encounters an error, this initiation round is aborted. Note that until the
+// local genvector is updated based on the received deltas (the last step in an
+// initiation round), the work done by the initiator is idempotent.
+//
+// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
 func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
 	info := s.allMembers.members[peer]
 	if info == nil {
-		return
+		vlog.Fatalf("getDeltasFromPeer:: missing information in member view for %q", peer)
 	}
 	connected := false
 	var stream interfaces.SyncGetDeltasClientCall
@@ -299,11 +317,11 @@
 	// devices.  These remote devices in turn can send these
 	// generations back to the initiator in progress which was
 	// started with older generation information.
-	if err := iSt.sync.checkPtLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+	if err := iSt.sync.checkptLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
 		return err
 	}
 
-	local, lgen, err := iSt.sync.getDbGenInfo(ctx, iSt.appName, iSt.dbName)
+	local, lgen, err := iSt.sync.copyDbGenInfo(ctx, iSt.appName, iSt.dbName)
 	if err != nil {
 		return err
 	}
@@ -364,6 +382,10 @@
 	// repeatedly doesn't affect what data is seen next.
 	rcvr := iSt.stream.RecvStream()
 	start, finish := false, false
+
+	// TODO(hpucha): See if we can avoid committing the entire delta stream
+	// as one batch. Currently the dependency is between the log records and
+	// the batch info.
 	tx := iSt.st.NewTransaction()
 	committed := false
 
@@ -373,7 +395,7 @@
 		}
 	}()
 
-	// Track received batches.
+	// Track received batches (BatchId --> BatchCount mapping).
 	batchMap := make(map[uint64]uint64)
 
 	for rcvr.Advance() {
@@ -396,7 +418,7 @@
 
 		case interfaces.DeltaRespRec:
 			// Insert log record in Database.
-			// TODO(hpucha): Should we reserve more postions in a batch?
+			// TODO(hpucha): Should we reserve more positions in a batch?
 			// TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
 			pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
 			rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
@@ -444,6 +466,11 @@
 	// should not conflict with any other keys. So if it fails, it is a
 	// non-retriable error.
 	err := tx.Commit()
+	if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+		// Note: This might be triggered with memstore until it handles
+		// transactions in a more fine-grained fashion.
+		vlog.Fatalf("recvAndProcessDeltas:: encountered concurrent transaction")
+	}
 	if err == nil {
 		committed = true
 	}
@@ -474,7 +501,9 @@
 		return err
 	}
 	// TODO(hpucha): Hack right now. Need to change Database's handling of
-	// deleted objects.
+	// deleted objects. Currently, the initiator needs to treat deletions
+	// specially since deletions do not get a version number or a special
+	// value in the Database.
 	if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
 		return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
 	}
@@ -482,8 +511,13 @@
 }
 
 // processUpdatedObjects processes all the updates received by the initiator,
-// one object at a time. For each updated object, we first check if the object
-// has any conflicts, resulting in three possibilities:
+// one object at a time. Conflict detection and resolution is carried out after
+// the entire delta of log records is replayed, instead of incrementally after
+// each record/batch is replayed, to avoid repeating conflict resolution already
+// performed by other peers.
+//
+// For each updated object, we first check if the object has any conflicts,
+// resulting in three possibilities:
 //
 // * There is no conflict, and no updates are needed to the Database
 // (isConflict=false, newHead == oldHead). All changes received convey
@@ -565,16 +599,16 @@
 
 // detectConflicts iterates through all the updated objects to detect conflicts.
 func (iSt *initiationState) detectConflicts(ctx *context.T) error {
-	for obj, st := range iSt.updObjects {
+	for objid, confSt := range iSt.updObjects {
 		// Check if object has a conflict.
 		var err error
-		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = hasConflict(ctx, iSt.tx, obj, iSt.dagGraft)
+		confSt.isConflict, confSt.newHead, confSt.oldHead, confSt.ancestor, err = hasConflict(ctx, iSt.tx, objid, iSt.dagGraft)
 		if err != nil {
 			return err
 		}
 
-		if !st.isConflict {
-			st.res = &conflictResolution{ty: pickRemote}
+		if !confSt.isConflict {
+			confSt.res = &conflictResolution{ty: pickRemote}
 		}
 	}
 	return nil
@@ -583,10 +617,10 @@
 // updateDbAndSync updates the Database, and if that is successful, updates log,
 // dag and genvector data structures as needed.
 func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
-	for obj, st := range iSt.updObjects {
+	for objid, confSt := range iSt.updObjects {
 		// If the local version is picked, no further updates to the
 		// Database are needed.
-		if st.res.ty == pickLocal {
+		if confSt.res.ty == pickLocal {
 			continue
 		}
 
@@ -596,8 +630,8 @@
 		// TODO(hpucha): Hack right now. Need to change Database's
 		// handling of deleted objects.
 		oldVersDeleted := true
-		if st.oldHead != NoVersion {
-			oldDagNode, err := getNode(ctx, iSt.tx, obj, st.oldHead)
+		if confSt.oldHead != NoVersion {
+			oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
 			if err != nil {
 				return err
 			}
@@ -606,17 +640,17 @@
 
 		var newVersion string
 		var newVersDeleted bool
-		switch st.res.ty {
+		switch confSt.res.ty {
 		case pickRemote:
-			newVersion = st.newHead
-			newDagNode, err := getNode(ctx, iSt.tx, obj, newVersion)
+			newVersion = confSt.newHead
+			newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
 			if err != nil {
 				return err
 			}
 			newVersDeleted = newDagNode.Deleted
 		case createNew:
-			newVersion = st.res.rec.Metadata.CurVers
-			newVersDeleted = st.res.rec.Metadata.Delete
+			newVersion = confSt.res.rec.Metadata.CurVers
+			newVersDeleted = confSt.res.rec.Metadata.Delete
 		}
 
 		// Skip delete followed by a delete.
@@ -626,36 +660,36 @@
 
 		if !oldVersDeleted {
 			// Read current version to enter it in the readset of the transaction.
-			version, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj))
+			version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
 			if err != nil {
 				return err
 			}
-			if string(version) != st.oldHead {
+			if string(version) != confSt.oldHead {
 				return store.NewErrConcurrentTransaction(ctx)
 			}
 		} else {
 			// Ensure key doesn't exist.
-			if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+			if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
 				return store.NewErrConcurrentTransaction(ctx)
 			}
 		}
 
 		if !newVersDeleted {
-			if st.res.ty == createNew {
-				if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(obj), st.res.val, []byte(newVersion)); err != nil {
+			if confSt.res.ty == createNew {
+				if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
 					return err
 				}
 			}
-			if err := watchable.PutVersion(ctx, iSt.tx, []byte(obj), []byte(newVersion)); err != nil {
+			if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
 				return err
 			}
 		} else {
-			if err := iSt.tx.Delete([]byte(obj)); err != nil {
+			if err := iSt.tx.Delete([]byte(objid)); err != nil {
 				return err
 			}
 		}
 
-		if err := iSt.updateLogAndDag(ctx, obj); err != nil {
+		if err := iSt.updateLogAndDag(ctx, objid); err != nil {
 			return err
 		}
 	}
@@ -665,31 +699,31 @@
 
 // updateLogAndDag updates the log and dag data structures.
 func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
-	st, ok := iSt.updObjects[obj]
+	confSt, ok := iSt.updObjects[obj]
 	if !ok {
 		return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
 	}
 	var newVersion string
 
-	if !st.isConflict {
-		newVersion = st.newHead
+	if !confSt.isConflict {
+		newVersion = confSt.newHead
 	} else {
 		// Object had a conflict. Create a log record to reflect resolution.
 		var rec *localLogRec
 
 		switch {
-		case st.res.ty == pickLocal:
+		case confSt.res.ty == pickLocal:
 			// Local version was picked as the conflict resolution.
-			rec = iSt.createLocalLinkLogRec(ctx, obj, st.oldHead, st.newHead)
-			newVersion = st.oldHead
-		case st.res.ty == pickRemote:
+			rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.oldHead, confSt.newHead)
+			newVersion = confSt.oldHead
+		case confSt.res.ty == pickRemote:
 			// Remote version was picked as the conflict resolution.
-			rec = iSt.createLocalLinkLogRec(ctx, obj, st.newHead, st.oldHead)
-			newVersion = st.newHead
+			rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.newHead, confSt.oldHead)
+			newVersion = confSt.newHead
 		default:
 			// New version was created to resolve the conflict.
-			rec = st.res.rec
-			newVersion = st.res.rec.Metadata.CurVers
+			rec = confSt.res.rec
+			newVersion = confSt.res.rec.Metadata.CurVers
 		}
 
 		if err := putLogRec(ctx, iSt.tx, rec); err != nil {
@@ -742,14 +776,14 @@
 // updateSyncSt updates local sync state at the end of an initiator cycle.
 func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
 	// Get the current local sync state.
-	dsInMem, err := iSt.sync.getDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+	dsInMem, err := iSt.sync.copyDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
 	if err != nil {
 		return err
 	}
 	ds := &dbSyncState{
-		Gen:     dsInMem.gen,
-		CkPtGen: dsInMem.ckPtGen,
-		GenVec:  dsInMem.genvec,
+		Gen:        dsInMem.gen,
+		CheckptGen: dsInMem.checkptGen,
+		GenVec:     dsInMem.genvec,
 	}
 
 	// remote can be a subset of local.
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index d3c55d4..bb62d6e 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -2,6 +2,14 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+// The initiator tests below are driven by replaying the state from the log
+// files (in testdata directory). These log files may mimic watching the
+// Database locally (addl commands in the log file) or obtaining log records and
+// generation vector from a remote peer (addr, genvec commands). The log files
+// contain the metadata of log records. The log files are only used to set up
+// the state. The tests verify that given a particular local state and a stream
+// of remote deltas, the initiator behaves as expected.
+
 package vsync
 
 import (
@@ -199,8 +207,7 @@
 // Helpers.
 
 func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
-	// Set a large value to prevent the threads from firing.
-	// Test is not thread safe.
+	// Set a large value to prevent the initiator from running.
 	peerSyncInterval = 1 * time.Hour
 	conflictResolutionPolicy = useTime
 	svc := createService(t)
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index d096aa8..2049755 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -164,7 +164,7 @@
 
 	var respVec interfaces.GenVector
 	var respGen uint64
-	respVec, respGen, rSt.errState = rSt.sync.getDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
+	respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
 	if rSt.errState != nil {
 		return
 	}
@@ -426,14 +426,9 @@
 // makeWireLogRec creates a sync log record to send on the wire from a given
 // local sync record.
 func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
-	// Get the object value at the required version.  Note: GetAtVersion()
-	// requires a transaction to read the data, so create and abort one.
-	// TODO(hpucha): remove the fake Tx after the change in GetAtVersion().
-	tx := st.NewTransaction()
-	defer tx.Abort()
-
+	// Get the object value at the required version.
 	key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
-	value, err := watchable.GetAtVersion(ctx, tx, []byte(key), nil, []byte(version))
+	value, err := watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
 	if err != nil {
 		return nil, err
 	}
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index b452c07..cf67ca8 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -350,7 +350,7 @@
 		s.id = 10 //responder.
 
 		wantDiff, wantVec := test.genDiff, test.outVec
-		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
+		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
 
 		req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
 		rSt := newResponderState(nil, nil, s, req)
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index d268843..d0322d4 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -81,7 +81,7 @@
 	return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
 }
 
-// randIntn generates as an int, a non-negative pseudo-random number in [0,n).
+// randIntn mimics rand.Intn (generates a non-negative pseudo-random number in [0,n)).
 func randIntn(n int) int {
 	rngLock.Lock()
 	defer rngLock.Unlock()
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 9f8bf17..c0e1429 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -46,8 +46,8 @@
 	gen uint64
 	pos uint64
 
-	ckPtGen uint64
-	genvec  interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+	checkptGen uint64
+	genvec     interfaces.GenVector // Note: Generation vector contains state from remote devices only.
 }
 
 // initSync initializes the sync module during startup. It scans all the
@@ -144,8 +144,8 @@
 	return gen, pos
 }
 
-// checkPtLocalGen freezes the local generation number for the responder's use.
-func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+// checkptLocalGen freezes the local generation number for the responder's use.
+func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string) error {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -157,7 +157,7 @@
 
 	// The frozen generation is the last generation number used, i.e. one
 	// below the next available one to use.
-	ds.ckPtGen = ds.gen - 1
+	ds.checkptGen = ds.gen - 1
 	return nil
 }
 
@@ -172,8 +172,8 @@
 	}
 }
 
-// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
-func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+// copyDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
+func (s *syncService) copyDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -184,19 +184,18 @@
 	}
 
 	dsCopy := &dbSyncStateInMem{
-		gen:     ds.gen,
-		pos:     ds.pos,
-		ckPtGen: ds.ckPtGen,
+		gen:        ds.gen,
+		pos:        ds.pos,
+		checkptGen: ds.checkptGen,
 	}
 
-	// Make a copy of the genvec.
 	dsCopy.genvec = copyGenVec(ds.genvec)
 
 	return dsCopy, nil
 }
 
-// getDbGenInfo returns a copy of the current generation information of the Database.
-func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+// copyDbGenInfo returns a copy of the current generation information of the Database.
+func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -206,15 +205,14 @@
 		return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
 
-	// Make a copy of the genvec.
 	genvec := copyGenVec(ds.genvec)
 
 	// Add local generation information to the genvec.
 	for _, gv := range genvec {
-		gv[s.id] = ds.ckPtGen
+		gv[s.id] = ds.checkptGen
 	}
 
-	return genvec, ds.ckPtGen, nil
+	return genvec, ds.checkptGen, nil
 }
 
 // putDbGenInfoRemote puts the current remote generation information of the Database.
@@ -228,7 +226,6 @@
 		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
 
-	// Make a copy of the genvec.
 	ds.genvec = copyGenVec(genvec)
 
 	return nil
@@ -268,7 +265,7 @@
 
 // dbSyncStateKey returns the key used to access the sync state of a Database.
 func dbSyncStateKey() string {
-	return util.JoinKeyParts(util.SyncPrefix, "dbss")
+	return util.JoinKeyParts(util.SyncPrefix, dbssPrefix)
 }
 
 // putDbSyncState persists the sync state object for a given Database.
@@ -291,12 +288,12 @@
 
 // logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
 func logRecsPerDeviceScanPrefix(id uint64) string {
-	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id))
+	return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%x", id))
 }
 
 // logRecKey returns the key used to access a specific log record.
 func logRecKey(id, gen uint64) string {
-	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
+	return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
 }
 
 // splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
@@ -306,11 +303,14 @@
 	if len(parts) != 4 {
 		return 0, 0, verr
 	}
+	if parts[0] != util.SyncPrefix || parts[1] != logPrefix {
+		return 0, 0, verr
+	}
 	id, err := strconv.ParseUint(parts[2], 10, 64)
 	if err != nil {
 		return 0, 0, verr
 	}
-	gen, err := strconv.ParseUint(parts[3], 10, 64)
+	gen, err := strconv.ParseUint(parts[3], 16, 64)
 	if err != nil {
 		return 0, 0, verr
 	}
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index 437a373..3929a05 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -112,6 +112,32 @@
 	checkLogRec(t, st, id, gen, false, nil)
 }
 
+func TestLogRecKeyUtils(t *testing.T) {
+	invalid := []string{"$sync:aa:bb", "log:aa:bb", "$sync:log:aa:xx", "$sync:log:x:bb"}
+
+	for _, k := range invalid {
+		if _, _, err := splitLogRecKey(nil, k); err == nil {
+			t.Fatalf("splitting log rec key didn't fail %q", k)
+		}
+	}
+
+	valid := []struct {
+		id  uint64
+		gen uint64
+	}{
+		{10, 20},
+		{190, 540},
+		{9999, 999999},
+	}
+
+	for _, v := range valid {
+		gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.id, v.gen))
+		if gotId != v.id || gotGen != v.gen || err != nil {
+			t.Fatalf("failed key conversion id got %v want %v, gen got %v want %v, err %v", gotId, v.id, gotGen, v.gen, err)
+		}
+	}
+}
+
 //////////////////////////////
 // Helpers
 
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 31b1a3d..5be4af6 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -15,7 +15,6 @@
 
 import (
 	"fmt"
-	"strings"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -285,17 +284,17 @@
 
 // sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
 func sgDataKeyScanPrefix() string {
-	return util.JoinKeyParts(util.SyncPrefix, "sg", "d")
+	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
 }
 
 // sgDataKey returns the key used to access the SyncGroup data entry.
 func sgDataKey(gid interfaces.GroupId) string {
-	return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
+	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
 }
 
 // sgNameKey returns the key used to access the SyncGroup name entry.
 func sgNameKey(name string) string {
-	return util.JoinKeyParts(util.SyncPrefix, "sg", "n", name)
+	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
 }
 
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
@@ -552,15 +551,6 @@
 	return err
 }
 
-// TODO(hpucha): Should this be generalized?
-func splitPrefix(name string) (string, string) {
-	parts := strings.SplitN(name, "/", 2)
-	if len(parts) == 2 {
-		return parts[0], parts[1]
-	}
-	return parts[0], ""
-}
-
 // bootstrapSyncGroup inserts into the transaction log a SyncGroup operation and
 // a set of Snapshot operations to notify the sync watcher about the SyncGroup
 // prefixes to start accepting and the initial state of existing store keys that
@@ -630,7 +620,7 @@
 	for _, mt := range spec.MountTables {
 		name := naming.Join(mt, ss.name)
 		// TODO(hpucha): Is this add idempotent? Appears to be from code.
-		// Will it handle absolute names. Appears to be.
+		// Confirm that it is ok to use absolute names here.
 		if err := ss.server.AddName(name); err != nil {
 			return err
 		}
diff --git a/x/ref/services/syncbase/vsync/syncgroup_test.go b/x/ref/services/syncbase/vsync/syncgroup_test.go
index 396a6bd..6c87ee7 100644
--- a/x/ref/services/syncbase/vsync/syncgroup_test.go
+++ b/x/ref/services/syncbase/vsync/syncgroup_test.go
@@ -43,7 +43,9 @@
 
 // TestAddSyncGroup tests adding SyncGroups.
 func TestAddSyncGroup(t *testing.T) {
-	// Set a large value to prevent the threads from firing.
+	// Set a large value to prevent the initiator from running. Since this
+	// test adds a fake SyncGroup, if the initiator runs, it will attempt
+	// to initiate using this fake and partial SyncGroup data.
 	peerSyncInterval = 1 * time.Hour
 	svc := createService(t)
 	defer destroyService(t, svc)
diff --git a/x/ref/services/syncbase/vsync/types.vdl b/x/ref/services/syncbase/vsync/types.vdl
index 226f739..f44468e 100644
--- a/x/ref/services/syncbase/vsync/types.vdl
+++ b/x/ref/services/syncbase/vsync/types.vdl
@@ -8,6 +8,15 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 )
 
+// Key prefixes for sync data structures. All these prefixes are prepended with
+// util.SyncPrefix.
+const (
+	logPrefix  = "log"
+	dbssPrefix = "dbss"
+	dagPrefix  = "dag"
+	sgPrefix   = "sg"
+)
+
 // syncData represents the persistent state of the sync module.
 type syncData struct {
 	Id uint64
@@ -15,9 +24,9 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen     uint64               // local generation number incremented on every local update.
-	CkPtGen uint64               // local generation number advertised to remote peers (used by the responder).
-	GenVec  interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Gen        uint64               // local generation number incremented on every local update.
+	CheckptGen uint64               // local generation number advertised to remote peers (used by the responder).
+	GenVec     interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
 // localLogRec represents the persistent local state of a log record. Metadata
diff --git a/x/ref/services/syncbase/vsync/types.vdl.go b/x/ref/services/syncbase/vsync/types.vdl.go
index ae51caa..e9b1d7d 100644
--- a/x/ref/services/syncbase/vsync/types.vdl.go
+++ b/x/ref/services/syncbase/vsync/types.vdl.go
@@ -27,9 +27,9 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen     uint64               // local generation number incremented on every local update.
-	CkPtGen uint64               // local generation number advertised to remote peers (used by the responder).
-	GenVec  interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Gen        uint64               // local generation number incremented on every local update.
+	CheckptGen uint64               // local generation number advertised to remote peers (used by the responder).
+	GenVec     interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
 func (dbSyncState) __VDLReflect(struct {
@@ -54,3 +54,11 @@
 	vdl.Register((*dbSyncState)(nil))
 	vdl.Register((*localLogRec)(nil))
 }
+
+const logPrefix = "log"
+
+const dbssPrefix = "dbss"
+
+const dagPrefix = "dag"
+
+const sgPrefix = "sg"