Allow adding more parents to a DAG node.
This is used to track conflict resolutions that select (bless) an
existing object version instead of creating a new one. Capturing
this info allows Sync to avoid a type of false conflict.
Change-Id: Ice6b1d53c37f1212aebfcf7c5add7d8fecb88f9f
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9b47cc9..ac1a342 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -97,6 +97,7 @@
"fmt"
"veyron2/storage"
+ "veyron2/vlog"
)
type dag struct {
@@ -114,9 +115,9 @@
}
type graftInfo struct {
- newNodes map[storage.Version]bool // set of newly added nodes during a sync
- graftNodes map[storage.Version]uint64 // set of graft nodes and their level
- newHead storage.Version // new (pending) head node
+ newNodes map[storage.Version]struct{} // set of newly added nodes during a sync
+ graftNodes map[storage.Version]uint64 // set of graft nodes and their level
+ newHeads map[storage.Version]struct{} // set of candidate new head nodes
}
// openDAG opens or creates a DAG for the given filename.
@@ -177,6 +178,46 @@
}
}
+// getObjectGraft returns the graft structure for an object ID.
+// The graftInfo struct for an object is ephemeral (in-memory) and it
+// tracks the following information:
+// - newNodes: the set of newly added nodes used to detect the type of
+// edges between nodes (new-node to old-node or vice versa).
+// - newHeads: the set of new candidate head nodes used to detect conflicts.
+// - graftNodes: the set of nodes used to find common ancestors between
+// conflicting nodes.
+//
+// After the received Sync logs are applied, if there are two new heads in
+// the newHeads set, there is a conflict to be resolved for this object.
+// Otherwise if there is only one head, no conflict was triggered and the
+// new head becomes the current version for the object.
+//
+// In case of conflict, the graftNodes set is used to select the common
+// ancestor to pass to the conflict resolver.
+//
+// Note: if an object's graft structure does not exist only create it
+// if the "create" parameter is set to true.
+func (d *dag) getObjectGraft(oid storage.ID, create bool) *graftInfo {
+ graft := d.graft[oid]
+ if graft == nil && create {
+ graft = &graftInfo{
+ newNodes: make(map[storage.Version]struct{}),
+ graftNodes: make(map[storage.Version]uint64),
+ newHeads: make(map[storage.Version]struct{}),
+ }
+
+ // If a current head node exists for this object, initialize
+ // the set of candidate new heads to include it.
+ head, err := d.getHead(oid)
+ if err == nil {
+ graft.newHeads[head] = struct{}{}
+ }
+
+ d.graft[oid] = graft
+ }
+ return graft
+}
+
// addNode adds a new node for an object in the DAG, linking it to its parent nodes.
// It verifies that this node does not exist and that its parent nodes are valid.
// It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -237,21 +278,10 @@
// the origin root node), representing the most up-to-date common
// knowledge between this device and the divergent changes.
//
- // The graftInfo struct for an object is ephemeral (in-memory) and it
- // tracks the set of newly added nodes, which in turn is used to
- // detect an old-to-new edge which identifies the old node as a graft
- // node. The graft nodes and their DAG levels are tracked in a map
- // which is used to (1) detect conflicts and (2) select the common
- // ancestor to pass to the conflict resolver.
- //
// Note: at the end of a sync operation between 2 devices, the whole
// graft info is cleared (Syncd calls clearGraft()) to prepare it for
// the new pairwise sync operation.
- graft := d.graft[oid]
- if graft == nil && remote {
- graft = &graftInfo{newNodes: make(map[storage.Version]bool), graftNodes: make(map[storage.Version]uint64)}
- d.graft[oid] = graft
- }
+ graft := d.getObjectGraft(oid, remote)
// Verify the parents and determine the node level.
// Update the graft info in the DAG for this object.
@@ -267,16 +297,21 @@
if remote {
// If this parent is an old node, it's a graft point in the DAG
// and may be a common ancestor used during conflict resolution.
- if !graft.newNodes[parent] {
+ if _, ok := graft.newNodes[parent]; !ok {
graft.graftNodes[parent] = node.Level
}
+
+ // The parent nodes can no longer be candidates for new head versions.
+ if _, ok := graft.newHeads[parent]; ok {
+ delete(graft.newHeads, parent)
+ }
}
}
if remote {
- // This new node is so far the candidate for new head version.
- graft.newNodes[version] = true
- graft.newHead = version
+ // This new node is a candidate for new head version.
+ graft.newNodes[version] = struct{}{}
+ graft.newHeads[version] = struct{}{}
}
// Insert the new node in the kvdb.
@@ -293,6 +328,73 @@
return d.nodes.hasKey(key)
}
+// addParent adds to the DAG node (oid, version) linkage to this parent node.
+// If the parent linkage is due to a local change (from conflict resolution
+// by blessing an existing version), no need to update the grafting structure.
+// Otherwise a remote change (from the Sync protocol) updates the graft.
+//
+// TODO(rdaoud): add a check to avoid the creation of cycles in the DAG.
+// TODO(rdaoud): recompute the levels of reachable child-nodes if the new
+// parent's level is greater or equal to the node's current level.
+func (d *dag) addParent(oid storage.ID, version, parent storage.Version, remote bool) error {
+ node, err := d.getNode(oid, version)
+ if err != nil {
+ return err
+ }
+
+ pnode, err := d.getNode(oid, parent)
+ if err != nil {
+ vlog.VI(1).Infof("addParent: object %v, node %d, parent %d: parent node not found", oid, version, parent)
+ return err
+ }
+
+ // Check if the parent is already linked to this node.
+ found := false
+ for i := range node.Parents {
+ if node.Parents[i] == parent {
+ found = true
+ break
+ }
+ }
+
+ // If the parent is not yet linked (local or remote) add it.
+ if !found {
+ node.Parents = append(node.Parents, parent)
+ err = d.setNode(oid, version, node)
+ if err != nil {
+ return err
+ }
+ }
+
+ // For local changes we are done, the grafting structure is not updated.
+ if !remote {
+ return nil
+ }
+
+ // If the node and its parent are new/old or old/new then add
+ // the parent as a graft point (a potential common ancestor).
+ graft := d.getObjectGraft(oid, true)
+
+ _, nodeNew := graft.newNodes[version]
+ _, parentNew := graft.newNodes[parent]
+ if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+ graft.graftNodes[parent] = pnode.Level
+ }
+
+ // The parent node can no longer be a candidate for a new head version.
+ // The addParent() function only removes candidates from newHeads that
+ // have become parents. It does not add the child nodes to newHeads
+ // because they are not necessarily new-head candidates. If they are
+ // new nodes, the addNode() function handles adding them to newHeads.
+ // For old nodes, only the current head could be a candidate and it is
+ // added to newHeads when the graft struct is initialized.
+ if _, ok := graft.newHeads[parent]; ok {
+ delete(graft.newHeads, parent)
+ }
+
+ return nil
+}
+
// moveHead moves the object head node in the DAG.
func (d *dag) moveHead(oid storage.ID, head storage.Version) error {
if d.store == nil {
@@ -311,9 +413,10 @@
// new and old head nodes.
// - Yes: return (true, newHead, oldHead, ancestor)
// - No: return (false, newHead, oldHead, NoVersion)
-// A conflict exists if the current (old) head node is not one of the graft
-// point of the graph fragment just added. It means the newly added object
-// versions are not derived in part from this device's current knowledge.
+// A conflict exists when there are two new-head nodes. It means the newly
+// added object versions are not derived in part from this device's current
+// knowledge. If there is a single new-head, the object changes were applied
+// without triggering a conflict.
func (d *dag) hasConflict(oid storage.ID) (isConflict bool, newHead, oldHead, ancestor storage.Version, err error) {
oldHead = storage.NoVersion
newHead = storage.NoVersion
@@ -329,19 +432,33 @@
return
}
- newHead = graft.newHead
- oldHead, err2 := d.getHead(oid)
- if err2 != nil {
- // This is a new object not previously known on this device, so
- // it does not yet have a "head node" here. This means all the
- // DAG updates for it are new and to taken as-is (no conflict).
- // That's why we ignore the error which is only indicating the
- // lack of a prior head node.
+ numHeads := len(graft.newHeads)
+ if numHeads < 1 || numHeads > 2 {
+ err = fmt.Errorf("node %d has invalid number of new head candidates %d: %v", oid, numHeads, graft.newHeads)
return
}
- if _, ok := graft.graftNodes[oldHead]; ok {
- return // no conflict, current head is a graft point
+ // Fetch the current head for this object if it exists. The error from getHead()
+ // is ignored because a newly received object is not yet known on this device and
+ // will not trigger a conflict.
+ oldHead, _ = d.getHead(oid)
+
+ // If there is only one new head node there is no conflict.
+ // The new head is that single one, even if it might also be the same old node.
+ if numHeads == 1 {
+ for k := range graft.newHeads {
+ newHead = k
+ }
+ return
+ }
+
+ // With two candidate head nodes, the new one is the node that is
+ // not the current (old) head node.
+ for k := range graft.newHeads {
+ if k != oldHead {
+ newHead = k
+ break
+ }
}
// There is a conflict: the best choice ancestor is the graft point
@@ -514,10 +631,10 @@
// getHead retrieves the object head from the DAG.
func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
- if d.store == nil {
- return 0, errors.New("invalid DAG")
- }
var version storage.Version
+ if d.store == nil {
+ return version, errors.New("invalid DAG")
+ }
key := objHeadKey(oid)
err := d.heads.get(key, &version)
if err != nil {
@@ -537,7 +654,9 @@
iterVersions = append(iterVersions, head)
}
if graft := d.graft[oid]; graft != nil {
- iterVersions = append(iterVersions, graft.newHead)
+ for k := range graft.newHeads {
+ iterVersions = append(iterVersions, k)
+ }
}
// Breadth-first traversal starting from the object head.
@@ -551,15 +670,15 @@
// getGraftNodes is a testing and debug helper function that returns for
// an object the graft information built and used during a sync operation.
-// The newHead version identifies the head node reported by the other device
-// during a sync operation. The graftNodes map identifies the set of old
-// nodes where the new DAG fragments were attached and their depth level
-// in the DAG.
-func (d *dag) getGraftNodes(oid storage.ID) (storage.Version, map[storage.Version]uint64) {
+// The newHeads map identifies the candidate head nodes based on the data
+// reported by the other device during a sync operation. The graftNodes map
+// identifies the set of old nodes where the new DAG fragments were attached
+// and their depth level in the DAG.
+func (d *dag) getGraftNodes(oid storage.ID) (map[storage.Version]struct{}, map[storage.Version]uint64) {
if d.store != nil {
if ginfo := d.graft[oid]; ginfo != nil {
- return ginfo.newHead, ginfo.graftNodes
+ return ginfo.newHeads, ginfo.graftNodes
}
}
- return 0, nil
+ return nil, nil
}
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0b5d4c8..5f94836 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -141,6 +141,11 @@
t.Errorf("delNode() did not fail on a closed DAG: %v", err)
}
+ err = dag.addParent(oid, 4, 2, true)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("addParent() did not fail on a closed DAG: %v", err)
+ }
+
err = dag.setHead(oid, 4)
if err == nil || err.Error() != "invalid DAG" {
t.Errorf("setHead() did not fail on a closed DAG: %v", err)
@@ -166,8 +171,8 @@
if pmap := dag.getParentMap(oid); len(pmap) != 0 {
t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
}
- if head, gmap := dag.getGraftNodes(oid); head != 0 || len(gmap) != 0 {
- t.Errorf("getGraftNodes() found data on a closed DAG: head: %d, map: %v", head, gmap)
+ if hmap, gmap := dag.getGraftNodes(oid); hmap != nil || gmap != nil {
+ t.Errorf("getGraftNodes() found data on a closed DAG: head map: %v, graft map: %v", hmap, gmap)
}
}
@@ -294,6 +299,65 @@
dag.close()
}
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ version := storage.Version(7)
+ oid, err := strToObjID("789")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err = dag.addParent(oid, version, 1, true); err == nil {
+ t.Errorf("addParent() did not fail for an unknown object %d:%d in DAG file %s", oid, version, dagfile)
+ }
+
+ node := &dagNode{Level: 15, Logrec: "logrec-22"}
+ if err = dag.setNode(oid, version, node); err != nil {
+ t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
+ }
+
+ for _, parent := range []storage.Version{1, 2, 3} {
+ if err = dag.addParent(oid, version, parent, true); err == nil {
+ t.Errorf("addParent() did not reject invalid parent %d for object %d:%d in DAG file %s",
+ parent, oid, version, dagfile)
+ }
+
+ pnode := &dagNode{Level: 11, Logrec: fmt.Sprint("logrec-%d", parent)}
+ if err = dag.setNode(oid, parent, pnode); err != nil {
+ t.Fatalf("Cannot set parent object %d:%d (%v) in DAG file %s", oid, parent, pnode, dagfile)
+ }
+
+ remote := parent%2 == 0
+ for i := 0; i < 2; i++ {
+ if err = dag.addParent(oid, version, parent, remote); err != nil {
+ t.Errorf("addParent() failed on parent %d, remote %d (i=%d) for object %d:%d in DAG file %s: %v",
+ parent, remote, i, oid, version, dagfile, err)
+ }
+ }
+ }
+
+ node2, err := dag.getNode(oid, version)
+ if err != nil || node2 == nil {
+ t.Errorf("Cannot find stored object %d:%d in DAG file %s", oid, version, dagfile)
+ }
+
+ expParents := []storage.Version{1, 2, 3}
+ if !reflect.DeepEqual(node2.Parents, expParents) {
+ t.Errorf("invalid parents for object %d:%d in DAG file %s: %v instead of %v",
+ oid, version, dagfile, node2.Parents, expParents)
+ }
+
+ dag.close()
+}
+
// TestSetHead tests setting and getting a DAG head node across DAG open/close/reopen.
func TestSetHead(t *testing.T) {
dagfile := dagFilename()
@@ -357,9 +421,9 @@
d.clearGraft()
// There should be no grafting info, and hasConflict() should fail.
- newHead, grafts := d.getGraftNodes(oid)
- if newHead != 0 || grafts != nil {
- return fmt.Errorf("Object %d: graft info not cleared: newhead (%d), grafts (%v)", oid, newHead, grafts)
+ newHeads, grafts := d.getGraftNodes(oid)
+ if newHeads != nil || grafts != nil {
+ return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
}
isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
@@ -472,9 +536,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 2 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{}
@@ -551,9 +617,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{2: 2}
@@ -640,9 +708,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{1: 1}
@@ -740,9 +810,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
@@ -1090,3 +1162,339 @@
}
dag.close()
}
+
+// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict. An object is created locally and
+// updated twice (v0 -> v1 -> v2). Another device has learned about v0, created
+// (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by selecting
+// v1 over v3. Now it sends that new info (v3 and the v1/v3 link) back to the
+// original (local) device. Instead of a v2/v3 conflict, the device sees that
+// v1 was chosen over v3 and resolves it as a no-conflict case.
+func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0, 3}, 2: {1}, 3: {0}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 3: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 2 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedConflict tests sync of remote updates that contain linked
+// nodes (conflict resolution by selecting an existing version) on top of a local
+// initial state triggering a local conflict. An object is created locally and
+// updated twice (v0 -> v1 -> v2). Another device has along the way learned about v0,
+// created (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by
+// selecting v3 over v1. Now it sends that new info (v3 and the v3/v1 link) back
+// to the original (local) device. The device sees a v2/v3 conflict.
+func TestRemoteLinkedConflict(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-conf-link.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 1}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 3: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(isConflict && newHead == 3 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict, but moves the head node to a new one.
+// An object is created locally and updated twice (v0 -> v1 -> v2). Another device
+// has along the way learned about v0, created (v0 -> v3), then learned about
+// (v0 -> v1 -> v2) and resolved that conflict by selecting v3 over v2. Now it
+// sends that new info (v3 and the v3/v2 link) back to the original (local) device.
+// The device sees that the new head v3 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHead(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-01.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 2}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 2: 2}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 3 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
+// contain linked nodes (conflict resolution by selecting an existing version)
+// on top of a local initial state without conflict, but moves the head node
+// to a new one that overtook the linked node.
+// An object is created locally and updated twice (v0 -> v1 -> v2). Another
+// device has along the way learned about v0, created (v0 -> v3), then learned
+// about (v0 -> v1 -> v2) and resolved that conflict by selecting v2 over v3.
+// Then it creates a new update v4 from v2 (v2 -> v4). Now it sends that new
+// info (v3, the v2/v3 link, and v4) back to the original (local) device.
+// The device sees that the new head v4 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHeadOvertake(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-02.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1, 3}, 3: {0}, 4: {2}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 2: 2, 3: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 4 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Then we can move the head and clear the grafting data.
+ if err = dag.moveHead(oid, newHead); err != nil {
+ t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Now new info comes from another device repeating the v2/v3 link.
+ // Verify that it is a NOP (no changes).
+ if err = dagReplayCommands(dag, "remote-noconf-link-repeat.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 4 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ newHeads, grafts = dag.getGraftNodes(oid)
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts = map[storage.Version]uint64{}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if !(!isConflict && newHead == 4 && oldHead == 4 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 41e82cd..02cd561 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -22,6 +22,8 @@
addLocal = iota
addRemote
setDevTable
+ linkLocal
+ linkRemote
)
type syncCommand struct {
@@ -140,6 +142,38 @@
cmd := syncCommand{cmd: setDevTable, devID: DeviceID(args[1]), genVec: genVec}
cmds = append(cmds, cmd)
+ case "linkl", "linkr":
+ expNargs := 6
+ if nargs != expNargs {
+ return nil, fmt.Errorf("%s:%d: need %d args instead of %d", file, lineno, expNargs, nargs)
+ }
+
+ version, err := strToVersion(args[2])
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid version: %s", file, lineno, args[2])
+ }
+ if args[3] == "" {
+ return nil, fmt.Errorf("%s:%d: parent (to-node) version not specified", file, lineno)
+ }
+ if args[4] != "" {
+ return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent (to-node): %s", file, lineno, args[4])
+ }
+ parent, err := strToVersion(args[3])
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid parent (to-node) version: %s", file, lineno, args[3])
+ }
+
+ cmd := syncCommand{version: version, parents: []storage.Version{parent}, logrec: args[5]}
+ if args[0] == "linkl" {
+ cmd.cmd = linkLocal
+ } else {
+ cmd.cmd = linkRemote
+ }
+ if cmd.objID, err = strToObjID(args[1]); err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid object ID: %s", file, lineno, args[1])
+ }
+ cmds = append(cmds, cmd)
+
default:
return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
}
@@ -171,6 +205,20 @@
return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
}
dag.flush()
+
+ case linkLocal:
+ if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], false); err != nil {
+ return fmt.Errorf("cannot add local parent %d to DAG node %d:%d: %v",
+ cmd.parents[0], cmd.objID, cmd.version, err)
+ }
+ dag.flush()
+
+ case linkRemote:
+ if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], true); err != nil {
+ return fmt.Errorf("cannot add remote parent %d to DAG node %d:%d: %v",
+ cmd.parents[0], cmd.objID, cmd.version, err)
+ }
+ dag.flush()
}
}
return nil