Merge "veyron/runtimes/google/vsync: find deleted descendants in DAG"
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9e860d9..94fea55 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -428,83 +428,6 @@
 	return d.nodes.hasKey(key)
 }
 
-// childOf returns true if the node is a child of the parent version.
-// It means that the parent version is found in the node's Parents array.
-func childOf(node *dagNode, parent storage.Version) bool {
-	if node == nil || parent == storage.NoVersion {
-		return false
-	}
-	for _, pver := range node.Parents {
-		if pver == parent {
-			return true
-		}
-	}
-	return false
-}
-
-// hasParent returns true if the node (oid, version) exists in the DAG DB
-// and has (oid, parent) as a parent node. Either "version" or "parent"
-// could be NoVersion (zero).  Thus the 4 cases:
-// 1- "version" and "parent" are _not_ NoVersion: return true if both nodes
-//    exist and have a parent/child relationship.
-// 2- Only "parent" is NoVersion: return true if (oid, version) exists and
-//    either it has no parents (root of the DAG) or at least one of its
-//    parent nodes is a deleted node (i.e. has its "Deleted" flag set true).
-// 3- Only "version" is NoVersion: return true if (oid, parent) exists and
-//    at least one of its children is a deleted node.
-// 4- Both "version" and "parent" are NoVersion: return false
-func (d *dag) hasParent(oid storage.ID, version, parent storage.Version) bool {
-	if d.store == nil {
-		return false
-	}
-
-	switch {
-	case version != storage.NoVersion && parent != storage.NoVersion:
-		if !d.hasNode(oid, parent) {
-			return false
-		}
-		node, err := d.getNode(oid, version)
-		if err != nil {
-			return false
-		}
-		return childOf(node, parent)
-
-	case version != storage.NoVersion && parent == storage.NoVersion:
-		node, err := d.getNode(oid, version)
-		if err != nil {
-			return false
-		}
-		if node.Parents == nil {
-			return true
-		}
-		for _, pver := range node.Parents {
-			if pnode, err := d.getNode(oid, pver); err == nil && pnode.Deleted {
-				return true
-			}
-		}
-		return false
-
-	case version == storage.NoVersion && parent != storage.NoVersion:
-		if !d.hasNode(oid, parent) {
-			return false
-		}
-		head, err := d.getHead(oid)
-		if err != nil {
-			return false
-		}
-		found := false
-		d.ancestorIter(oid, []storage.Version{head}, func(oid storage.ID, v storage.Version, node *dagNode) error {
-			if node.Deleted && childOf(node, parent) {
-				found = true
-				return errors.New("found it -- stop the iteration")
-			}
-			return nil
-		})
-		return found
-	}
-	return false
-}
-
 // addParent adds to the DAG node (oid, version) linkage to this parent node.
 // If the parent linkage is due to a local change (from conflict resolution
 // by blessing an existing version), no need to update the grafting structure.
@@ -695,6 +618,70 @@
 	return nil
 }
 
+// hasDeletedDescendant returns true if the node (oid, version) exists in the
+// DAG DB and one of its descendants is a deleted node (i.e. has its "Deleted"
+// flag set true), meaning the object was deleted by some mutation after this
+// version.  It returns false if the store is nil or the head cannot be read.
+func (d *dag) hasDeletedDescendant(oid storage.ID, version storage.Version) bool {
+	if d.store == nil {
+		return false
+	}
+	if !d.hasNode(oid, version) {
+		return false
+	}
+
+	// Do a breadth-first traversal from the object's head node back to
+	// the given version, following parent links.  Along the way, track whether
+	// a deleted node is traversed.  Return true only if a traversal reaches the
+	// given version having seen a deleted node; its own flag is never read.
+
+	// Each step of a traversal is a nodeStep (declared below): the node to
+	// visit plus a boolean tracking whether a deleted node was seen so far
+	// along that trajectory.  The first step is the head, with nothing seen.
+	head, err := d.getHead(oid)
+	if err != nil {
+		return false
+	}
+
+	type nodeStep struct {
+		node    storage.Version
+		deleted bool
+	}
+
+	visited := make(map[nodeStep]struct{}) // keyed on (node, deleted): a node may be queued once per flag value
+	queue := list.New()
+
+	step := nodeStep{node: head, deleted: false}
+	queue.PushBack(&step)
+	visited[step] = struct{}{}
+
+	for queue.Len() > 0 {
+		step := queue.Remove(queue.Front()).(*nodeStep) // shadows the outer "step"; a *nodeStep here
+		if step.node == version {
+			if step.deleted {
+				return true
+			}
+			continue // reached the target on a path with no deletion; try other paths
+		}
+		node, err := d.getNode(oid, step.node)
+		if err != nil {
+			// Ignore it: a dangling parent link, e.g. the parent was previously pruned.
+			continue
+		}
+		nextDel := step.deleted || node.Deleted
+
+		for _, parent := range node.Parents {
+			nextStep := nodeStep{node: parent, deleted: nextDel}
+			if _, ok := visited[nextStep]; !ok {
+				queue.PushBack(&nextStep)
+				visited[nextStep] = struct{}{}
+			}
+		}
+	}
+
+	return false
+}
+
 // prune trims the DAG of an object at a given version (node) by deleting
 // all its ancestor nodes, making it the new root node.  For each deleted
 // node it calls the given callback function to delete its log record.
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0a50cdf..4fa945a 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -199,8 +199,8 @@
 	if dag.hasNode(oid, 4) {
 		t.Errorf("hasNode() found an object on a closed DAG")
 	}
