Allow adding more parents to a DAG node.

This is used to track conflict resolutions that select (bless) an
existing object version instead of creating a new one.  Capturing
this info allows Sync to avoid a type of false conflict.

Change-Id: Ice6b1d53c37f1212aebfcf7c5add7d8fecb88f9f
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9b47cc9..ac1a342 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -97,6 +97,7 @@
 	"fmt"
 
 	"veyron2/storage"
+	"veyron2/vlog"
 )
 
 type dag struct {
@@ -114,9 +115,9 @@
 }
 
 type graftInfo struct {
-	newNodes   map[storage.Version]bool   // set of newly added nodes during a sync
-	graftNodes map[storage.Version]uint64 // set of graft nodes and their level
-	newHead    storage.Version            // new (pending) head node
+	newNodes   map[storage.Version]struct{} // set of newly added nodes during a sync
+	graftNodes map[storage.Version]uint64   // set of graft nodes and their level
+	newHeads   map[storage.Version]struct{} // set of candidate new head nodes
 }
 
 // openDAG opens or creates a DAG for the given filename.
@@ -177,6 +178,46 @@
 	}
 }
 
+// getObjectGraft returns the graft structure for an object ID.
+// The graftInfo struct for an object is ephemeral (in-memory) and it
+// tracks the following information:
+// - newNodes:   the set of newly added nodes used to detect the type of
+//               edges between nodes (new-node to old-node or vice versa).
+// - newHeads:   the set of new candidate head nodes used to detect conflicts.
+// - graftNodes: the set of nodes used to find common ancestors between
+//               conflicting nodes.
+//
+// After the received Sync logs are applied, if there are two new heads in
+// the newHeads set, there is a conflict to be resolved for this object.
+// Otherwise if there is only one head, no conflict was triggered and the
+// new head becomes the current version for the object.
+//
+// In case of conflict, the graftNodes set is used to select the common
+// ancestor to pass to the conflict resolver.
+//
+// Note: if an object's graft structure does not exist only create it
+// if the "create" parameter is set to true.
+func (d *dag) getObjectGraft(oid storage.ID, create bool) *graftInfo {
+	graft := d.graft[oid]
+	if graft == nil && create {
+		graft = &graftInfo{
+			newNodes:   make(map[storage.Version]struct{}),
+			graftNodes: make(map[storage.Version]uint64),
+			newHeads:   make(map[storage.Version]struct{}),
+		}
+
+		// If a current head node exists for this object, initialize
+		// the set of candidate new heads to include it.
+		head, err := d.getHead(oid)
+		if err == nil {
+			graft.newHeads[head] = struct{}{}
+		}
+
+		d.graft[oid] = graft
+	}
+	return graft
+}
+
 // addNode adds a new node for an object in the DAG, linking it to its parent nodes.
 // It verifies that this node does not exist and that its parent nodes are valid.
 // It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -237,21 +278,10 @@
 	//    the origin root node), representing the most up-to-date common
 	//    knowledge between this device and the divergent changes.
 	//
-	// The graftInfo struct for an object is ephemeral (in-memory) and it
-	// tracks the set of newly added nodes, which in turn is used to
-	// detect an old-to-new edge which identifies the old node as a graft
-	// node.  The graft nodes and their DAG levels are tracked in a map
-	// which is used to (1) detect conflicts and (2) select the common
-	// ancestor to pass to the conflict resolver.
-	//
 	// Note: at the end of a sync operation between 2 devices, the whole
 	// graft info is cleared (Syncd calls clearGraft()) to prepare it for
 	// the new pairwise sync operation.
-	graft := d.graft[oid]
-	if graft == nil && remote {
-		graft = &graftInfo{newNodes: make(map[storage.Version]bool), graftNodes: make(map[storage.Version]uint64)}
-		d.graft[oid] = graft
-	}
+	graft := d.getObjectGraft(oid, remote)
 
 	// Verify the parents and determine the node level.
 	// Update the graft info in the DAG for this object.
