veyron/runtimes/google/vsync: find deleted descendants in DAG
Add the hasDeletedDescendant() DAG method and remove hasParent().
Sync needs it when tracking delete mutations coming from Watch.
It replaces the hasParent() method which is no longer needed.
Change-Id: I0d9372ff7cdb204a55c2237d5da8e475315776f9
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9e860d9..94fea55 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -428,83 +428,6 @@
return d.nodes.hasKey(key)
}
-// childOf returns true if the node is a child of the parent version.
-// It means that the parent version is found in the node's Parents array.
-func childOf(node *dagNode, parent storage.Version) bool {
- if node == nil || parent == storage.NoVersion {
- return false
- }
- for _, pver := range node.Parents {
- if pver == parent {
- return true
- }
- }
- return false
-}
-
-// hasParent returns true if the node (oid, version) exists in the DAG DB
-// and has (oid, parent) as a parent node. Either "version" or "parent"
-// could be NoVersion (zero). Thus the 4 cases:
-// 1- "version" and "parent" are _not_ NoVersion: return true if both nodes
-// exist and have a parent/child relationship.
-// 2- Only "parent" is NoVersion: return true if (oid, version) exists and
-// either it has no parents (root of the DAG) or at least one of its
-// parent nodes is a deleted node (i.e. has its "Deleted" flag set true).
-// 3- Only "version" is NoVersion: return true if (oid, parent) exists and
-// at least one of its children is a deleted node.
-// 4- Both "version" and "parent" are NoVersion: return false
-func (d *dag) hasParent(oid storage.ID, version, parent storage.Version) bool {
- if d.store == nil {
- return false
- }
-
- switch {
- case version != storage.NoVersion && parent != storage.NoVersion:
- if !d.hasNode(oid, parent) {
- return false
- }
- node, err := d.getNode(oid, version)
- if err != nil {
- return false
- }
- return childOf(node, parent)
-
- case version != storage.NoVersion && parent == storage.NoVersion:
- node, err := d.getNode(oid, version)
- if err != nil {
- return false
- }
- if node.Parents == nil {
- return true
- }
- for _, pver := range node.Parents {
- if pnode, err := d.getNode(oid, pver); err == nil && pnode.Deleted {
- return true
- }
- }
- return false
-
- case version == storage.NoVersion && parent != storage.NoVersion:
- if !d.hasNode(oid, parent) {
- return false
- }
- head, err := d.getHead(oid)
- if err != nil {
- return false
- }
- found := false
- d.ancestorIter(oid, []storage.Version{head}, func(oid storage.ID, v storage.Version, node *dagNode) error {
- if node.Deleted && childOf(node, parent) {
- found = true
- return errors.New("found it -- stop the iteration")
- }
- return nil
- })
- return found
- }
- return false
-}
-
// addParent adds to the DAG node (oid, version) linkage to this parent node.
// If the parent linkage is due to a local change (from conflict resolution
// by blessing an existing version), no need to update the grafting structure.
@@ -695,6 +618,70 @@
return nil
}
+// hasDeletedDescendant returns true if the node (oid, version) exists in the
+// DAG DB and one of its descendants is a deleted node (i.e. has its "Deleted"
+// flag set true). This means that at some object mutation after this version,
+// the object was deleted.
+func (d *dag) hasDeletedDescendant(oid storage.ID, version storage.Version) bool {
+ if d.store == nil {
+ return false
+ }
+ if !d.hasNode(oid, version) {
+ return false
+ }
+
+ // Do a breadth-first traversal from the object's head node back to
+ // the given version. Along the way, track whether a deleted node is
+ // traversed. Return true only if a traversal reaches the given version
+ // and had seen a deleted node along the way.
+
+ // nodeStep tracks a step along a traversal. It stores the node to visit
+ // when taking that step and a boolean tracking whether a deleted node
+ // was seen so far along that trajectory.
+ head, err := d.getHead(oid)
+ if err != nil {
+ return false
+ }
+
+ type nodeStep struct {
+ node storage.Version
+ deleted bool
+ }
+
+ visited := make(map[nodeStep]struct{})
+ queue := list.New()
+
+ step := nodeStep{node: head, deleted: false}
+ queue.PushBack(&step)
+ visited[step] = struct{}{}
+
+ for queue.Len() > 0 {
+ step := queue.Remove(queue.Front()).(*nodeStep)
+ if step.node == version {
+ if step.deleted {
+ return true
+ }
+ continue
+ }
+ node, err := d.getNode(oid, step.node)
+ if err != nil {
+ // Ignore it, the parent was previously pruned.
+ continue
+ }
+ nextDel := step.deleted || node.Deleted
+
+ for _, parent := range node.Parents {
+ nextStep := nodeStep{node: parent, deleted: nextDel}
+ if _, ok := visited[nextStep]; !ok {
+ queue.PushBack(&nextStep)
+ visited[nextStep] = struct{}{}
+ }
+ }
+ }
+
+ return false
+}
+
// prune trims the DAG of an object at a given version (node) by deleting
// all its ancestor nodes, making it the new root node. For each deleted
// node it calls the given callback function to delete its log record.
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0a50cdf..4fa945a 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -199,8 +199,8 @@
if dag.hasNode(oid, 4) {
t.Errorf("hasNode() found an object on a closed DAG")
}
- if dag.hasParent(oid, 3, 2) {
- t.Errorf("hasParent() found an parent/child relationship on a closed DAG")
+ if dag.hasDeletedDescendant(oid, 3) {
+ t.Errorf("hasDeletedDescendant() returned true on a closed DAG")
}
if pmap := dag.getParentMap(oid); len(pmap) != 0 {
t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
@@ -1920,8 +1920,8 @@
}
}
-// TestHasParent tests lookup of DAG parent/child relationship (i.e. hasParent()).
-func TestHasParent(t *testing.T) {
+// TestHasDeletedDescendant tests lookup of DAG deleted nodes descending from a given node.
+func TestHasDeletedDescendant(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1939,45 +1939,35 @@
t.Fatal(err)
}
- // The object mutations are: v1 -> v2 (deleted) -> v3 with v3 as head node.
+ // Delete node v3 to create a dangling parent link from v7 (increase code coverage).
+ if err = dag.delNode(oid, 3); err != nil {
+ t.Errorf("cannot delete node %d:3 in DAG file %s: %v", oid, dagfile, err)
+ }
- type hasParentTest struct {
- parent storage.Version
- child storage.Version
+ type hasDelDescTest struct {
+ node storage.Version
result bool
}
- tests := []hasParentTest{
- {storage.NoVersion, storage.NoVersion, false},
- {1, 2, true},
- {2, 3, true},
- {1, 999, false},
- {999, 1, false},
- {1, 3, false},
- {1, storage.NoVersion, true},
- {2, storage.NoVersion, false},
- {3, storage.NoVersion, false},
- {999, storage.NoVersion, false},
- {storage.NoVersion, 1, true},
- {storage.NoVersion, 2, false},
- {storage.NoVersion, 3, true},
- {storage.NoVersion, 999, false},
+ tests := []hasDelDescTest{
+ {storage.NoVersion, false},
+ {999, false},
+ {1, true},
+ {2, true},
+ {3, false},
+ {4, false},
+ {5, false},
+ {6, false},
+ {7, false},
+ {8, false},
}
for _, test := range tests {
- result := dag.hasParent(oid, test.child, test.parent)
+ result := dag.hasDeletedDescendant(oid, test.node)
if result != test.result {
- t.Errorf("hasParent() for parent/child (%d/%d) in DAG file %s: %v instead of %v",
- test.parent, test.child, dagfile, result, test.result)
+ t.Errorf("hasDeletedDescendant() for node %d in DAG file %s: %v instead of %v",
+ test.node, dagfile, result, test.result)
}
}
- // Increase coverage of internal helper function.
- if childOf(nil, 3) {
- t.Errorf("childOf() returned true on a nil node")
- }
- if childOf(&dagNode{}, storage.NoVersion) {
- t.Errorf("childOf() returned true on a NoVersion parent")
- }
-
dag.close()
}
diff --git a/runtimes/google/vsync/testdata/local-init-03.sync b/runtimes/google/vsync/testdata/local-init-03.sync
index 63f4fc6..3b3fba3 100644
--- a/runtimes/google/vsync/testdata/local-init-03.sync
+++ b/runtimes/google/vsync/testdata/local-init-03.sync
@@ -1,5 +1,10 @@
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
addl|12345|1|||logrec-01|false|false
-addl|12345|2|1||logrec-02|false|true
-addl|12345|3|2||logrec-03|false|false
+addl|12345|2|1||logrec-02|false|false
+addl|12345|3|1||logrec-03|false|false
+addl|12345|4|2||logrec-04|false|false
+addl|12345|5|2||logrec-05|false|true
+addl|12345|6|4|5|logrec-06|false|false
+addl|12345|7|3|5|logrec-07|false|false
+addl|12345|8|6|7|logrec-08|false|false