-	if dag.hasParent(oid, 3, 2) {
-		t.Errorf("hasParent() found an parent/child relationship on a closed DAG")
+	if dag.hasDeletedDescendant(oid, 3) {
+		t.Errorf("hasDeletedDescendant() returned true on a closed DAG")
 	}
 	if pmap := dag.getParentMap(oid); len(pmap) != 0 {
 		t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
@@ -1920,8 +1920,8 @@
 	}
 }
 
-// TestHasParent tests lookup of DAG parent/child relationship (i.e. hasParent()).
-func TestHasParent(t *testing.T) {
+// TestHasDeletedDescendant tests hasDeletedDescendant(), i.e. whether a given DAG node has a deleted node among its descendants.
+func TestHasDeletedDescendant(t *testing.T) {
 	dagfile := dagFilename()
 	defer os.Remove(dagfile)
 
@@ -1939,45 +1939,35 @@
 		t.Fatal(err)
 	}
 
-	// The object mutations are: v1 -> v2 (deleted) -> v3 with v3 as head node.
+	// Delete node v3 to create a dangling parent link from v7 (increase code coverage).
+	if err = dag.delNode(oid, 3); err != nil {
+		t.Errorf("cannot delete node %d:3 in DAG file %s: %v", oid, dagfile, err)
+	}
 
-	type hasParentTest struct {
-		parent storage.Version
-		child  storage.Version
+	type hasDelDescTest struct {
+		node   storage.Version
 		result bool
 	}
-	tests := []hasParentTest{
-		{storage.NoVersion, storage.NoVersion, false},
-		{1, 2, true},
-		{2, 3, true},
-		{1, 999, false},
-		{999, 1, false},
-		{1, 3, false},
-		{1, storage.NoVersion, true},
-		{2, storage.NoVersion, false},
-		{3, storage.NoVersion, false},
-		{999, storage.NoVersion, false},
-		{storage.NoVersion, 1, true},
-		{storage.NoVersion, 2, false},
-		{storage.NoVersion, 3, true},
-		{storage.NoVersion, 999, false},
+	tests := []hasDelDescTest{
+		{storage.NoVersion, false},
+		{999, false},
+		{1, true},
+		{2, true},
+		{3, false},
+		{4, false},
+		{5, false},
+		{6, false},
+		{7, false},
+		{8, false},
 	}
 
 	for _, test := range tests {
-		result := dag.hasParent(oid, test.child, test.parent)
+		result := dag.hasDeletedDescendant(oid, test.node)
 		if result != test.result {
-			t.Errorf("hasParent() for parent/child (%d/%d) in DAG file %s: %v instead of %v",
-				test.parent, test.child, dagfile, result, test.result)
+			t.Errorf("hasDeletedDescendant() for node %d in DAG file %s: %v instead of %v",
+				test.node, dagfile, result, test.result)
 		}
 	}
 
-	// Increase coverage of internal helper function.
-	if childOf(nil, 3) {
-		t.Errorf("childOf() returned true on a nil node")
-	}
-	if childOf(&dagNode{}, storage.NoVersion) {
-		t.Errorf("childOf() returned true on a NoVersion parent")
-	}
-
 	dag.close()
 }
diff --git a/runtimes/google/vsync/testdata/local-init-03.sync b/runtimes/google/vsync/testdata/local-init-03.sync
index 63f4fc6..3b3fba3 100644
--- a/runtimes/google/vsync/testdata/local-init-03.sync
+++ b/runtimes/google/vsync/testdata/local-init-03.sync
@@ -1,5 +1,10 @@
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
 
 addl|12345|1|||logrec-01|false|false
-addl|12345|2|1||logrec-02|false|true
-addl|12345|3|2||logrec-03|false|false
+addl|12345|2|1||logrec-02|false|false
+addl|12345|3|1||logrec-03|false|false
+addl|12345|4|2||logrec-04|false|false
+addl|12345|5|2||logrec-05|false|true
+addl|12345|6|4|5|logrec-06|false|false
+addl|12345|7|3|5|logrec-07|false|false
+addl|12345|8|6|7|logrec-08|false|false