@@ -267,16 +297,21 @@
 		if remote {
 			// If this parent is an old node, it's a graft point in the DAG
 			// and may be a common ancestor used during conflict resolution.
-			if !graft.newNodes[parent] {
+			if _, ok := graft.newNodes[parent]; !ok {
 				graft.graftNodes[parent] = node.Level
 			}
+
+			// The parent nodes can no longer be candidates for new head versions.
+			if _, ok := graft.newHeads[parent]; ok {
+				delete(graft.newHeads, parent)
+			}
 		}
 	}
 
 	if remote {
-		// This new node is so far the candidate for new head version.
-		graft.newNodes[version] = true
-		graft.newHead = version
+		// This new node is a candidate for new head version.
+		graft.newNodes[version] = struct{}{}
+		graft.newHeads[version] = struct{}{}
 	}
 
 	// Insert the new node in the kvdb.
@@ -293,6 +328,73 @@
 	return d.nodes.hasKey(key)
 }
 
+// addParent adds to the DAG node (oid, version) linkage to this parent node.
+// If the parent linkage is due to a local change (from conflict resolution
+// by blessing an existing version), no need to update the grafting structure.
+// Otherwise a remote change (from the Sync protocol) updates the graft.
+//
+// TODO(rdaoud): add a check to avoid the creation of cycles in the DAG.
+// TODO(rdaoud): recompute the levels of reachable child-nodes if the new
+// parent's level is greater or equal to the node's current level.
+func (d *dag) addParent(oid storage.ID, version, parent storage.Version, remote bool) error {
+	node, err := d.getNode(oid, version)
+	if err != nil {
+		return err
+	}
+
+	pnode, err := d.getNode(oid, parent)
+	if err != nil {
+		vlog.VI(1).Infof("addParent: object %v, node %d, parent %d: parent node not found", oid, version, parent)
+		return err
+	}
+
+	// Check if the parent is already linked to this node.
+	found := false
+	for i := range node.Parents {
+		if node.Parents[i] == parent {
+			found = true
+			break
+		}
+	}
+
+	// If the parent is not yet linked (local or remote) add it.
+	if !found {
+		node.Parents = append(node.Parents, parent)
+		err = d.setNode(oid, version, node)
+		if err != nil {
+			return err
+		}
+	}
+
+	// For local changes we are done, the grafting structure is not updated.
+	if !remote {
+		return nil
+	}
+
+	// If the node and its parent are new/old or old/new then add
+	// the parent as a graft point (a potential common ancestor).
+	graft := d.getObjectGraft(oid, true)
+
+	_, nodeNew := graft.newNodes[version]
+	_, parentNew := graft.newNodes[parent]
+	if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+		graft.graftNodes[parent] = pnode.Level
+	}
+
+	// The parent node can no longer be a candidate for a new head version.
+	// The addParent() function only removes candidates from newHeads that
+	// have become parents.  It does not add the child nodes to newHeads
+	// because they are not necessarily new-head candidates.  If they are
+	// new nodes, the addNode() function handles adding them to newHeads.
+	// For old nodes, only the current head could be a candidate and it is
+	// added to newHeads when the graft struct is initialized.
+	if _, ok := graft.newHeads[parent]; ok {
+		delete(graft.newHeads, parent)
+	}
+
+	return nil
+}
+
 // moveHead moves the object head node in the DAG.
 func (d *dag) moveHead(oid storage.ID, head storage.Version) error {
 	if d.store == nil {
@@ -311,9 +413,10 @@
 // new and old head nodes.
 // - Yes: return (true, newHead, oldHead, ancestor)
 // - No:  return (false, newHead, oldHead, NoVersion)
-// A conflict exists if the current (old) head node is not one of the graft
-// point of the graph fragment just added.  It means the newly added object
-// versions are not derived in part from this device's current knowledge.
+// A conflict exists when there are two new-head nodes.  It means the newly
+// added object versions are not derived in part from this device's current
+// knowledge.  If there is a single new-head, the object changes were applied
+// without triggering a conflict.
 func (d *dag) hasConflict(oid storage.ID) (isConflict bool, newHead, oldHead, ancestor storage.Version, err error) {
 	oldHead = storage.NoVersion
 	newHead = storage.NoVersion
@@ -329,19 +432,33 @@
 		return
 	}
 
-	newHead = graft.newHead
-	oldHead, err2 := d.getHead(oid)
-	if err2 != nil {
-		// This is a new object not previously known on this device, so
-		// it does not yet have a "head node" here.  This means all the
-		// DAG updates for it are new and to taken as-is (no conflict).
-		// That's why we ignore the error which is only indicating the
-		// lack of a prior head node.
+	numHeads := len(graft.newHeads)
+	if numHeads < 1 || numHeads > 2 {
+		err = fmt.Errorf("node %d has invalid number of new head candidates %d: %v", oid, numHeads, graft.newHeads)
 		return
 	}
 
-	if _, ok := graft.graftNodes[oldHead]; ok {
-		return // no conflict, current head is a graft point
+	// Fetch the current head for this object if it exists.  The error from getHead()
+	// is ignored because a newly received object is not yet known on this device and
+	// will not trigger a conflict.
+	oldHead, _ = d.getHead(oid)
+
+	// If there is only one new head node there is no conflict.
+	// The new head is that single one, even if it might also be the same old node.
+	if numHeads == 1 {
+		for k := range graft.newHeads {
+			newHead = k
+		}
+		return
+	}
+
+	// With two candidate head nodes, the new one is the node that is
+	// not the current (old) head node.
+	for k := range graft.newHeads {
+		if k != oldHead {
+			newHead = k
+			break
+		}
 	}
 
 	// There is a conflict: the best choice ancestor is the graft point
@@ -514,10 +631,10 @@
 
 // getHead retrieves the object head from the DAG.
 func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
-	if d.store == nil {
-		return 0, errors.New("invalid DAG")
-	}
 	var version storage.Version
+	if d.store == nil {
+		return version, errors.New("invalid DAG")
+	}
 	key := objHeadKey(oid)
 	err := d.heads.get(key, &version)
 	if err != nil {
@@ -537,7 +654,9 @@
 		iterVersions = append(iterVersions, head)
 	}
 	if graft := d.graft[oid]; graft != nil {
-		iterVersions = append(iterVersions, graft.newHead)
+		for k := range graft.newHeads {
+			iterVersions = append(iterVersions, k)
+		}
 	}
 
 	// Breadth-first traversal starting from the object head.
@@ -551,15 +670,15 @@
 
 // getGraftNodes is a testing and debug helper function that returns for
 // an object the graft information built and used during a sync operation.
-// The newHead version identifies the head node reported by the other device
-// during a sync operation.  The graftNodes map identifies the set of old
-// nodes where the new DAG fragments were attached and their depth level
-// in the DAG.
-func (d *dag) getGraftNodes(oid storage.ID) (storage.Version, map[storage.Version]uint64) {
+// The newHeads map identifies the candidate head nodes based on the data
+// reported by the other device during a sync operation.  The graftNodes map
+// identifies the set of old nodes where the new DAG fragments were attached
+// and their depth level in the DAG.
+func (d *dag) getGraftNodes(oid storage.ID) (map[storage.Version]struct{}, map[storage.Version]uint64) {
 	if d.store != nil {
 		if ginfo := d.graft[oid]; ginfo != nil {
-			return ginfo.newHead, ginfo.graftNodes
+			return ginfo.newHeads, ginfo.graftNodes
 		}
 	}
-	return 0, nil
+	return nil, nil
 }
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0b5d4c8..5f94836 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -141,6 +141,11 @@
 		t.Errorf("delNode() did not fail on a closed DAG: %v", err)
 	}
 
+	err = dag.addParent(oid, 4, 2, true)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("addParent() did not fail on a closed DAG: %v", err)
+	}
+
 	err = dag.setHead(oid, 4)
 	if err == nil || err.Error() != "invalid DAG" {
 		t.Errorf("setHead() did not fail on a closed DAG: %v", err)
@@ -166,8 +171,8 @@
 	if pmap := dag.getParentMap(oid); len(pmap) != 0 {
 		t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
 	}
-	if head, gmap := dag.getGraftNodes(oid); head != 0 || len(gmap) != 0 {
-		t.Errorf("getGraftNodes() found data on a closed DAG: head: %d, map: %v", head, gmap)
+	if hmap, gmap := dag.getGraftNodes(oid); hmap != nil || gmap != nil {
+		t.Errorf("getGraftNodes() found data on a closed DAG: head map: %v, graft map: %v", hmap, gmap)
 	}
 }
 
@@ -294,6 +299,65 @@
 	dag.close()
 }
 
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	version := storage.Version(7)
+	oid, err := strToObjID("789")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err = dag.addParent(oid, version, 1, true); err == nil {
+		t.Errorf("addParent() did not fail for an unknown object %d:%d in DAG file %s", oid, version, dagfile)
+	}
+
+	node := &dagNode{Level: 15, Logrec: "logrec-22"}
+	if err = dag.setNode(oid, version, node); err != nil {
+		t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
+	}
+
+	for _, parent := range []storage.Version{1, 2, 3} {
+		if err = dag.addParent(oid, version, parent, true); err == nil {
+			t.Errorf("addParent() did not reject invalid parent %d for object %d:%d in DAG file %s",
+				parent, oid, version, dagfile)
+		}
+
+		pnode := &dagNode{Level: 11, Logrec: fmt.Sprint("logrec-%d", parent)}
+		if err = dag.setNode(oid, parent, pnode); err != nil {
+			t.Fatalf("Cannot set parent object %d:%d (%v) in DAG file %s", oid, parent, pnode, dagfile)
+		}
+
+		remote := parent%2 == 0
+		for i := 0; i < 2; i++ {
+			if err = dag.addParent(oid, version, parent, remote); err != nil {
+				t.Errorf("addParent() failed on parent %d, remote %d (i=%d) for object %d:%d in DAG file %s: %v",
+					parent, remote, i, oid, version, dagfile, err)
+			}
+		}
+	}
+
+	node2, err := dag.getNode(oid, version)
+	if err != nil || node2 == nil {
+		t.Errorf("Cannot find stored object %d:%d in DAG file %s", oid, version, dagfile)
+	}
+
+	expParents := []storage.Version{1, 2, 3}
+	if !reflect.DeepEqual(node2.Parents, expParents) {
+		t.Errorf("invalid parents for object %d:%d in DAG file %s: %v instead of %v",
+			oid, version, dagfile, node2.Parents, expParents)
+	}
+
+	dag.close()
+}
+
 // TestSetHead tests setting and getting a DAG head node across DAG open/close/reopen.
 func TestSetHead(t *testing.T) {
 	dagfile := dagFilename()
@@ -357,9 +421,9 @@
 	d.clearGraft()
 
 	// There should be no grafting info, and hasConflict() should fail.
-	newHead, grafts := d.getGraftNodes(oid)
-	if newHead != 0 || grafts != nil {
-		return fmt.Errorf("Object %d: graft info not cleared: newhead (%d), grafts (%v)", oid, newHead, grafts)
+	newHeads, grafts := d.getGraftNodes(oid)
+	if newHeads != nil || grafts != nil {
+		return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
 	}
 
 	isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
@@ -472,9 +536,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 2 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{}
@@ -551,9 +617,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{2: 2}
@@ -640,9 +708,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{1: 1}
@@ -740,9 +810,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
@@ -1090,3 +1162,339 @@
 	}
 	dag.close()
 }
+
+// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict.  An object is created locally and
+// updated twice (v0 -> v1 -> v2).  Another device has learned about v0, created
+// (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by selecting
+// v1 over v3.  Now it sends that new info (v3 and the v1/v3 link) back to the
+// original (local) device.  Instead of a v2/v3 conflict, the device sees that
+// v1 was chosen over v3 and resolves it as a no-conflict case.
+func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0, 3}, 2: {1}, 3: {0}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 3: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 2 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedConflict tests sync of remote updates that contain linked
+// nodes (conflict resolution by selecting an existing version) on top of a local
+// initial state triggering a local conflict.  An object is created locally and
+// updated twice (v0 -> v1 -> v2).  Another device has along the way learned about v0,
+// created (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by
+// selecting v3 over v1.  Now it sends that new info (v3 and the v3/v1 link) back
+// to the original (local) device.  The device sees a v2/v3 conflict.
+func TestRemoteLinkedConflict(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-conf-link.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 1}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 3: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(isConflict && newHead == 3 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict, but moves the head node to a new one.
+// An object is created locally and updated twice (v0 -> v1 -> v2).  Another device
+// has along the way learned about v0, created (v0 -> v3), then learned about
+// (v0 -> v1 -> v2) and resolved that conflict by selecting v3 over v2.  Now it
+// sends that new info (v3 and the v3/v2 link) back to the original (local) device.
+// The device sees that the new head v3 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHead(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-01.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 2}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 2: 2}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 3 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
+// contain linked nodes (conflict resolution by selecting an existing version)
+// on top of a local initial state without conflict, but moves the head node
+// to a new one that overtook the linked node.
+// An object is created locally and updated twice (v0 -> v1 -> v2).  Another
+// device has along the way learned about v0, created (v0 -> v3), then learned
+// about (v0 -> v1 -> v2) and resolved that conflict by selecting v2 over v3.
+// Then it creates a new update v4 from v2 (v2 -> v4).  Now it sends that new
+// info (v3, the v2/v3 link, and v4) back to the original (local) device.
+// The device sees that the new head v4 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHeadOvertake(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-02.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1, 3}, 3: {0}, 4: {2}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 2: 2, 3: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 4 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Then we can move the head and clear the grafting data.
+	if err = dag.moveHead(oid, newHead); err != nil {
+		t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Now new info comes from another device repeating the v2/v3 link.
+	// Verify that it is a NOP (no changes).
+	if err = dagReplayCommands(dag, "remote-noconf-link-repeat.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 4 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	newHeads, grafts = dag.getGraftNodes(oid)
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts = map[storage.Version]uint64{}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if !(!isConflict && newHead == 4 && oldHead == 4 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 41e82cd..02cd561 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -22,6 +22,8 @@
 	addLocal = iota
 	addRemote
 	setDevTable
+	linkLocal
+	linkRemote
 )
 
 type syncCommand struct {
@@ -140,6 +142,38 @@
 			cmd := syncCommand{cmd: setDevTable, devID: DeviceID(args[1]), genVec: genVec}
 			cmds = append(cmds, cmd)
 
+		case "linkl", "linkr":
+			expNargs := 6
+			if nargs != expNargs {
+				return nil, fmt.Errorf("%s:%d: need %d args instead of %d", file, lineno, expNargs, nargs)
+			}
+
+			version, err := strToVersion(args[2])
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid version: %s", file, lineno, args[2])
+			}
+			if args[3] == "" {
+				return nil, fmt.Errorf("%s:%d: parent (to-node) version not specified", file, lineno)
+			}
+			if args[4] != "" {
+				return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent (to-node): %s", file, lineno, args[4])
+			}
+			parent, err := strToVersion(args[3])
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid parent (to-node) version: %s", file, lineno, args[3])
+			}
+
+			cmd := syncCommand{version: version, parents: []storage.Version{parent}, logrec: args[5]}
+			if args[0] == "linkl" {
+				cmd.cmd = linkLocal
+			} else {
+				cmd.cmd = linkRemote
+			}
+			if cmd.objID, err = strToObjID(args[1]); err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid object ID: %s", file, lineno, args[1])
+			}
+			cmds = append(cmds, cmd)
+
 		default:
 			return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
 		}
@@ -171,6 +205,20 @@
 				return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
 			}
 			dag.flush()
+
+		case linkLocal:
+			if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], false); err != nil {
+				return fmt.Errorf("cannot add local parent %d to DAG node %d:%d: %v",
+					cmd.parents[0], cmd.objID, cmd.version, err)
+			}
+			dag.flush()
+
+		case linkRemote:
+			if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], true); err != nil {
+				return fmt.Errorf("cannot add remote parent %d to DAG node %d:%d: %v",
+					cmd.parents[0], cmd.objID, cmd.version, err)
+			}
+			dag.flush()
 		}
 	}
 	return nil