Merge "veyron/runtimes/google/vsync: find deleted descendants in DAG"
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9e860d9..94fea55 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -428,83 +428,6 @@
return d.nodes.hasKey(key)
}
-// childOf returns true if the node is a child of the parent version.
-// It means that the parent version is found in the node's Parents array.
-func childOf(node *dagNode, parent storage.Version) bool {
- if node == nil || parent == storage.NoVersion {
- return false
- }
- for _, pver := range node.Parents {
- if pver == parent {
- return true
- }
- }
- return false
-}
-
-// hasParent returns true if the node (oid, version) exists in the DAG DB
-// and has (oid, parent) as a parent node. Either "version" or "parent"
-// could be NoVersion (zero). Thus the 4 cases:
-// 1- "version" and "parent" are _not_ NoVersion: return true if both nodes
-// exist and have a parent/child relationship.
-// 2- Only "parent" is NoVersion: return true if (oid, version) exists and
-// either it has no parents (root of the DAG) or at least one of its
-// parent nodes is a deleted node (i.e. has its "Deleted" flag set true).
-// 3- Only "version" is NoVersion: return true if (oid, parent) exists and
-// at least one of its children is a deleted node.
-// 4- Both "version" and "parent" are NoVersion: return false
-func (d *dag) hasParent(oid storage.ID, version, parent storage.Version) bool {
- if d.store == nil {
- return false
- }
-
- switch {
- case version != storage.NoVersion && parent != storage.NoVersion:
- if !d.hasNode(oid, parent) {
- return false
- }
- node, err := d.getNode(oid, version)
- if err != nil {
- return false
- }
- return childOf(node, parent)
-
- case version != storage.NoVersion && parent == storage.NoVersion:
- node, err := d.getNode(oid, version)
- if err != nil {
- return false
- }
- if node.Parents == nil {
- return true
- }
- for _, pver := range node.Parents {
- if pnode, err := d.getNode(oid, pver); err == nil && pnode.Deleted {
- return true
- }
- }
- return false
-
- case version == storage.NoVersion && parent != storage.NoVersion:
- if !d.hasNode(oid, parent) {
- return false
- }
- head, err := d.getHead(oid)
- if err != nil {
- return false
- }
- found := false
- d.ancestorIter(oid, []storage.Version{head}, func(oid storage.ID, v storage.Version, node *dagNode) error {
- if node.Deleted && childOf(node, parent) {
- found = true
- return errors.New("found it -- stop the iteration")
- }
- return nil
- })
- return found
- }
- return false
-}
-
// addParent adds to the DAG node (oid, version) linkage to this parent node.
// If the parent linkage is due to a local change (from conflict resolution
// by blessing an existing version), no need to update the grafting structure.
@@ -695,6 +618,70 @@
return nil
}
+// hasDeletedDescendant reports whether the node (oid, version) is present in
+// the DAG DB and at least one node descending from it carries the "Deleted"
+// flag, i.e. the object was deleted by some mutation made after this version.
+// The "Deleted" flag of the node (oid, version) itself is not considered.
+// It returns false if the DAG has no open store (d.store is nil), if the
+// node does not exist, or if the object's head node cannot be fetched.
+func (d *dag) hasDeletedDescendant(oid storage.ID, version storage.Version) bool {
+	if d.store == nil || !d.hasNode(oid, version) {
+		return false
+	}
+	head, err := d.getHead(oid)
+	if err != nil {
+		return false
+	}
+
+	// Walk breadth-first from the object's head node back toward the given
+	// version, following parent links.  Each path remembers whether it has
+	// crossed a deleted node.  The answer is true only if some path arrives
+	// at the given version after having crossed a deleted node.
+
+	// pathState is one step of that walk: the version to visit next and
+	// whether a deleted node was crossed on the way to it.  A version may
+	// be visited once per flag value, so the seen set is keyed on both.
+	type pathState struct {
+		ver        storage.Version
+		sawDeleted bool
+	}
+
+	start := pathState{ver: head, sawDeleted: false}
+	seen := make(map[pathState]struct{})
+	seen[start] = struct{}{}
+	pending := list.New()
+	pending.PushBack(start)
+
+	for pending.Len() > 0 {
+		cur := pending.Remove(pending.Front()).(pathState)
+		if cur.ver == version {
+			if cur.sawDeleted {
+				return true
+			}
+			// Reached the target without crossing a deleted node; do
+			// not walk past it into its ancestors.
+			continue
+		}
+		node, err := d.getNode(oid, cur.ver)
+		if err != nil {
+			// Ignore it, the parent was previously pruned.
+			continue
+		}
+
+		sawDeleted := cur.sawDeleted || node.Deleted
+		for _, pver := range node.Parents {
+			next := pathState{ver: pver, sawDeleted: sawDeleted}
+			if _, dup := seen[next]; dup {
+				continue
+			}
+			seen[next] = struct{}{}
+			pending.PushBack(next)
+		}
+	}
+
+	return false
+}
+
// prune trims the DAG of an object at a given version (node) by deleting
// all its ancestor nodes, making it the new root node. For each deleted
// node it calls the given callback function to delete its log record.
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0a50cdf..4fa945a 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -199,8 +199,8 @@
if dag.hasNode(oid, 4) {
t.Errorf("hasNode() found an object on a closed DAG")
}
- if dag.hasParent(oid, 3, 2) {
- t.Errorf("hasParent() found an parent/child relationship on a closed DAG")
+ if dag.hasDeletedDescendant(oid, 3) {
+ t.Errorf("hasDeletedDescendant() returned true on a closed DAG")
}
if pmap := dag.getParentMap(oid); len(pmap) != 0 {
t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
@@ -1920,8 +1920,8 @@
}
}
-// TestHasParent tests lookup of DAG parent/child relationship (i.e. hasParent()).
-func TestHasParent(t *testing.T) {
+// TestHasDeletedDescendant tests lookup of DAG deleted nodes descending from a given node.
+func TestHasDeletedDescendant(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1939,45 +1939,35 @@
t.Fatal(err)
}
- // The object mutations are: v1 -> v2 (deleted) -> v3 with v3 as head node.
+ // Delete node v3 to create a dangling parent link from v7 (increase code coverage).
+ if err = dag.delNode(oid, 3); err != nil {
+ t.Errorf("cannot delete node %d:3 in DAG file %s: %v", oid, dagfile, err)
+ }
- type hasParentTest struct {
- parent storage.Version
- child storage.Version
+ type hasDelDescTest struct {
+ node storage.Version
result bool
}
- tests := []hasParentTest{
- {storage.NoVersion, storage.NoVersion, false},
- {1, 2, true},
- {2, 3, true},
- {1, 999, false},
- {999, 1, false},
- {1, 3, false},
- {1, storage.NoVersion, true},
- {2, storage.NoVersion, false},
- {3, storage.NoVersion, false},
- {999, storage.NoVersion, false},
- {storage.NoVersion, 1, true},
- {storage.NoVersion, 2, false},
- {storage.NoVersion, 3, true},
- {storage.NoVersion, 999, false},
+ tests := []hasDelDescTest{
+ {storage.NoVersion, false},
+ {999, false},
+ {1, true},
+ {2, true},
+ {3, false},
+ {4, false},
+ {5, false},
+ {6, false},
+ {7, false},
+ {8, false},
}
for _, test := range tests {
- result := dag.hasParent(oid, test.child, test.parent)
+ result := dag.hasDeletedDescendant(oid, test.node)
if result != test.result {
- t.Errorf("hasParent() for parent/child (%d/%d) in DAG file %s: %v instead of %v",
- test.parent, test.child, dagfile, result, test.result)
+ t.Errorf("hasDeletedDescendant() for node %d in DAG file %s: %v instead of %v",
+ test.node, dagfile, result, test.result)
}
}
- // Increase coverage of internal helper function.
- if childOf(nil, 3) {
- t.Errorf("childOf() returned true on a nil node")
- }
- if childOf(&dagNode{}, storage.NoVersion) {
- t.Errorf("childOf() returned true on a NoVersion parent")
- }
-
dag.close()
}
diff --git a/runtimes/google/vsync/testdata/local-init-03.sync b/runtimes/google/vsync/testdata/local-init-03.sync
index 63f4fc6..3b3fba3 100644
--- a/runtimes/google/vsync/testdata/local-init-03.sync
+++ b/runtimes/google/vsync/testdata/local-init-03.sync
@@ -1,5 +1,10 @@
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
addl|12345|1|||logrec-01|false|false
-addl|12345|2|1||logrec-02|false|true
-addl|12345|3|2||logrec-03|false|false
+addl|12345|2|1||logrec-02|false|false
+addl|12345|3|1||logrec-03|false|false
+addl|12345|4|2||logrec-04|false|false
+addl|12345|5|2||logrec-05|false|true
+addl|12345|6|4|5|logrec-06|false|false
+addl|12345|7|3|5|logrec-07|false|false
+addl|12345|8|6|7|logrec-08|false|false