syncbase/vsync: add the sync DAG layer.

Add the sync DAG layer to store the history of object mutations,
use it to detect conflicts, and record conflict resolutions.

Change-Id: I52967a0b2626bf2fe5324081c64ff14059d566fa
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
new file mode 100644
index 0000000..a84d916
--- /dev/null
+++ b/services/syncbase/vsync/dag.go
@@ -0,0 +1,862 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Syncbase DAG (directed acyclic graph) utility functions.
+//
+// The DAG is used to track the version history of synced objects in order
+// to detect and resolve conflicts (concurrent changes on different devices).
+//
+// Note: the sync code uses the words "object" and "object ID" (oid) as a
+// generic way to refer to syncable entities, whether they are actual user data
+// (table row and its row key), prefix-ACLs (permission entry and its prefix),
+// or other metadata such as SyncGroups (SyncGroup value and its internal key
+// based on the SyncGroup ID).
+//
+// * Object IDs are globally unique across all devices.
+// * Syncable objects have version numbers associated with their mutations.
+// * For a given object ID, the version number is globally unique across all
+//   devices, i.e. the (oid, version) tuple is globally unique.
+// * Each (oid, version) tuple is represented by a node in the DAG.
+// * The previous version of an object is its parent in the DAG, i.e. the
+//   new version is derived from that parent.
+// * DAG nodes have child-to-parent pointers.
+// * When there are no conflicts, the parent node has a single child node
+//   that points to it.
+// * When a parent node has more than one child, this indicates concurrent
+//   mutations which are treated as a conflict to be resolved.
+// * When a conflict is resolved, the new version has pointers back to each of
+//   the two parents to indicate that it is derived from both nodes.
+// * During a sync operation from a source device to a target device, the
+//   target receives a DAG fragment from the source.  That fragment has to
+//   be incorporated (grafted) into the target device's DAG.  It may be a
+//   continuation of the DAG of an object, with the attachment (graft) point
+//   being the current head of the DAG, in which case there are no conflicts.
+//   Or the graft point(s) may be older nodes, which means the new fragment
+//   is a divergence in the graph causing a conflict that must be resolved
+//   in order to re-converge the two DAG fragments.
+//
+// In the diagrams below:
+// (h) represents the head node in the local device.
+// (nh) represents the new head node received from the remote device.
+// (g) represents a graft node, where new nodes attach to the existing DAG.
+// <- represents a derived-from mutation, i.e. a child-to-parent pointer.
+//
+// a- No-conflict example: the new nodes (v4, v5) attach to the head node (v3).
+//    In this case the new head (v5) becomes the object's head node, the new
+//    DAG fragment being a continuation of the existing DAG.
+//
+//    Before:
+//    v1 <- v2 <- v3(h)
+//
+//    Sync updates applied, no conflict detected:
+//    v1 <- v2 <- v3(h,g) <- v4 <- v5 (nh)
+//
+//    After:
+//    v1 <- v2 <- v3 <- v4 <- v5 (h)
+//
+// b- Conflict example: the new nodes (v4, v5) attach to an old node (v2).
+//    The current head node (v3) and the new head node (v5) are divergent
+//    (concurrent) mutations that need to be resolved.  The conflict
+//    resolution function is passed the old head (v3), new head (v5), and
+//    the common ancestor (v2).  It resolves the conflict with (v6) which
+//    is represented in the DAG as derived from both v3 and v5 (2 parents).
+//
+//    Before:
+//    v1 <- v2 <- v3(h)
+//
+//    Sync updates applied, conflict detected (v3 not a graft node):
+//    v1 <- v2(g) <- v3(h)
+//                <- v4 <- v5 (nh)
+//
+//    After: conflict resolver creates v6 having 2 parents (v3, v5):
+//    v1 <- v2(g) <- v3 <------- v6(h)
+//                <- v4 <- v5 <-
+//
+// The DAG does not grow indefinitely.  During a sync operation each device
+// learns what the other device already knows -- where it's at in the version
+// history for the objects.  When a device determines that all devices that
+// sync an object (members of matching SyncGroups) have moved past some version
+// for that object, the DAG for that object can be pruned up to that common
+// version, deleting all prior (ancestor) nodes.
+//
+// The DAG contains three tables persisted to disk (nodes, heads, batches):
+//
+//   * nodes:   one entry per (oid, version) with references to parent node(s)
+//              it is derived from, a reference to the log record identifying
+//              that mutation, a reference to its write batch (if any), and a
+//              boolean to indicate whether this was an object deletion.
+//
+//   * heads:   one entry per object pointing to its most recent version.
+//
+//   * batches: one entry per batch ID containing the set of objects in the
+//              write batch and their versions.
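+//
+// As an illustrative sketch only (the Initiator is not part of this change),
+// the expected calling sequence during one sync operation is roughly:
+//
+//   graft := newGraft()
+//   // for each mutation received for the object:
+//   s.addNode(ctx, tx, oid, version, logrec, deleted, parents, btid, graft)
+//   // after all received mutations are applied:
+//   isConflict, newHead, oldHead, ancestor, err := hasConflict(ctx, st, oid, graft)
+//   if err == nil && !isConflict {
+//       moveHead(ctx, tx, oid, newHead)
+//   } else if err == nil {
+//       // resolve using (oldHead, newHead, ancestor), add the resolution node
+//       // with both parents via addNode()/addParent(), then moveHead() to it
+//   }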
+
+import (
+	"container/list"
+	"fmt"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+const (
+	NoVersion = ""
+	NoBatchId = uint64(0)
+)
+
+// dagNode holds the information on an object mutation in the DAG.
+// Note: the batch ID and deleted flag are copies of information in the log
+// record.  They are also stored in the DAG node to improve DAG traversal for
+// conflict resolution and pruning without having to fetch the full log record
+// every time.
+type dagNode struct {
+	Level   uint64   // node distance from root
+	Parents []string // references to parent versions
+	Logrec  string   // reference to log record
+	BatchId uint64   // ID of a write batch
+	Deleted bool     // true if the change was a delete
+}
+
+// batchSet holds information on a set of write batches.
+type batchSet map[uint64]*batchInfo
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+	Objects map[string]string
+	Count   uint64
+}
+
+// graftMap holds the state of DAG node grafting (attaching) per object.
+type graftMap map[string]*graftInfo
+
+// graftInfo holds the state of an object's node grafting in the DAG.
+// It is ephemeral (in-memory), used during a single sync operation to track
+// where the new DAG fragments are attached to the existing DAG for the object:
+// - newNodes:   the set of newly added nodes; used to detect the type of edges
+//               between nodes (new-node to old-node or vice versa).
+// - newHeads:   the set of new candidate head nodes; used to detect conflicts.
+// - graftNodes: the set of old nodes on which new nodes were added, and their
+//               level in the DAG; used to find common ancestors for conflicts.
+//
+// After the received mutations are applied, if there are two heads in the
+// newHeads set, there is a conflict to be resolved for the object.  Otherwise,
+// if there is one head, no conflict was triggered and the new head becomes the
+// current object version.  In case of conflict, the graftNodes set is used to
+// select a common ancestor.
+// TODO(rdaoud): support open DAGs to handle delayed conflict resolution by
+// tracking multiple dangling remote heads in addition to the local head node.
+type graftInfo struct {
+	newNodes   map[string]bool
+	newHeads   map[string]bool
+	graftNodes map[string]uint64
+}
+
+// newBatchInfo allocates and initializes a batch info entry.
+func newBatchInfo() *batchInfo {
+	return &batchInfo{Objects: make(map[string]string), Count: 0}
+}
+
+// startBatch marks the start of a batch.  If no batch ID is given, it
+// generates a new one and returns it to the caller.  The batch ID is used to
+// track DAG nodes that are part of the same batch and is stored in the log
+// records.  If a batch ID is given by the caller, its information is loaded
+// into the in-memory batch set, refetching it from the store if needed.
+func (s *syncService) startBatch(ctx *context.T, st store.StoreReader, btid uint64) uint64 {
+	s.batchesLock.Lock()
+	defer s.batchesLock.Unlock()
+
+	// If no batch ID is given, generate a new unused one.
+	if btid == NoBatchId {
+		for (btid == NoBatchId) || (s.batches[btid] != nil) {
+			btid = rand64()
+		}
+
+		s.batches[btid] = newBatchInfo()
+		return btid
+	}
+
+	// Use the given batch ID and, if needed, refetch its in-memory entry
+	// from the store.  It is OK not to find it in the store; it means sync
+	// is learning about this batch ID the first time from another sync.
+	if s.batches[btid] == nil {
+		info, err := getBatch(ctx, st, btid)
+		if err != nil {
+			info = newBatchInfo()
+		}
+		s.batches[btid] = info
+	}
+
+	return btid
+}
+
+// addNodeToBatch adds a node (oid, version) to a batch under construction.
+func (s *syncService) addNodeToBatch(ctx *context.T, btid uint64, oid, version string) error {
+	s.batchesLock.Lock()
+	defer s.batchesLock.Unlock()
+
+	if btid == NoBatchId {
+		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+	}
+
+	info := s.batches[btid]
+	if info == nil {
+		return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid)
+	}
+
+	info.Objects[oid] = version
+	return nil
+}
+
+// endBatch marks the end of a given batch.  The batch information is persisted
+// to the store and its temporary in-memory entry is removed.
+func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error {
+	_ = tx.(store.Transaction)
+
+	s.batchesLock.Lock()
+	defer s.batchesLock.Unlock()
+
+	if btid == NoBatchId || count == 0 {
+		return verror.New(verror.ErrInternal, ctx, "invalid batch info", btid, count)
+	}
+
+	info := s.batches[btid]
+	if info == nil {
+		return verror.New(verror.ErrInternal, ctx, "unknown batch id", btid)
+	}
+
+	// The first time a batch is ended, info.Count is zero.  Subsequently,
+	// if this batch ID is started and ended again, info.Count should be
+	// the same as the "count" value given.
+	if info.Count != 0 && info.Count != count {
+		return verror.New(verror.ErrInternal, ctx, "wrong counts for batch", btid, info.Count, count)
+	}
+
+	// Only save non-empty batches.
+	if len(info.Objects) > 0 {
+		info.Count = count
+		if err := setBatch(ctx, tx, btid, info); err != nil {
+			return err
+		}
+	}
+
+	delete(s.batches, btid)
+	return nil
+}
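+
+// As an illustrative sketch (not code added by this change), a caller that
+// groups two object mutations into one write batch would be expected to use
+// these helpers roughly as follows, where "count" is the total number of
+// objects in the batch, including non-syncable ones:
+//
+//   btid := s.startBatch(ctx, st, NoBatchId)
+//   s.addNode(ctx, tx, oid1, "v2", logrec1, false, []string{"v1"}, btid, nil)
+//   s.addNode(ctx, tx, oid2, "v7", logrec2, false, []string{"v6"}, btid, nil)
+//   s.endBatch(ctx, tx, btid, count)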
+
+// addNode adds a new node for a DAG object, linking it to its parent nodes.
+// It verifies that the node does not exist and its parent nodes are valid.
+// If a batch ID is given, the node's membership in that batch is also tracked.
+//
+// Note: an in-memory grafting structure is passed to track DAG attachments
+// during a sync operation.  This is needed when nodes are being added due to
+// remote changes fetched by the sync protocol.  The Initiator allocates a
+// grafting structure at the start of a sync operation and passes it across
+// calls to addNode() to update the DAG grafting state:
+// - If a parent node is not new, mark it as a DAG graft point.
+// - Mark this version as a new node.
+// - Update the new head node pointer of the grafted DAG.
+//
+// The grafting structure is not needed when nodes are added locally by the
+// Watcher, which passes a nil grafting structure.
+func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
+	_ = tx.(store.Transaction)
+
+	if parents != nil {
+		if len(parents) > 2 {
+			return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
+		}
+		if len(parents) == 0 {
+			parents = nil // replace an empty array with a nil
+		}
+	}
+
+	// The new node must not exist.
+	if hasNode(ctx, tx, oid, version) {
+		return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
+	}
+
+	// A new root node (no parents) is allowed only for new objects.
+	// TODO(rdaoud): remove that limitation; we can now have key collisions.
+	if parents == nil {
+		if _, err := getHead(ctx, tx, oid); err == nil {
+			return verror.New(verror.ErrInternal, ctx,
+				"cannot add another root node for this object", oid, version)
+		}
+	}
+
+	// Verify the parents and determine the node level.  Also save the
+	// levels of the parent nodes for use later in this function when
+	// updating the graft information.
+	parentLevels := make(map[string]uint64)
+	var level uint64
+	for _, parent := range parents {
+		pnode, err := getNode(ctx, tx, oid, parent)
+		if err != nil {
+			return err
+		}
+		parentLevels[parent] = pnode.Level
+		if level <= pnode.Level {
+			level = pnode.Level + 1
+		}
+	}
+
+	// If a batch ID is given, add the node to that batch.
+	if btid != NoBatchId {
+		if err := s.addNodeToBatch(ctx, btid, oid, version); err != nil {
+			return err
+		}
+	}
+
+	// Add the node entry to the DAG.
+	node := &dagNode{
+		Level:   level,
+		Parents: parents,
+		Logrec:  logrec,
+		BatchId: btid,
+		Deleted: deleted,
+	}
+	if err := setNode(ctx, tx, oid, version, node); err != nil {
+		return err
+	}
+
+	// We are done if grafting is not being tracked (a local node add).
+	if graft == nil {
+		return nil
+	}
+
+	// Get the object's graft info entry in order to update it.  This happens
+	// when addNode() is called by the sync Initiator and the DAG is updated
+	// with new nodes fetched from other devices.
+	//
+	// During a sync operation, each mutated object gets new nodes added in
+	// its DAG.  These new nodes are either derived from nodes that were
+	// previously known on this device (i.e. their parent nodes are pre-
+	// existing), or they are derived from other new DAG nodes being
+	// discovered during this sync (i.e. their parent nodes were also just
+	// added to the DAG).
+	//
+	// To detect a conflict and find the most recent common ancestor to
+	// pass to the conflict resolver, the DAG graft info keeps track of the
+	// new nodes that have old parent nodes.  These old-to-new edges are
+	// points where new DAG fragments are attached (grafted) onto the
+	// existing DAG.  The old nodes are the graft nodes forming the set of
+	// common ancestors to use in conflict resolution:
+	//
+	// 1- A conflict happens when the current "head node" for an object is
+	//    not in the set of graft nodes.  It means the object mutations
+	//    were not derived from what the device knows, but are divergent
+	//    changes at a prior point.
+	//
+	// 2- The most recent common ancestor to use in resolving the conflict
+	//    is the object graft node with the deepest level (furthest from
+	//    the root node), representing the most up-to-date common knowledge
+	//    between the devices.
+	info := getObjectGraftInfo(ctx, tx, graft, oid)
+
+	for _, parent := range parents {
+		// If this parent is an old node, it's a graft point.
+		if !info.newNodes[parent] {
+			info.graftNodes[parent] = parentLevels[parent]
+		}
+
+		// A parent cannot be a candidate for a new head.
+		delete(info.newHeads, parent)
+	}
+
+	// This new node is a candidate for new head version.
+	info.newNodes[version] = true
+	info.newHeads[version] = true
+	return nil
+}
+
+// addParent adds a linkage from the DAG node (oid, version) to this parent node.
+//
+// Note: as with the addNode() call, an in-memory grafting structure is passed
+// to track DAG attachments during a sync operation.  It is not needed if the
+// parent linkage is due to a local change (from conflict resolution selecting
+// an existing version).
+func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error {
+	_ = tx.(store.Transaction)
+
+	if version == parent {
+		return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
+	}
+
+	node, err := getNode(ctx, tx, oid, version)
+	if err != nil {
+		return err
+	}
+	pnode, err := getNode(ctx, tx, oid, parent)
+	if err != nil {
+		return err
+	}
+
+	// Check if the parent is already linked to this node.
+	found := false
+	for _, p := range node.Parents {
+		if p == parent {
+			found = true
+			break
+		}
+	}
+
+	// Add the parent if it is not yet linked.
+	if !found {
+		// Make sure that adding the link does not create a DAG cycle.
+		// Verify that the node is not an ancestor of the parent that
+		// it is being linked to.
+		err = forEachAncestor(ctx, tx, oid, pnode.Parents, func(v string, nd *dagNode) error {
+			if v == version {
+				return verror.New(verror.ErrInternal, ctx, "cycle on object",
+					oid, ": node", version, "is ancestor of parent", parent)
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+		node.Parents = append(node.Parents, parent)
+		if err = setNode(ctx, tx, oid, version, node); err != nil {
+			return err
+		}
+	}
+
+	// If no grafting structure is given (i.e. local changes), we are done.
+	if graft == nil {
+		return nil
+	}
+
+	// Update graft: if the node and its parent are new/old or old/new then
+	// add the parent as a graft point (a potential common ancestor).
+	info := getObjectGraftInfo(ctx, tx, graft, oid)
+
+	_, nodeNew := info.newNodes[version]
+	_, parentNew := info.newNodes[parent]
+	if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+		info.graftNodes[parent] = pnode.Level
+	}
+
+	// The parent node can no longer be a candidate for a new head version.
+	delete(info.newHeads, parent)
+	return nil
+}
+
+// moveHead moves the object head node in the DAG.
+func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error {
+	_ = tx.(store.Transaction)
+
+	// Verify that the node exists.
+	if !hasNode(ctx, tx, oid, head) {
+		return verror.New(verror.ErrInternal, ctx, "node", oid, head, "does not exist")
+	}
+
+	return setHead(ctx, tx, oid, head)
+}
+
+// hasConflict determines if an object has a conflict between its new and old
+// head nodes.
+// - Yes: return (true, newHead, oldHead, ancestor)
+// - No:  return (false, newHead, oldHead, NoVersion)
+// A conflict exists when there are two new-head nodes in the graft structure.
+// It means the newly added object versions are not derived in part from this
+// device's current knowledge.  If there is a single new-head, the object
+// changes were applied without triggering a conflict.
+func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
+	isConflict = false
+	oldHead = NoVersion
+	newHead = NoVersion
+	ancestor = NoVersion
+	err = nil
+
+	info := graft[oid]
+	if info == nil {
+		err = verror.New(verror.ErrInternal, ctx, "node", oid, "has no DAG graft info")
+		return
+	}
+
+	numHeads := len(info.newHeads)
+	if numHeads < 1 || numHeads > 2 {
+		err = verror.New(verror.ErrInternal, ctx, "node", oid, "invalid count of new heads", numHeads)
+		return
+	}
+
+	// Fetch the current head for this object if it exists.  The error from
+	// getHead() is ignored because a newly received object is not yet known
+	// on this device and will not trigger a conflict.
+	oldHead, _ = getHead(ctx, st, oid)
+
+	// If there is only one new head node there is no conflict.  The new
+	// head is that single one, even if it happens to also be the current
+	// (old) head node.
+	if numHeads == 1 {
+		for head := range info.newHeads {
+			newHead = head
+		}
+		return
+	}
+
+	// With two candidate head nodes, the new head is the non-old one.
+	for head := range info.newHeads {
+		if head != oldHead {
+			newHead = head
+			break
+		}
+	}
+
+	// There is a conflict: the best choice ancestor is the graft node with
+	// the largest level (farthest from the root).  It is possible in some
+	// corner cases to have multiple graft nodes at the same level.  This
+	// would still be a single conflict, with the multiple same-level graft
+	// nodes representing equivalent conflict resolutions on different
+	// devices that are now merging their resolutions.  In such a case it
+	// does not matter which node is chosen as the ancestor because the
+	// conflict resolver function is assumed to be convergent.  However it
+	// is nicer to make that selection deterministic so all devices see the
+	// same choice: the version number is used as a tie-breaker.
+	isConflict = true
+	var maxLevel uint64
+	for node, level := range info.graftNodes {
+		if maxLevel < level || (maxLevel == level && ancestor < node) {
+			maxLevel = level
+			ancestor = node
+		}
+	}
+	return
+}
+
+// newGraft allocates a graftMap to track DAG node grafting during sync.
+func newGraft() graftMap {
+	return make(graftMap)
+}
+
+// getObjectGraftInfo returns the graftInfo for an object ID.  If the graftMap is
+// nil, a nil graftInfo is returned because grafting is not being tracked.
+func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo {
+	if graft == nil {
+		return nil
+	}
+	if info := graft[oid]; info != nil {
+		return info
+	}
+
+	info := &graftInfo{
+		newNodes:   make(map[string]bool),
+		newHeads:   make(map[string]bool),
+		graftNodes: make(map[string]uint64),
+	}
+
+	// If the object has a head node, include it in the set of new heads.
+	if head, err := getHead(ctx, st, oid); err == nil {
+		info.newHeads[head] = true
+	}
+
+	graft[oid] = info
+	return info
+}
+
+// forEachAncestor loops over the DAG ancestor nodes of an object in a breadth-
+// first traversal starting from given version nodes.  It calls the given
+// callback function once for each ancestor node.
+func forEachAncestor(ctx *context.T, st store.StoreReader, oid string, startVersions []string, callback func(version string, node *dagNode) error) error {
+	visited := make(map[string]bool)
+	queue := list.New()
+	for _, version := range startVersions {
+		queue.PushBack(version)
+		visited[version] = true
+	}
+
+	for queue.Len() > 0 {
+		version := queue.Remove(queue.Front()).(string)
+		node, err := getNode(ctx, st, oid, version)
+		if err != nil {
+			// Ignore it, the parent was previously pruned.
+			continue
+		}
+		for _, parent := range node.Parents {
+			if !visited[parent] {
+				queue.PushBack(parent)
+				visited[parent] = true
+			}
+		}
+		if err = callback(version, node); err != nil {
+			return err
+		}
+	}
+	return nil
+}
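+
+// For example (illustrative only, not code added by this change), counting the
+// ancestors reachable from the current head of an object could be written as:
+//
+//   count := 0
+//   if head, err := getHead(ctx, st, oid); err == nil {
+//       forEachAncestor(ctx, st, oid, []string{head}, func(v string, nd *dagNode) error {
+//           count++
+//           return nil
+//       })
+//   }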
+
+// newBatchPruning allocates an in-memory structure to track batches affected
+// by a DAG pruning operation across objects.
+func newBatchPruning() batchSet {
+	return make(batchSet)
+}
+
+// prune trims the DAG of an object at a given version (node) by deleting all
+// its ancestor nodes, making it the new root node.  For each deleted node it
+// calls the given callback function to delete its log record.
+//
+// Note: this function should only be used when sync determines that all devices
+// that know about this object have gotten past this version.
+//
+// The batch set passed is used to track batches affected by the deletion of DAG
+// objects across multiple calls to prune().  It is later given to pruneDone()
+// to do GC on these batches.
+func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error {
+	_ = tx.(store.Transaction)
+
+	if batches == nil {
+		return verror.New(verror.ErrInternal, ctx, "missing batch set")
+	}
+
+	// Get the node at the pruning point and set its parents to nil.
+	// It will become the oldest DAG node (root) for the object.
+	node, err := getNode(ctx, tx, oid, version)
+	if err != nil {
+		return err
+	}
+	if node.Parents == nil {
+		// Nothing to do, this node is already the root.
+		return nil
+	}
+
+	parents := node.Parents
+	node.Parents = nil
+	if err = setNode(ctx, tx, oid, version, node); err != nil {
+		return err
+	}
+
+	// Delete all ancestor nodes and their log records. Delete as many as
+	// possible and track the error counts.  Update the batch set to track
+	// their pruning.
+	nodeErrs, logErrs := 0, 0
+	forEachAncestor(ctx, tx, oid, parents, func(v string, nd *dagNode) error {
+		if btid := nd.BatchId; btid != NoBatchId {
+			if batches[btid] == nil {
+				batches[btid] = newBatchInfo()
+			}
+			batches[btid].Objects[oid] = v
+		}
+
+		if err := delLogRec(ctx, tx, nd.Logrec); err != nil {
+			logErrs++
+		}
+		if err := delNode(ctx, tx, oid, v); err != nil {
+			nodeErrs++
+		}
+		return nil
+	})
+	if nodeErrs != 0 || logErrs != 0 {
+		return verror.New(verror.ErrInternal, ctx,
+			"prune failed to delete nodes and logs:", nodeErrs, logErrs)
+	}
+	return nil
+}
+
+// pruneDone is called when object pruning is finished within a single pass of
+// the sync garbage collector.  It updates the batch sets affected by objects
+// deleted by prune().
+func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error {
+	_ = tx.(store.Transaction)
+
+	// Update batch sets by removing the pruned objects from them.
+	for btid, pruneInfo := range batches {
+		info, err := getBatch(ctx, tx, btid)
+		if err != nil {
+			return err
+		}
+
+		for oid := range pruneInfo.Objects {
+			delete(info.Objects, oid)
+		}
+
+		if len(info.Objects) > 0 {
+			err = setBatch(ctx, tx, btid, info)
+		} else {
+			err = delBatch(ctx, tx, btid)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
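+
+// As an illustrative sketch (not code added by this change), one pass of the
+// sync garbage collector would be expected to use prune() and pruneDone()
+// roughly as follows, with delLogRec standing in for the log-record deletion
+// callback:
+//
+//   batches := newBatchPruning()
+//   // for each object and the version all devices have moved past:
+//   prune(ctx, tx, oid, version, batches, delLogRec)
+//   // after all objects in this pass are pruned:
+//   pruneDone(ctx, tx, batches)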
+
+// getLogrec returns the log record information for a given object version.
+func getLogrec(ctx *context.T, st store.StoreReader, oid, version string) (string, error) {
+	node, err := getNode(ctx, st, oid, version)
+	if err != nil {
+		return "", err
+	}
+	return node.Logrec, nil
+}
+
+// Low-level utility functions to access DB entries without tracking their
+// relationships.  Use the functions above to manipulate the DAG.
+
+// nodeKey returns the key used to access a DAG node (oid, version).
+func nodeKey(oid, version string) string {
+	return util.JoinKeyParts(util.SyncPrefix, "dag", "n", oid, version)
+}
+
+// setNode stores the DAG node entry.
+func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error {
+	_ = tx.(store.Transaction)
+
+	if version == NoVersion {
+		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+	}
+
+	if err := util.PutObject(tx, nodeKey(oid, version), node); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getNode retrieves the DAG node entry for the given (oid, version).
+func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) {
+	if version == NoVersion {
+		return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version)
+	}
+
+	var node dagNode
+	if err := util.GetObject(st, nodeKey(oid, version), &node); err != nil {
+		return nil, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return &node, nil
+}
+
+// delNode deletes the DAG node entry.
+func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
+	_ = tx.(store.Transaction)
+
+	if version == NoVersion {
+		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+	}
+
+	if err := tx.Delete([]byte(nodeKey(oid, version))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// hasNode returns true if the node (oid, version) exists in the DAG.
+func hasNode(ctx *context.T, st store.StoreReader, oid, version string) bool {
+	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
+	if _, err := getNode(ctx, st, oid, version); err != nil {
+		return false
+	}
+	return true
+}
+
+// headKey returns the key used to access the DAG object head.
+func headKey(oid string) string {
+	return util.JoinKeyParts(util.SyncPrefix, "dag", "h", oid)
+}
+
+// setHead stores version as the DAG object head.
+func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
+	_ = tx.(store.Transaction)
+
+	if version == NoVersion {
+		return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
+	}
+
+	if err := util.PutObject(tx, headKey(oid), version); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getHead retrieves the DAG object head.
+func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) {
+	var version string
+	if err := util.GetObject(st, headKey(oid), &version); err != nil {
+		return NoVersion, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return version, nil
+}
+
+// delHead deletes the DAG object head.
+func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
+	_ = tx.(store.Transaction)
+
+	if err := tx.Delete([]byte(headKey(oid))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// batchKey returns the key used to access the DAG batch info.
+func batchKey(btid uint64) string {
+	return util.JoinKeyParts(util.SyncPrefix, "dag", "b", fmt.Sprintf("%d", btid))
+}
+
+// setBatch stores the DAG batch entry.
+func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error {
+	_ = tx.(store.Transaction)
+
+	if btid == NoBatchId {
+		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+	}
+
+	if err := util.PutObject(tx, batchKey(btid), info); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getBatch retrieves the DAG batch entry.
+func getBatch(ctx *context.T, st store.StoreReader, btid uint64) (*batchInfo, error) {
+	if btid == NoBatchId {
+		return nil, verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+	}
+
+	var info batchInfo
+	if err := util.GetObject(st, batchKey(btid), &info); err != nil {
+		return nil, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return &info, nil
+}
+
+// delBatch deletes the DAG batch entry.
+func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error {
+	_ = tx.(store.Transaction)
+
+	if btid == NoBatchId {
+		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
+	}
+
+	if err := tx.Delete([]byte(batchKey(btid))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getParentMap is a testing and debug helper function that returns for an
+// object a map of its DAG (node-to-parents relations).  If a graft structure
+// is given, its fragments are included in the map.
+func getParentMap(ctx *context.T, st store.StoreReader, oid string, graft graftMap) map[string][]string {
+	parentMap := make(map[string][]string)
+	var start []string
+
+	if head, err := getHead(ctx, st, oid); err == nil {
+		start = append(start, head)
+	}
+	if graft != nil && graft[oid] != nil {
+		for v := range graft[oid].newHeads {
+			start = append(start, v)
+		}
+	}
+
+	forEachAncestor(ctx, st, oid, start, func(v string, nd *dagNode) error {
+		parentMap[v] = nd.Parents
+		return nil
+	})
+	return parentMap
+}
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
new file mode 100644
index 0000000..1b899df
--- /dev/null
+++ b/services/syncbase/vsync/dag_test.go
@@ -0,0 +1,809 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Tests for the Syncbase DAG.
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"strconv"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+
+	"v.io/v23/context"
+)
+
+// TestSetNode tests setting and getting a DAG node.
+func TestSetNode(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+
+	oid, version := "1111", "1"
+
+	node, err := getNode(nil, st, oid, version)
+	if err == nil || node != nil {
+		t.Errorf("found non-existent object %s:%s: %v", oid, version, node)
+	}
+
+	if hasNode(nil, st, oid, version) {
+		t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+		t.Errorf("non-existent object %s:%s has a logrec: %v", oid, version, logrec)
+	}
+
+	node = &dagNode{Level: 15, Parents: []string{"444", "555"}, Logrec: "logrec-23"}
+
+	tx := st.NewTransaction()
+	if err = setNode(nil, tx, oid, version, node); err != nil {
+		t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+	}
+	tx.Commit()
+
+	node2, err := getNode(nil, st, oid, version)
+	if err != nil || node2 == nil {
+		t.Errorf("cannot find stored object %s:%s: %v", oid, version, err)
+	}
+
+	if !hasNode(nil, st, oid, version) {
+		t.Errorf("hasNode() did not find object %s:%s", oid, version)
+	}
+
+	if !reflect.DeepEqual(node, node2) {
+		t.Errorf("object %s:%s has wrong data: %v instead of %v", oid, version, node2, node)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, version); err != nil || logrec != "logrec-23" {
+		t.Errorf("object %s:%s has wrong logrec: %s", oid, version, logrec)
+	}
+}
+
+// TestDelNode tests deleting a DAG node.
+func TestDelNode(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+
+	oid, version := "2222", "2"
+
+	node := &dagNode{Level: 123, Parents: []string{"333"}, Logrec: "logrec-789"}
+
+	tx := st.NewTransaction()
+	if err := setNode(nil, tx, oid, version, node); err != nil {
+		t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+	}
+	tx.Commit()
+
+	tx = st.NewTransaction()
+	if err := delNode(nil, tx, oid, version); err != nil {
+		t.Fatalf("cannot delete object %s:%s: %v", oid, version, err)
+	}
+	tx.Commit()
+
+	node2, err := getNode(nil, st, oid, version)
+	if err == nil || node2 != nil {
+		t.Errorf("found deleted object %s:%s (%v)", oid, version, node2)
+	}
+
+	if hasNode(nil, st, oid, version) {
+		t.Errorf("hasNode() found deleted object %s:%s", oid, version)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, version); err == nil || logrec != "" {
+		t.Errorf("deleted object %s:%s has logrec: %s", oid, version, logrec)
+	}
+}
+
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid, version := "1234", "7"
+
+	tx := st.NewTransaction()
+	if err := s.addParent(nil, tx, oid, version, "haha", nil); err == nil {
+		t.Errorf("addParent() did not fail for an unknown object %s:%s", oid, version)
+	}
+	tx.Abort()
+
+	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	node := &dagNode{Level: 15, Logrec: "logrec-22"}
+
+	tx = st.NewTransaction()
+	if err := setNode(nil, tx, oid, version, node); err != nil {
+		t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
+	}
+	tx.Commit()
+
+	graft := newGraft()
+	tx = st.NewTransaction()
+	if err := s.addParent(nil, tx, oid, version, version, graft); err == nil {
+		t.Errorf("addParent() did not fail on a self-parent for object %s:%s", oid, version)
+	}
+	tx.Abort()
+
+	remote := true
+	expParents := []string{"4", "5", "6"}
+
+	for _, parent := range expParents {
+		tx = st.NewTransaction()
+		if err := s.addParent(nil, tx, oid, version, parent, graft); err == nil {
+			t.Errorf("addParent() did not reject invalid parent %s for object %s:%s",
+				parent, oid, version)
+		}
+		tx.Abort()
+
+		pnode := &dagNode{Level: 11, Logrec: fmt.Sprintf("logrec-%s", parent), Parents: []string{"3"}}
+
+		tx = st.NewTransaction()
+		if err := setNode(nil, tx, oid, parent, pnode); err != nil {
+			t.Fatalf("cannot set parent object %s:%s (%v): %v", oid, parent, pnode, err)
+		}
+		tx.Commit()
+
+		var g graftMap
+		if remote {
+			g = graft
+		}
+
+		// addParent() twice to verify it is idempotent.
+		for i := 0; i < 2; i++ {
+			tx = st.NewTransaction()
+			if err := s.addParent(nil, tx, oid, version, parent, g); err != nil {
+				t.Errorf("addParent() failed on parent %s, remote %t (i=%d) for %s:%s: %v",
+					parent, remote, i, oid, version, err)
+			}
+			tx.Commit()
+		}
+
+		remote = !remote
+	}
+
+	node2, err := getNode(nil, st, oid, version)
+	if err != nil || node2 == nil {
+		t.Errorf("cannot find object %s:%s: %v", oid, version, err)
+	}
+
+	if !reflect.DeepEqual(node2.Parents, expParents) {
+		t.Errorf("invalid parents for object %s:%s: %v instead of %v",
+			oid, version, node2.Parents, expParents)
+	}
+
+	// Creating cycles should fail.
+	for v := 1; v < 7; v++ {
+		ver := fmt.Sprintf("%d", v)
+		tx = st.NewTransaction()
+		if err = s.addParent(nil, tx, oid, ver, version, nil); err == nil {
+			t.Errorf("addParent() failed to reject a cycle for %s: from ancestor %s to node %s",
+				oid, ver, version)
+		}
+		tx.Abort()
+	}
+}
+
+// TestSetHead tests setting and getting a DAG head node.
+func TestSetHead(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+
+	oid := "3333"
+
+	version, err := getHead(nil, st, oid)
+	if err == nil {
+		t.Errorf("found non-existent object head %s:%s", oid, version)
+	}
+
+	for i := 0; i < 2; i++ {
+		version = fmt.Sprintf("v%d", 555+i)
+
+		tx := st.NewTransaction()
+		if err = setHead(nil, tx, oid, version); err != nil {
+			t.Fatalf("cannot set object head %s:%s (i=%d)", oid, version, i)
+		}
+		tx.Commit()
+
+		version2, err := getHead(nil, st, oid)
+		if err != nil {
+			t.Errorf("cannot find stored object head %s (i=%d)", oid, i)
+		}
+		if version != version2 {
+			t.Errorf("object %s has wrong head data (i=%d): %s instead of %s",
+				oid, i, version2, version)
+		}
+	}
+}
+
+// TestLocalUpdates tests the sync handling of initial local updates: an object
+// is created (v1) and updated twice (v2, v3) on this device.  The DAG should
+// show: v1 -> v2 -> v3 and the head should point to v3.
+func TestLocalUpdates(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must have moved to v3 and the parent map shows the updated DAG.
+	if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+		t.Errorf("invalid object %s head: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, nil)
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+
+	// Make sure an existing node cannot be added again.
+	tx := st.NewTransaction()
+	if err := s.addNode(nil, tx, oid, "2", "foo", false, []string{"1", "3"}, NoBatchId, nil); err == nil {
+		t.Errorf("addNode() did not fail when given an existing node")
+	}
+
+	// Make sure a new node cannot have more than 2 parents.
+	if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
+		t.Errorf("addNode() did not fail when given 3 parents")
+	}
+
+	// Make sure a new node cannot have an invalid parent.
+	if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "555"}, NoBatchId, nil); err == nil {
+		t.Errorf("addNode() did not fail when using an invalid parent")
+	}
+
+	// Make sure a new root node (no parents) cannot be added once a root exists.
+	// For the parents array, check both the "nil" and the empty array as input.
+	if err := s.addNode(nil, tx, oid, "6789", "foo", false, nil, NoBatchId, nil); err == nil {
+		t.Errorf("adding a 2nd root node (nil parents) for object %s did not fail", oid)
+	}
+	if err := s.addNode(nil, tx, oid, "6789", "foo", false, []string{}, NoBatchId, nil); err == nil {
+		t.Errorf("adding a 2nd root node (empty parents) for object %s did not fail", oid)
+	}
+	tx.Abort()
+}
+
+// TestRemoteUpdates tests the sync handling of initial remote updates:
+// an object is created (v1) and updated twice (v2, v3) on another device and
+// we learn about it during sync.  The updated DAG should show: v1 -> v2 -> v3
+// and report no conflicts with the new head pointing at v3.
+func TestRemoteUpdates(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	graft, err := s.dagReplayCommands(nil, "remote-init-00.log.sync")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still undefined) and the parent
+	// map shows the newly grafted DAG fragment.
+	if head, err := getHead(nil, st, oid); err == nil {
+		t.Errorf("object %s head found: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, graft)
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	g := graft[oid]
+
+	expNewHeads := map[string]bool{"3": true}
+
+	if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+		t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+	}
+
+	expGrafts := map[string]uint64{}
+	if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+		t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+	if !(!isConflict && newHead == "3" && oldHead == NoVersion && ancestor == NoVersion && errConflict == nil) {
+		t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
+	}
+
+	// Make sure an unknown node cannot become the new head.
+	tx := st.NewTransaction()
+	if err := moveHead(nil, tx, oid, "55"); err == nil {
+		t.Errorf("moveHead() did not fail on an invalid node")
+	}
+	tx.Abort()
+
+	// Then move the head.
+	tx = st.NewTransaction()
+	if err := moveHead(nil, tx, oid, newHead); err != nil {
+		t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
+	}
+	tx.Commit()
+}
+
+// TestRemoteNoConflict tests sync of remote updates on top of a local initial
+// state without conflict.  An object is created locally and updated twice
+// (v1 -> v2 -> v3).  Another device, having gotten this info, makes 3 updates
+// on top of that (v3 -> v4 -> v5 -> v6) and sends this info in a later sync.
+// The updated DAG should show (v1 -> v2 -> v3 -> v4 -> v5 -> v6) and report
+// no conflicts with the new head pointing at v6.  It should also report v3 as
+// the graft point on which the new fragment (v4 -> v5 -> v6) gets attached.
+func TestRemoteNoConflict(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+	graft, err := s.dagReplayCommands(nil, "remote-noconf-00.log.sync")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v3) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+		t.Errorf("object %s has wrong head: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, graft)
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"3"}, "5": {"4"}, "6": {"5"}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	g := graft[oid]
+
+	expNewHeads := map[string]bool{"6": true}
+	if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+		t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+	}
+
+	expGrafts := map[string]uint64{"3": 2}
+	if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+		t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+	if !(!isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+		t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+		t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
+	}
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
+	}
+
+	// Then move the head.
+	tx := st.NewTransaction()
+	if err := moveHead(nil, tx, oid, newHead); err != nil {
+		t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
+	}
+	tx.Commit()
+
+	// Verify that hasConflict() fails without graft data.
+	isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, nil)
+	if errConflict == nil {
+		t.Errorf("hasConflict() on %s did not fail w/o graft data: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+}
+
+// TestRemoteConflict tests sync handling remote updates that build on the
+// local initial state and trigger a conflict.  An object is created locally
+// and updated twice (v1 -> v2 -> v3).  Another device, having only gotten
+// the v1 -> v2 history, makes 3 updates on top of v2 (v2 -> v4 -> v5 -> v6)
+// and sends this info during a later sync.  Separately, the local device
+// makes a conflicting (concurrent) update v2 -> v3.  The updated DAG should
+// show the branches: (v1 -> v2 -> v3) and (v1 -> v2 -> v4 -> v5 -> v6) and
+// report the conflict between v3 and v6 (current and new heads).  It should
+// also report v2 as the graft point and the common ancestor in the conflict.
+// The conflict is resolved locally by creating v7 that is derived from both
+// v3 and v6 and it becomes the new head.
+func TestRemoteConflict(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+	graft, err := s.dagReplayCommands(nil, "remote-conf-00.log.sync")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v3) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+		t.Errorf("object %s has wrong head: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, graft)
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"4"}, "6": {"5"}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	g := graft[oid]
+
+	expNewHeads := map[string]bool{"3": true, "6": true}
+	if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+		t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+	}
+
+	expGrafts := map[string]uint64{"2": 1}
+	if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+		t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+	}
+
+	// There should be a conflict between v3 and v6 with v2 as ancestor.
+	isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+	if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
+		t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
+	}
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
+	}
+	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
+	}
+
+	// Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
+	if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that the head moved to v7 and the parent map shows the resolution.
+	if head, err := getHead(nil, st, oid); err != nil || head != "7" {
+		t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
+	}
+
+	exp["7"] = []string{"3", "6"}
+	pmap = getParentMap(nil, st, oid, nil)
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
+			oid, pmap, exp)
+	}
+}
+
+// TestRemoteConflictTwoGrafts tests sync handling remote updates that build
+// on the local initial state and trigger a conflict with 2 graft points.
+// An object is created locally and updated twice (v1 -> v2 -> v3).  Another
+// device first learns about v1 and makes its own conflicting update v1 -> v4.
+// That remote device later learns about v2 and resolves the v2/v4 conflict by
+// creating v5.  Then it makes a last v5 -> v6 update -- which will conflict
+// with v3 but it doesn't know that.
+// Now the sync order is reversed and the local device learns all of what
+// happened on the remote device.  The local DAG should be augmented by
+// a subtree with 2 graft points: v1 and v2.  It receives this new branch:
+// v1 -> v4 -> v5 -> v6.  Note that v5 is also derived from v2 as a remote
+// conflict resolution.  This should report a conflict between v3 and v6
+// (current and new heads), with v1 and v2 as graft points, and v2 as the
+// most-recent common ancestor for that conflict.  The conflict is resolved
+// locally by creating v7, derived from both v3 and v6, becoming the new head.
+func TestRemoteConflictTwoGrafts(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+	graft, err := s.dagReplayCommands(nil, "remote-conf-01.log.sync")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v3) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+		t.Errorf("object %s has wrong head: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, graft)
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1"}, "5": {"2", "4"}, "6": {"5"}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	g := graft[oid]
+
+	expNewHeads := map[string]bool{"3": true, "6": true}
+	if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+		t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+	}
+
+	expGrafts := map[string]uint64{"1": 0, "2": 1}
+	if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+		t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+	}
+
+	// There should be a conflict between v3 and v6 with v2 as ancestor.
+	isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+	if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
+		t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
+	}
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:1" {
+		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
+	}
+	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
+	}
+
+	// Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
+	if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that the head moved to v7 and the parent map shows the resolution.
+	if head, err := getHead(nil, st, oid); err != nil || head != "7" {
+		t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
+	}
+
+	exp["7"] = []string{"3", "6"}
+	pmap = getParentMap(nil, st, oid, nil)
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
+			oid, pmap, exp)
+	}
+}
+
+// TestAncestorIterator checks that the iterator goes over the correct set
+// of ancestor nodes for an object given a starting node.  It should traverse
+// reconvergent DAG branches only visiting each ancestor once:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// - Starting at v1 it should only cover v1.
+// - Starting at v3 it should only cover v1-v3.
+// - Starting at v6 it should only cover v1-v6.
+// - Starting at v9 it should cover all nodes (v1-v9).
+func TestAncestorIterator(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Loop checking the iteration behavior for different starting nodes.
+	for _, start := range []int{1, 3, 6, 9} {
+		visitCount := make(map[string]int)
+		vstart := fmt.Sprintf("%d", start)
+		forEachAncestor(nil, st, oid, []string{vstart}, func(v string, nd *dagNode) error {
+			visitCount[v]++
+			return nil
+		})
+
+		// Check that all prior nodes are visited only once.
+		for i := 1; i < (start + 1); i++ {
+			vv := fmt.Sprintf("%d", i)
+			if visitCount[vv] != 1 {
+				t.Errorf("wrong visit count on object %s:%s starting from %s: %d instead of 1",
+					oid, vv, vstart, visitCount[vv])
+			}
+		}
+	}
+
+	// Make sure an error in the callback is returned.
+	cbErr := errors.New("callback error")
+	err := forEachAncestor(nil, st, oid, []string{"9"}, func(v string, nd *dagNode) error {
+		if v == "1" {
+			return cbErr
+		}
+		return nil
+	})
+	if err != cbErr {
+		t.Errorf("wrong error returned from callback: %v instead of %v", err, cbErr)
+	}
+}
+
+// TestPruning tests sync pruning of the DAG for an object with 3 concurrent
+// updates (i.e. 2 conflict resolution convergent points).  The pruning must
+// get rid of the DAG branches across the reconvergence points:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// By pruning at v1, nothing is deleted.
+// Then by pruning at v2, only v1 is deleted.
+// Then by pruning at v6, v2-v5 are deleted leaving v6 and "v7 -> v8 -> v9".
+// Then by pruning at v8, v6-v7 are deleted leaving "v8 -> v9".
+// Then by pruning at v9, v8 is deleted leaving v9 as the head.
+// Then by pruning again at v9 nothing changes.
+func TestPruning(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
+
+	// Loop pruning at an invalid version (333) then at different valid versions.
+	testVersions := []string{"333", "1", "2", "6", "8", "9", "9"}
+	delCounts := []int{0, 0, 1, 4, 2, 1, 0}
+	which := "prune-snip-"
+	remain := 9
+
+	for i, version := range testVersions {
+		batches := newBatchPruning()
+		tx := st.NewTransaction()
+		del := 0
+		err := prune(nil, tx, oid, version, batches,
+			func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+				del++
+				return nil
+			})
+		tx.Commit()
+
+		if i == 0 && err == nil {
+			t.Errorf("pruning non-existent object %s:%s did not fail", oid, version)
+		} else if i > 0 && err != nil {
+			t.Errorf("pruning object %s:%s failed: %v", oid, version, err)
+		}
+
+		if del != delCounts[i] {
+			t.Errorf("pruning object %s:%s deleted %d log records instead of %d",
+				oid, version, del, delCounts[i])
+		}
+
+		if head, err := getHead(nil, st, oid); err != nil || head != "9" {
+			t.Errorf("object %s has wrong head: %s", oid, head)
+		}
+
+		tx = st.NewTransaction()
+		err = pruneDone(nil, tx, batches)
+		if err != nil {
+			t.Errorf("pruneDone() failed: %v", err)
+		}
+		tx.Commit()
+
+		// Remove the pruned nodes from the expected parent map used for
+		// validation, and set the parents of the prune point to nil.
+		intVersion, err := strconv.ParseInt(version, 10, 32)
+		if err != nil {
+			t.Errorf("invalid version: %s", version)
+		}
+
+		if intVersion < 10 {
+			for j := int64(0); j < intVersion; j++ {
+				delete(exp, fmt.Sprintf("%d", j))
+			}
+			exp[version] = nil
+		}
+
+		pmap := getParentMap(nil, st, oid, nil)
+		if !reflect.DeepEqual(pmap, exp) {
+			t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+		}
+	}
+}
+
+// TestPruningCallbackError tests sync pruning of the DAG when the callback
+// function returns an error.  The pruning must try to delete as many nodes
+// and log records as possible and properly adjust the parent pointers of
+// the node at which pruning was requested.  The object DAG is:
+// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
+//        |--> v4 ---|           |
+//        +--> v7 ---------------+
+// By pruning at v9 and having the callback function fail for v4, all other
+// nodes must be deleted and only v9 remains as the head.
+func TestPruningCallbackError(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+	s := svc.sync
+
+	oid := "1234"
+
+	if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	exp := map[string][]string{"9": nil}
+
+	// Prune at v9 with a callback function that fails for v4.
+	del, expDel := 0, 8
+	version := "9"
+
+	batches := newBatchPruning()
+	tx := st.NewTransaction()
+	err := prune(nil, tx, oid, version, batches,
+		func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+			del++
+			if lr == "logrec-03" {
+				return fmt.Errorf("refuse to delete %s", lr)
+			}
+			return nil
+		})
+	tx.Commit()
+
+	if err == nil {
+		t.Errorf("pruning object %s:%s did not fail", oid, version)
+	}
+	if del != expDel {
+		t.Errorf("pruning object %s:%s deleted %d log records instead of %d", oid, version, del, expDel)
+	}
+
+	tx = st.NewTransaction()
+	err = pruneDone(nil, tx, batches)
+	if err != nil {
+		t.Errorf("pruneDone() failed: %v", err)
+	}
+	tx.Commit()
+
+	if head, err := getHead(nil, st, oid); err != nil || head != version {
+		t.Errorf("object %s has wrong head: %s", oid, head)
+	}
+
+	pmap := getParentMap(nil, st, oid, nil)
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+	}
+}
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
new file mode 100644
index 0000000..50fbf14
--- /dev/null
+++ b/services/syncbase/vsync/replay_test.go
@@ -0,0 +1,183 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Utilities to ease the setup of sync test scenarios.
+// parseSyncCommands() parses a sync command file and returns a slice of
+// commands to execute.  dagReplayCommands() replays the parsed commands at
+// the DAG API level.
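+//
+// The command file format is:
+//   <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+// For example (from testdata/local-init-00.log.sync):
+//   addl|1234|1|||logrec-00|0|1|false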
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+
+	"v.io/v23/context"
+)
+
+const (
+	addLocal   = iota // add a node for a local update ("addl")
+	addRemote         // add a node received from a remote sync ("addr")
+	linkLocal          // link a node to an extra parent locally ("linkl")
+	linkRemote         // link a node to an extra parent from a remote sync ("linkr")
+)
+
+type syncCommand struct {
+	cmd     int      // command type: addLocal, addRemote, linkLocal, or linkRemote
+	oid     string   // object ID
+	version string   // object version being added or linked
+	parents []string // parent versions (none, one, or two)
+	logrec  string   // log record identifier for this mutation
+	deleted bool     // true if the mutation marks the object as deleted
+}
+
+// parseSyncCommands parses a sync test file and returns its commands.
+func parseSyncCommands(file string) ([]syncCommand, error) {
+	cmds := []syncCommand{}
+
+	sf, err := os.Open("testdata/" + file)
+	if err != nil {
+		return nil, err
+	}
+	defer sf.Close()
+
+	scanner := bufio.NewScanner(sf)
+	lineno := 0
+	for scanner.Scan() {
+		lineno++
+		line := strings.TrimSpace(scanner.Text())
+		if line == "" || line[0] == '#' {
+			continue
+		}
+
+		args := strings.Split(line, "|")
+		nargs := len(args)
+
+		switch args[0] {
+		case "addl", "addr":
+			expNargs := 9
+			if nargs != expNargs {
+				return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
+					file, lineno, expNargs, nargs)
+			}
+			var parents []string
+			for i := 3; i <= 4; i++ {
+				if args[i] != "" {
+					parents = append(parents, args[i])
+				}
+			}
+
+			del, err := strconv.ParseBool(args[8])
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
+			}
+			cmd := syncCommand{
+				oid:     args[1],
+				version: args[2],
+				parents: parents,
+				logrec:  args[5],
+				deleted: del,
+			}
+			if args[0] == "addl" {
+				cmd.cmd = addLocal
+			} else {
+				cmd.cmd = addRemote
+			}
+			cmds = append(cmds, cmd)
+
+		case "linkl", "linkr":
+			expNargs := 6
+			if nargs != expNargs {
+				return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
+					file, lineno, expNargs, nargs)
+			}
+
+			if args[3] == "" {
+				return nil, fmt.Errorf("%s:%d: parent version not specified", file, lineno)
+			}
+			if args[4] != "" {
+				return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent: %s",
+					file, lineno, args[4])
+			}
+
+			cmd := syncCommand{
+				oid:     args[1],
+				version: args[2],
+				parents: []string{args[3]},
+				logrec:  args[5],
+			}
+			if args[0] == "linkl" {
+				cmd.cmd = linkLocal
+			} else {
+				cmd.cmd = linkRemote
+			}
+			cmds = append(cmds, cmd)
+
+		default:
+			return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
+		}
+	}
+
+	err = scanner.Err()
+	return cmds, err
+}
+
+// dagReplayCommands parses a sync test file and replays its commands, updating
+// the DAG structures associated with the sync service.
+func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (graftMap, error) {
+	cmds, err := parseSyncCommands(syncfile)
+	if err != nil {
+		return nil, err
+	}
+
+	st := s.sv.St()
+	graft := newGraft()
+
+	for _, cmd := range cmds {
+		tx := st.NewTransaction()
+
+		switch cmd.cmd {
+		case addLocal:
+			err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
+				cmd.deleted, cmd.parents, NoBatchId, nil)
+			if err != nil {
+				return nil, fmt.Errorf("cannot add local node %s:%s: %v",
+					cmd.oid, cmd.version, err)
+			}
+			tx.Commit()
+
+			tx = st.NewTransaction()
+			if err = moveHead(ctx, tx, cmd.oid, cmd.version); err != nil {
+				return nil, fmt.Errorf("cannot move head to %s:%s: %v",
+					cmd.oid, cmd.version, err)
+			}
+
+		case addRemote:
+			err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
+				cmd.deleted, cmd.parents, NoBatchId, graft)
+			if err != nil {
+				return nil, fmt.Errorf("cannot add remote node %s:%s: %v",
+					cmd.oid, cmd.version, err)
+			}
+
+		case linkLocal:
+			if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], nil); err != nil {
+				return nil, fmt.Errorf("cannot add local parent %s to node %s:%s: %v",
+					cmd.parents[0], cmd.oid, cmd.version, err)
+			}
+
+		case linkRemote:
+			if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], graft); err != nil {
+				return nil, fmt.Errorf("cannot add remote parent %s to node %s:%s: %v",
+					cmd.parents[0], cmd.oid, cmd.version, err)
+			}
+		}
+
+		tx.Commit()
+	}
+
+	return graft, nil
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 567d588..bded17b 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -47,6 +47,13 @@
 
 	// In-memory sync membership info aggregated across databases.
 	allMembers *memberView
+
+	// In-memory tracking of batches during their construction.
+	// The sync Initiator and Watcher build batches incrementally here
+	// and then persist them in DAG batch entries.  The mutex guards
+	// access to the batch set.
+	batchesLock sync.Mutex
+	batches     batchSet
 }
 
 // syncDatabase contains the metadata for syncing a database. This struct is
@@ -62,6 +69,11 @@
 	_   util.Layer                   = (*syncService)(nil)
 )
 
+// rand64 generates an unsigned 64-bit pseudo-random number.
+func rand64() uint64 {
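+	// rng.Int63() yields 63 random bits; shift them left and OR in one more
+	// random bit to cover the full 64-bit range.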
+	return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
+}
+
 // New creates a new sync module.
 //
 // Concurrency: sync initializes two goroutines at startup: a "watcher" and an
@@ -70,7 +82,10 @@
 // periodically contacting peers to fetch changes from them. In addition, the
 // sync module responds to incoming RPCs from remote sync modules.
 func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
-	s := &syncService{sv: sv}
+	s := &syncService{
+		sv:      sv,
+		batches: make(batchSet),
+	}
 
 	data := &syncData{}
 	if err := util.GetObject(sv.St(), s.StKey(), data); err != nil {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 72e8533..9e6efd4 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -61,12 +61,16 @@
 
 // newSyncGroupVersion generates a random SyncGroup version ("etag").
 func newSyncGroupVersion() string {
-	return fmt.Sprintf("%x", rng.Int63())
+	return fmt.Sprintf("%x", rand64())
 }
 
 // newSyncGroupId generates a random SyncGroup ID.
 func newSyncGroupId() interfaces.GroupId {
-	return interfaces.GroupId(rng.Int63())
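+	// Draw random IDs until one differs from the reserved NoGroupId value.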
+	id := interfaces.NoGroupId
+	for id == interfaces.NoGroupId {
+		id = interfaces.GroupId(rand64())
+	}
+	return id
 }
 
 // verifySyncGroup verifies if a SyncGroup struct is well-formed.
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
new file mode 100644
index 0000000..46b3502
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -0,0 +1,6 @@
+# Create an object locally and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-00|0|1|false
+addl|1234|2|1||logrec-01|0|1|false
+addl|1234|3|2||logrec-02|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-init-01.sync b/services/syncbase/vsync/testdata/local-init-01.sync
new file mode 100644
index 0000000..86e24de
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-01.sync
@@ -0,0 +1,12 @@
+# Create an object DAG locally with branches and resolved conflicts.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-00|0|1|false
+addl|1234|2|1||logrec-01|0|1|false
+addl|1234|3|2||logrec-02|0|1|false
+addl|1234|4|2||logrec-03|0|1|false
+addl|1234|5|3|4|logrec-04|0|1|false
+addl|1234|6|5||logrec-05|0|1|false
+addl|1234|7|2||logrec-06|0|1|false
+addl|1234|8|6|7|logrec-07|0|1|false
+addl|1234|9|8||logrec-08|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-init-03.sync b/services/syncbase/vsync/testdata/local-init-03.sync
new file mode 100644
index 0000000..202a752
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-03.sync
@@ -0,0 +1,10 @@
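+# Create an object DAG locally with branches, a deleted node (v5), and two
+# conflict resolutions (v6 and v7) that are themselves merged at v8.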
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-01|0|1|false
+addl|1234|2|1||logrec-02|0|1|false
+addl|1234|3|1||logrec-03|0|1|false
+addl|1234|4|2||logrec-04|0|1|false
+addl|1234|5|2||logrec-05|0|1|true
+addl|1234|6|4|5|logrec-06|0|1|false
+addl|1234|7|3|5|logrec-07|0|1|false
+addl|1234|8|6|7|logrec-08|0|1|false
diff --git a/services/syncbase/vsync/testdata/local-resolve-00.sync b/services/syncbase/vsync/testdata/local-resolve-00.sync
new file mode 100644
index 0000000..2c87de5
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-resolve-00.sync
@@ -0,0 +1,4 @@
+# Resolve a conflict locally by creating version v7 with parents v3 and v6.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|7|3|6|logrec-06|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
new file mode 100644
index 0000000..1f9bb5b
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -0,0 +1,8 @@
+# Update an object remotely three times triggering one conflict after
+# it was created locally up to v3 (i.e. assume the remote sync received
+# it from the local sync at v2, then updated separately).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|2||VeyronPhone:10:1:0|0|1|false
+addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
new file mode 100644
index 0000000..9581f69
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -0,0 +1,10 @@
+# Update an object remotely three times, triggering a conflict with
+# 2 graft points: v1 and v2.  This assumes that the remote sync got
+# v1, made its own conflicting v4, resolved it against v2 into v5,
+# then made a further v6 change.  When the local sync gets this info
+# back it sees 2 graft points: v1-v4 and v2-v5.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronLaptop:10:1:0|0|1|false
+addr|1234|5|2|4|VeyronPhone:10:1:0|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:1|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
new file mode 100644
index 0000000..9795d53
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -0,0 +1,6 @@
+# Create an object remotely and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|1|||VeyronPhone:10:1:0|0|1|false
+addr|1234|2|1||VeyronPhone:10:1:1|0|1|false
+addr|1234|3|2||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
new file mode 100644
index 0000000..e2e2afa
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -0,0 +1,8 @@
+# Update an object remotely three times without triggering a conflict
+# after it was created locally up to v3 (i.e. assume the remote sync
+# received it from the local sync first, then updated it).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|3||VeyronPhone:10:1:0|0|1|false
+addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 4c6f852..bb34c27 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -9,19 +9,20 @@
 import (
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
+	"v.io/x/lib/vlog"
 )
 
 // forEachDatabaseStore iterates over all Databases in all Apps within the
-// service and invokes the provided callback function on each database. The
-// callback returns a "done" flag to make forEachDatabaseStore() stop the
-// iteration earlier; otherwise the function loops across all databases of all
-// apps.
+// service and invokes the callback function on each database. The callback
+// returns a "done" flag to make forEachDatabaseStore() stop the iteration
+// early; otherwise the function loops across all databases of all apps.
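+//
+// For example, a caller that only needs the first database could stop the
+// iteration by returning true from the callback (a sketch):
+//
+//   s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+//       // ... use st ...
+//       return true // done, stop iterating
+//   })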
 func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
 	// Get the apps and iterate over them.
 	// TODO(rdaoud): use a "privileged call" parameter instead of nil (here and
 	// elsewhere).
 	appNames, err := s.sv.AppNames(ctx, nil)
 	if err != nil {
+		vlog.Errorf("forEachDatabaseStore: cannot get all app names: %v", err)
 		return
 	}
 
@@ -29,10 +30,12 @@
 		// For each app, get its databases and iterate over them.
 		app, err := s.sv.App(ctx, nil, a)
 		if err != nil {
+			vlog.Errorf("forEachDatabaseStore: cannot get app %s: %v", a, err)
 			continue
 		}
 		dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
 		if err != nil {
+			vlog.Errorf("forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
 			continue
 		}
 
@@ -40,6 +43,7 @@
 			// For each database, get its Store and invoke the callback.
 			db, err := app.NoSQLDatabase(ctx, nil, d)
 			if err != nil {
+				vlog.Errorf("forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
 				continue
 			}