Track the transaction scope in the Sync DAG.

When new DAG nodes are added as part of a Store transaction, track their
grouping.  This will be used to determine the scope of the conflict
resolution when one object inside a transaction has a conflict.

Change-Id: Id7025200b3e2ca3ec66be8405e756170f3f9d35c
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index ac1a342..7f97c99 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -68,13 +68,16 @@
 // in the Veyron Store) have moved past some version for that object, the
 // DAG for that object can be pruned, deleting all prior (ancestor) nodes.
 //
-// The DAG DB contains two tables persisted to disk (nodes, heads) and
-// an in-memory (ephemeral) graft map:
+// The DAG DB contains three tables persisted to disk (nodes, heads, trans)
+// and three in-memory (ephemeral) maps (graft, txSet, txGC):
 //   * nodes: one entry per (object, version) with references to the
-//            parent node(s) it is derived from, and a reference to the
-//            log record identifying that change
+//            parent node(s) it is derived from, a reference to the
+//            log record identifying that change, and a reference to
+//            its transaction set (or NoTxID if none)
 //   * heads: one entry per object pointing to its most recent version
-//            in the nodes collection
+//            in the nodes table
+//   * trans: one entry per transaction ID containing the set of objects
+//            that forms the transaction and their versions.
 //   * graft: during a sync operation, it tracks the nodes where the new
 //            DAG fragments are attached to the existing graph for each
 //            mutated object.  This map is used to determine whether a
@@ -82,6 +85,14 @@
 //            recent common ancestor from these graft points to use when
 //            resolving the conflict.  At the end of a sync operation the
 //            graft map is destroyed.
+//   * txSet: used to incrementally construct the transaction sets that
+//            are stored in the "trans" table once all the nodes of a
+//            transaction have been added.  Multiple transaction sets
+//            can be constructed to support the concurrency between the
+//            Sync Initiator and Watcher threads.
+//   * txGC:  used to track the transactions impacted by objects being
+//            pruned.  At the end of the pruning operation the records
+//            of the "trans" table are updated from the txGC information.
 //
 // Note: for regular (no-conflict) changes, a node has a reference to
 // one parent from which it was derived.  When a conflict is resolved,
@@ -95,23 +106,37 @@
 	"container/list"
 	"errors"
 	"fmt"
+	"math/rand"
+	"time"
 
 	"veyron2/storage"
 	"veyron2/vlog"
 )
 
+const (
+	NoTxID = TxID(0)
+)
+
+type TxID uint64
+type dagTxMap map[storage.ID]storage.Version
+
 type dag struct {
 	fname string                    // file pathname
 	store *kvdb                     // underlying K/V store
 	heads *kvtable                  // pointer to "heads" table in the store
 	nodes *kvtable                  // pointer to "nodes" table in the store
+	trans *kvtable                  // pointer to "trans" table in the store
 	graft map[storage.ID]*graftInfo // in-memory state of DAG object grafting
+	txSet map[TxID]dagTxMap         // in-memory construction of transaction sets
+	txGC  map[TxID]dagTxMap         // in-memory tracking of transaction sets to cleanup
+	txGen *rand.Rand                // transaction ID random number generator
 }
 
 type dagNode struct {
 	Level   uint64            // node distance from root
 	Parents []storage.Version // references to parent versions
 	Logrec  string            // reference to log record change
+	TxID    TxID              // ID of a transaction set
 }
 
 type graftInfo struct {
@@ -123,8 +148,8 @@
 // openDAG opens or creates a DAG for the given filename.
 func openDAG(filename string) (*dag, error) {
 	// Open the file and create it if it does not exist.
-	// Also initialize the store and its two collections.
-	db, tbls, err := kvdbOpen(filename, []string{"heads", "nodes"})
+	// Also initialize the store and its tables.
+	db, tbls, err := kvdbOpen(filename, []string{"heads", "nodes", "trans"})
 	if err != nil {
 		return nil, err
 	}
@@ -134,9 +159,13 @@
 		store: db,
 		heads: tbls[0],
 		nodes: tbls[1],
+		trans: tbls[2],
+		txGen: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		txSet: make(map[TxID]dagTxMap),
 	}
 
 	d.clearGraft()
+	d.clearTxGC()
 
 	return d, nil
 }
@@ -161,13 +190,14 @@
 	if d.store == nil {
 		return errors.New("invalid DAG")
 	}
-	db, tbls, err := d.store.compact(d.fname, []string{"heads", "nodes"})
+	db, tbls, err := d.store.compact(d.fname, []string{"heads", "nodes", "trans"})
 	if err != nil {
 		return err
 	}
 	d.store = db
 	d.heads = tbls[0]
 	d.nodes = tbls[1]
+	d.trans = tbls[2]
 	return nil
 }
 
@@ -178,6 +208,13 @@
 	}
 }
 
+// clearTxGC clears the temporary in-memory transaction garbage collection maps.
+func (d *dag) clearTxGC() {
+	if d.store != nil {
+		d.txGC = make(map[TxID]dagTxMap)
+	}
+}
+
 // getObjectGraft returns the graft structure for an object ID.
 // The graftInfo struct for an object is ephemeral (in-memory) and it
 // tracks the following information:
@@ -218,6 +255,52 @@
 	return graft
 }
 
+// addNodeTxStart generates a transaction ID and returns it to the caller.
+// The transaction ID is purely internal to the DAG.  It is used to track
+// DAG nodes that are part of the same transaction.
+func (d *dag) addNodeTxStart() TxID {
+	if d.store == nil {
+		return NoTxID
+	}
+
+	// Generate a random 64-bit transaction ID different than NoTxID.
+	// Also make sure the ID is not already being used.
+	tid := NoTxID
+	for (tid == NoTxID) || (d.txSet[tid] != nil) {
+		// Generate an unsigned 64-bit random value by combining a
+		// random 63-bit value and a random 1-bit value.
+		tid = (TxID(d.txGen.Int63()) << 1) | TxID(d.txGen.Int63n(2))
+	}
+
+	// Initialize the in-memory object/version map for that transaction ID.
+	d.txSet[tid] = make(dagTxMap)
+
+	return tid
+}
+
+// addNodeTxEnd marks the end of a given transaction.
+// The DAG completes its internal tracking of the transaction information.
+func (d *dag) addNodeTxEnd(tid TxID) error {
+	if d.store == nil {
+		return errors.New("invalid DAG")
+	}
+	if tid == NoTxID {
+		return fmt.Errorf("invalid TxID: %v", tid)
+	}
+
+	txMap, ok := d.txSet[tid]
+	if !ok {
+		return fmt.Errorf("unknown transaction ID: %v", tid)
+	}
+
+	if err := d.setTransaction(tid, txMap); err != nil {
+		return err
+	}
+
+	delete(d.txSet, tid)
+	return nil
+}
+
 // addNode adds a new node for an object in the DAG, linking it to its parent nodes.
 // It verifies that this node does not exist and that its parent nodes are valid.
 // It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -228,10 +311,15 @@
 // - If a parent node is not new, mark it as a DAG graft point.
 // - Mark this version as a new node.
 // - Update the new head node pointer of the grafted DAG.
-func (d *dag) addNode(oid storage.ID, version storage.Version, remote bool, parents []storage.Version, logrec string) error {
+//
+// If the transaction ID is set to NoTxID, this node is not part of a transaction.
+// Otherwise, track its membership in the given transaction ID.
+func (d *dag) addNode(oid storage.ID, version storage.Version, remote bool,
+	parents []storage.Version, logrec string, tid TxID) error {
 	if d.store == nil {
 		return errors.New("invalid DAG")
 	}
+
 	if parents != nil {
 		if len(parents) > 2 {
 			return fmt.Errorf("cannot have more than 2 parents, not %d", len(parents))
@@ -314,12 +402,22 @@
 		graft.newHeads[version] = struct{}{}
 	}
 
+	// If this node is part of a transaction, add it to that set.
+	if tid != NoTxID {
+		txMap, ok := d.txSet[tid]
+		if !ok {
+			return fmt.Errorf("unknown transaction ID: %v", tid)
+		}
+
+		txMap[oid] = version
+	}
+
 	// Insert the new node in the kvdb.
-	node := &dagNode{Level: level, Parents: parents, Logrec: logrec}
+	node := &dagNode{Level: level, Parents: parents, Logrec: logrec, TxID: tid}
 	return d.setNode(oid, version, node)
 }
 
-// hasNode returns true if the node (oid, version) exists in the DAG.
+// hasNode returns true if the node (oid, version) exists in the DAG DB.
 func (d *dag) hasNode(oid storage.ID, version storage.Version) bool {
 	if d.store == nil {
 		return false
@@ -523,6 +621,9 @@
 // node it calls the given callback function to delete its log record.
 // This function should only be called when Sync determines that all devices
 // that know about the object have gotten past this version.
+// Also track any transaction sets affected by deleting DAG objects that
+// have transaction IDs.  This is later used to do garbage collection
+// on transaction sets when pruneDone() is called.
 func (d *dag) prune(oid storage.ID, version storage.Version, delLogRec func(logrec string) error) error {
 	if d.store == nil {
 		return errors.New("invalid DAG")
@@ -548,8 +649,17 @@
 
 	// Delete all ancestor nodes and their log records.
 	// Delete as many as possible and track the error counts.
+	// Keep track of objects deleted from transaction in order
+	// to cleanup transaction sets when pruneDone() is called.
 	numNodeErrs, numLogErrs := 0, 0
 	err = d.ancestorIter(oid, iterVersions, func(oid storage.ID, v storage.Version, node *dagNode) error {
+		if tid := node.TxID; tid != NoTxID {
+			if d.txGC[tid] == nil {
+				d.txGC[tid] = make(dagTxMap)
+			}
+			d.txGC[tid][oid] = v
+		}
+
 		if err := delLogRec(node.Logrec); err != nil {
 			numLogErrs++
 		}
@@ -567,6 +677,40 @@
 	return nil
 }
 
+// pruneDone is called when object pruning is finished within a single pass
+// of the Sync garbage collector.  It updates the transaction sets affected
+// by the objects deleted by the prune() calls.
+func (d *dag) pruneDone() error {
+	if d.store == nil {
+		return errors.New("invalid DAG")
+	}
+
+	// Update transaction sets by removing from them the objects that
+	// were pruned.  If the resulting set is empty, delete it.
+	for tid, txMapGC := range d.txGC {
+		txMap, err := d.getTransaction(tid)
+		if err != nil {
+			return err
+		}
+
+		for oid := range txMapGC {
+			delete(txMap, oid)
+		}
+
+		if len(txMap) > 0 {
+			err = d.setTransaction(tid, txMap)
+		} else {
+			err = d.delTransaction(tid)
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	d.clearTxGC()
+	return nil
+}
+
 // getLogrec returns the log record information for a given object version.
 func (d *dag) getLogrec(oid storage.ID, version storage.Version) (string, error) {
 	node, err := d.getNode(oid, version)
@@ -577,13 +721,13 @@
 }
 
 // objNodeKey returns the key used to access the object node (oid, version)
-// in the DAG.
+// in the DAG DB.
 func objNodeKey(oid storage.ID, version storage.Version) string {
 	return fmt.Sprintf("%s:%d", oid.String(), version)
 }
 
 // setNode stores the dagNode structure for the object node (oid, version)
-// in the DAG.
+// in the DAG DB.
 func (d *dag) setNode(oid storage.ID, version storage.Version, node *dagNode) error {
 	if d.store == nil {
 		return errors.New("invalid DAG")
@@ -593,7 +737,7 @@
 }
 
 // getNode retrieves the dagNode structure for the object node (oid, version)
-// from the DAG.
+// from the DAG DB.
 func (d *dag) getNode(oid storage.ID, version storage.Version) (*dagNode, error) {
 	if d.store == nil {
 		return nil, errors.New("invalid DAG")
@@ -606,7 +750,7 @@
 	return &node, nil
 }
 
-// delNode deletes the object node (oid, version) from the DAG.
+// delNode deletes the object node (oid, version) from the DAG DB.
 func (d *dag) delNode(oid storage.ID, version storage.Version) error {
 	if d.store == nil {
 		return errors.New("invalid DAG")
@@ -615,12 +759,12 @@
 	return d.nodes.del(key)
 }
 
-// objHeadKey returns the key used to access the object head in the DAG.
+// objHeadKey returns the key used to access the object head in the DAG DB.
 func objHeadKey(oid storage.ID) string {
 	return oid.String()
 }
 
-// setHead stores version as the object head in the DAG.
+// setHead stores version as the object head in the DAG DB.
 func (d *dag) setHead(oid storage.ID, version storage.Version) error {
 	if d.store == nil {
 		return errors.New("invalid DAG")
@@ -629,7 +773,7 @@
 	return d.heads.set(key, version)
 }
 
-// getHead retrieves the object head from the DAG.
+// getHead retrieves the object head from the DAG DB.
 func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
 	var version storage.Version
 	if d.store == nil {
@@ -643,6 +787,51 @@
 	return version, err
 }
 
+// dagTransactionKey returns the key used to access the transaction in the DAG DB.
+func dagTransactionKey(tid TxID) string {
+	return fmt.Sprintf("%v", tid)
+}
+
+// setTransaction stores the transaction object/version map in the DAG DB.
+func (d *dag) setTransaction(tid TxID, txMap dagTxMap) error {
+	if d.store == nil {
+		return errors.New("invalid DAG")
+	}
+	if tid == NoTxID {
+		return fmt.Errorf("invalid TxID: %v", tid)
+	}
+	key := dagTransactionKey(tid)
+	return d.trans.set(key, txMap)
+}
+
+// getTransaction retrieves the transaction object/version map from the DAG DB.
+func (d *dag) getTransaction(tid TxID) (dagTxMap, error) {
+	if d.store == nil {
+		return nil, errors.New("invalid DAG")
+	}
+	if tid == NoTxID {
+		return nil, fmt.Errorf("invalid TxID: %v", tid)
+	}
+	var txMap dagTxMap
+	key := dagTransactionKey(tid)
+	if err := d.trans.get(key, &txMap); err != nil {
+		return nil, err
+	}
+	return txMap, nil
+}
+
+// delTransaction deletes the transation object/version map from the DAG DB.
+func (d *dag) delTransaction(tid TxID) error {
+	if d.store == nil {
+		return errors.New("invalid DAG")
+	}
+	if tid == NoTxID {
+		return fmt.Errorf("invalid TxID: %v", tid)
+	}
+	key := dagTransactionKey(tid)
+	return d.trans.del(key)
+}
+
 // getParentMap is a testing and debug helper function that returns for
 // an object a map of all the object version in the DAG and their parents.
 // The map represents the graph of the object version history.
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 659a586..a36bc4c 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -99,7 +99,7 @@
 		t.Error(err)
 	}
 
-	err = dag.addNode(oid, 4, false, []storage.Version{2, 3}, "foobar")
+	err = dag.addNode(oid, 4, false, []storage.Version{2, 3}, "foobar", NoTxID)
 	if err == nil || err.Error() != "invalid DAG" {
 		t.Errorf("addNode() did not fail on a closed DAG: %v", err)
 	}
@@ -126,6 +126,11 @@
 		t.Errorf("prune() did not fail on a closed DAG: %v", err)
 	}
 
+	err = dag.pruneDone()
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("pruneDone() did not fail on a closed DAG: %v", err)
+	}
+
 	node := &dagNode{Level: 15, Parents: []storage.Version{444, 555}, Logrec: "logrec-23"}
 	err = dag.setNode(oid, 4, node)
 	if err == nil || err.Error() != "invalid DAG" {
@@ -162,8 +167,33 @@
 		t.Errorf("compact() did not fail on a closed DAG: %v", err)
 	}
 
+	if tid := dag.addNodeTxStart(); tid != NoTxID {
+		t.Errorf("addNodeTxStart() did not fail on a closed DAG: TxID %v", tid)
+	}
+
+	err = dag.addNodeTxEnd(1)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("addNodeTxEnd() did not fail on a closed DAG: %v", err)
+	}
+
+	err = dag.setTransaction(1, nil)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("setTransaction() did not fail on a closed DAG: %v", err)
+	}
+
+	_, err = dag.getTransaction(1)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("getTransaction() did not fail on a closed DAG: %v", err)
+	}
+
+	err = dag.delTransaction(1)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("delTransaction() did not fail on a closed DAG: %v", err)
+	}
+
 	// These calls should be harmless NOPs.
 	dag.clearGraft()
+	dag.clearTxGC()
 	dag.flush()
 	dag.close()
 	if dag.hasNode(oid, 4) {
@@ -421,12 +451,16 @@
 	// Clear grafting info; this happens at the end of a sync log replay.
 	d.clearGraft()
 
-	// There should be no grafting info, and hasConflict() should fail.
+	// There should be no grafting or transaction info, and hasConflict() should fail.
 	newHeads, grafts := d.getGraftNodes(oid)
 	if newHeads != nil || grafts != nil {
 		return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
 	}
 
+	if n := len(d.txSet); n != 0 {
+		return fmt.Errorf("transaction set not empty: %d entries found", n)
+	}
+
 	isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
 	if errConflict == nil {
 		return fmt.Errorf("Object %d: conflict did not fail: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
@@ -472,26 +506,26 @@
 	}
 
 	// Make sure an existing node cannot be added again.
-	if err = dag.addNode(oid, 1, false, []storage.Version{0, 2}, "foobar"); err == nil {
+	if err = dag.addNode(oid, 1, false, []storage.Version{0, 2}, "foobar", NoTxID); err == nil {
 		t.Errorf("addNode() did not fail when given an existing node")
 	}
 
 	// Make sure a new node cannot have more than 2 parents.
-	if err = dag.addNode(oid, 3, false, []storage.Version{0, 1, 2}, "foobar"); err == nil {
+	if err = dag.addNode(oid, 3, false, []storage.Version{0, 1, 2}, "foobar", NoTxID); err == nil {
 		t.Errorf("addNode() did not fail when given 3 parents")
 	}
 
 	// Make sure a new node cannot have an invalid parent.
-	if err = dag.addNode(oid, 3, false, []storage.Version{0, 555}, "foobar"); err == nil {
+	if err = dag.addNode(oid, 3, false, []storage.Version{0, 555}, "foobar", NoTxID); err == nil {
 		t.Errorf("addNode() did not fail when using an invalid parent")
 	}
 
 	// Make sure a new root node (no parents) cannot be added once a root exists.
 	// For the parents array, check both the "nil" and the empty array as input.
-	if err = dag.addNode(oid, 6789, false, nil, "foobar"); err == nil {
+	if err = dag.addNode(oid, 6789, false, nil, "foobar", NoTxID); err == nil {
 		t.Errorf("Adding a 2nd root node (nil parents) for object %d in DAG file %s did not fail", oid, dagfile)
 	}
-	if err = dag.addNode(oid, 6789, false, []storage.Version{}, "foobar"); err == nil {
+	if err = dag.addNode(oid, 6789, false, []storage.Version{}, "foobar", NoTxID); err == nil {
 		t.Errorf("Adding a 2nd root node (empty parents) for object %d in DAG file %s did not fail", oid, dagfile)
 	}
 
@@ -982,6 +1016,11 @@
 			t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
 		}
 
+		err = dag.pruneDone()
+		if err != nil {
+			t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+		}
+
 		// Remove pruned nodes from the expected parent map used to validate
 		// and set the parents of the pruned node to nil.
 		if version < 10 {
@@ -1049,6 +1088,11 @@
 		t.Errorf("pruning object %d:%d deleted %d log records instead of %d", oid, version, del, expDel)
 	}
 
+	err = dag.pruneDone()
+	if err != nil {
+		t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+	}
+
 	if head, err := dag.getHead(oid); err != nil || head != 8 {
 		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
 	}
@@ -1499,3 +1543,376 @@
 		t.Fatal(err)
 	}
 }
+
+// TestAddNodeTransactional tests adding multiple DAG nodes grouped within a transaction.
+func TestAddNodeTransactional(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-02.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	oid_a, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+	oid_b, err := strToObjID("67890")
+	if err != nil {
+		t.Fatal(err)
+	}
+	oid_c, err := strToObjID("222")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify NoTxID is reported as an error.
+	if err := dag.addNodeTxEnd(NoTxID); err == nil {
+		t.Errorf("addNodeTxEnd() did not fail for invalid 'NoTxID' value")
+	}
+	if _, err := dag.getTransaction(NoTxID); err == nil {
+		t.Errorf("getTransaction() did not fail for invalid 'NoTxID' value")
+	}
+	if err := dag.setTransaction(NoTxID, nil); err == nil {
+		t.Errorf("setTransaction() did not fail for invalid 'NoTxID' value")
+	}
+	if err := dag.delTransaction(NoTxID); err == nil {
+		t.Errorf("delTransaction() did not fail for invalid 'NoTxID' value")
+	}
+
+	// Mutate 2 objects within a transaction.
+	tid_1 := dag.addNodeTxStart()
+	if tid_1 == NoTxID {
+		t.Fatal("Cannot start 1st DAG addNode() transaction")
+	}
+
+	txMap, ok := dag.txSet[tid_1]
+	if !ok {
+		t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_1, dagfile)
+	}
+	if n := len(txMap); n != 0 {
+		t.Errorf("Transactions map for Tx ID %v has length %d instead of 0 in DAG file %s", tid_1, n, dagfile)
+	}
+
+	if err := dag.addNode(oid_a, 3, false, []storage.Version{2}, "logrec-a-03", tid_1); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+	}
+	if err := dag.addNode(oid_b, 3, false, []storage.Version{2}, "logrec-b-03", tid_1); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_1, dagfile, err)
+	}
+
+	// At the same time mutate the 3rd object in another transaction.
+	tid_2 := dag.addNodeTxStart()
+	if tid_2 == NoTxID {
+		t.Fatal("Cannot start 2nd DAG addNode() transaction")
+	}
+
+	txMap, ok = dag.txSet[tid_2]
+	if !ok {
+		t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+	}
+	if n := len(txMap); n != 0 {
+		t.Errorf("Transactions map for Tx ID %v has length %d instead of 0 in DAG file %s", tid_2, n, dagfile)
+	}
+
+	if err := dag.addNode(oid_c, 2, false, []storage.Version{1}, "logrec-c-02", tid_2); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_c, tid_2, dagfile, err)
+	}
+
+	// Verify the in-memory transaction sets constructed.
+	txMap, ok = dag.txSet[tid_1]
+	if !ok {
+		t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_1, dagfile)
+	}
+
+	expTxMap := dagTxMap{oid_a: 3, oid_b: 3}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_1, dagfile, txMap, expTxMap)
+	}
+
+	txMap, ok = dag.txSet[tid_2]
+	if !ok {
+		t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+	}
+
+	expTxMap = dagTxMap{oid_c: 2}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+	}
+
+	// Verify failing to use a Tx ID not returned by addNodeTxStart().
+	bad_tid := tid_1 + 1
+	for bad_tid == NoTxID || bad_tid == tid_2 {
+		bad_tid++
+	}
+
+	if err := dag.addNode(oid_c, 3, false, []storage.Version{2}, "logrec-c-03", bad_tid); err == nil {
+		t.Errorf("addNode() did not fail on object %d for a bad Tx ID %v in DAG file %s", oid_c, bad_tid, dagfile)
+	}
+	if err := dag.addNodeTxEnd(bad_tid); err == nil {
+		t.Errorf("addNodeTxEnd() did not fail for a bad Tx ID %v in DAG file %s", bad_tid, dagfile)
+	}
+
+	// End the 1st transaction and verify the in-memory and in-DAG data.
+	if err := dag.addNodeTxEnd(tid_1); err != nil {
+		t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+	}
+
+	if _, ok = dag.txSet[tid_1]; ok {
+		t.Errorf("Transactions map for Tx ID %v still exists in DAG file %s", tid_1, dagfile)
+	}
+
+	txMap, err = dag.getTransaction(tid_1)
+	if err != nil {
+		t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+	}
+
+	expTxMap = dagTxMap{oid_a: 3, oid_b: 3}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map from DAG storage for Tx ID %v in DAG file %s: %v instead of %v",
+			tid_1, dagfile, txMap, expTxMap)
+	}
+
+	txMap, ok = dag.txSet[tid_2]
+	if !ok {
+		t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+	}
+
+	expTxMap = dagTxMap{oid_c: 2}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+	}
+
+	// End the 2nd transaction and re-verify the in-memory and in-DAG data.
+	if err := dag.addNodeTxEnd(tid_2); err != nil {
+		t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+	}
+
+	if _, ok = dag.txSet[tid_2]; ok {
+		t.Errorf("Transactions map for Tx ID %v still exists in DAG file %s", tid_2, dagfile)
+	}
+
+	txMap, err = dag.getTransaction(tid_2)
+	if err != nil {
+		t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+	}
+
+	expTxMap = dagTxMap{oid_c: 2}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+	}
+
+	if n := len(dag.txSet); n != 0 {
+		t.Errorf("Transaction sets in-memory: %d entries found, should be empty in DAG file %s", n, dagfile)
+	}
+
+	// Get the 3 new nodes from the DAG and verify their Tx IDs.
+	node, err := dag.getNode(oid_a, 3)
+	if err != nil {
+		t.Errorf("Cannot find object %d:3 in DAG file %s: %v", oid_a, dagfile, err)
+	}
+	if node.TxID != tid_1 {
+		t.Errorf("Invalid TxID for object %d:3 in DAG file %s: %v instead of %v", oid_a, dagfile, node.TxID, tid_1)
+	}
+	node, err = dag.getNode(oid_b, 3)
+	if err != nil {
+		t.Errorf("Cannot find object %d:3 in DAG file %s: %v", oid_b, dagfile, err)
+	}
+	if node.TxID != tid_1 {
+		t.Errorf("Invalid TxID for object %d:3 in DAG file %s: %v instead of %v", oid_b, dagfile, node.TxID, tid_1)
+	}
+	node, err = dag.getNode(oid_c, 2)
+	if err != nil {
+		t.Errorf("Cannot find object %d:2 in DAG file %s: %v", oid_c, dagfile, err)
+	}
+	if node.TxID != tid_2 {
+		t.Errorf("Invalid TxID for object %d:2 in DAG file %s: %v instead of %v", oid_c, dagfile, node.TxID, tid_2)
+	}
+
+	for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+		if err := checkEndOfSync(dag, oid); err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+// TestPruningTransactions tests pruning DAG nodes grouped within transactions.
+func TestPruningTransactions(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-02.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	oid_a, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+	oid_b, err := strToObjID("67890")
+	if err != nil {
+		t.Fatal(err)
+	}
+	oid_c, err := strToObjID("222")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Mutate objects in 2 transactions then add non-transactional mutations
+	// to act as the pruning points.  Before pruning the DAG is:
+	// a1 -- a2 -- (a3) --- a4
+	// b1 -- b2 -- (b3) -- (b4) -- b5
+	// c1 ---------------- (c2)
+	// Now by pruning at (a4, b5, c2), the new DAG should be:
+	// a4
+	// b5
+	// (c2)
+	// Transaction 1 (a3, b3) gets deleted, but transaction 2 (b4, c2) still
+	// has (c2) dangling waiting for a future pruning.
+	tid_1 := dag.addNodeTxStart()
+	if tid_1 == NoTxID {
+		t.Fatal("Cannot start 1st DAG addNode() transaction")
+	}
+	if err := dag.addNode(oid_a, 3, false, []storage.Version{2}, "logrec-a-03", tid_1); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+	}
+	if err := dag.addNode(oid_b, 3, false, []storage.Version{2}, "logrec-b-03", tid_1); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_1, dagfile, err)
+	}
+	if err := dag.addNodeTxEnd(tid_1); err != nil {
+		t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+	}
+
+	tid_2 := dag.addNodeTxStart()
+	if tid_2 == NoTxID {
+		t.Fatal("Cannot start 2nd DAG addNode() transaction")
+	}
+	if err := dag.addNode(oid_b, 4, false, []storage.Version{3}, "logrec-b-04", tid_2); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_2, dagfile, err)
+	}
+	if err := dag.addNode(oid_c, 2, false, []storage.Version{1}, "logrec-c-02", tid_2); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_c, tid_2, dagfile, err)
+	}
+	if err := dag.addNodeTxEnd(tid_2); err != nil {
+		t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+	}
+
+	if err := dag.addNode(oid_a, 4, false, []storage.Version{3}, "logrec-a-04", NoTxID); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+	}
+	if err := dag.addNode(oid_b, 5, false, []storage.Version{4}, "logrec-b-05", NoTxID); err != nil {
+		t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_2, dagfile, err)
+	}
+
+	if err = dag.moveHead(oid_a, 4); err != nil {
+		t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_a, dagfile, err)
+	}
+	if err = dag.moveHead(oid_b, 5); err != nil {
+		t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_b, dagfile, err)
+	}
+	if err = dag.moveHead(oid_c, 2); err != nil {
+		t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_c, dagfile, err)
+	}
+
+	// Verify the transaction sets.
+	txMap, err := dag.getTransaction(tid_1)
+	if err != nil {
+		t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+	}
+
+	expTxMap := dagTxMap{oid_a: 3, oid_b: 3}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map from DAG storage for Tx ID %v in DAG file %s: %v instead of %v",
+			tid_1, dagfile, txMap, expTxMap)
+	}
+
+	txMap, err = dag.getTransaction(tid_2)
+	if err != nil {
+		t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+	}
+
+	expTxMap = dagTxMap{oid_b: 4, oid_c: 2}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+	}
+
+	// Prune the 3 objects at their head nodes.
+	for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+		head, err := dag.getHead(oid)
+		if err != nil {
+			t.Errorf("Cannot getHead() on object %d in DAG file %s: %v", oid, dagfile, err)
+		}
+		err = dag.prune(oid, head, func(lr string) error {
+			return nil
+		})
+		if err != nil {
+			t.Errorf("Cannot prune() on object %d in DAG file %s: %v", oid, dagfile, err)
+		}
+	}
+
+	if err = dag.pruneDone(); err != nil {
+		t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+	}
+
+	if n := len(dag.txGC); n != 0 {
+		t.Errorf("Transaction GC map not empty after pruneDone() in DAG file %s: %d", dagfile, n)
+	}
+
+	// Verify that Tx-1 was deleted and Tx-2 still has c2 in it.
+	txMap, err = dag.getTransaction(tid_1)
+	if err == nil {
+		t.Errorf("getTransaction() did not fail for Tx ID %v in DAG file %s: %v", tid_1, dagfile, txMap)
+	}
+
+	txMap, err = dag.getTransaction(tid_2)
+	if err != nil {
+		t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+	}
+
+	expTxMap = dagTxMap{oid_c: 2}
+	if !reflect.DeepEqual(txMap, expTxMap) {
+		t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+	}
+
+	// Add c3 as a new head and prune at that point.  This should GC Tx-2.
+	if err := dag.addNode(oid_c, 3, false, []storage.Version{2}, "logrec-c-03", NoTxID); err != nil {
+		t.Errorf("Cannot addNode() on object %d in DAG file %s: %v", oid_c, dagfile, err)
+	}
+	if err = dag.moveHead(oid_c, 3); err != nil {
+		t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_c, dagfile, err)
+	}
+
+	err = dag.prune(oid_c, 3, func(lr string) error {
+		return nil
+	})
+	if err != nil {
+		t.Errorf("Cannot prune() on object %d in DAG file %s: %v", oid_c, dagfile, err)
+	}
+	if err = dag.pruneDone(); err != nil {
+		t.Errorf("pruneDone() #2 failed in DAG file %s: %v", dagfile, err)
+	}
+	if n := len(dag.txGC); n != 0 {
+		t.Errorf("Transaction GC map not empty after pruneDone() in DAG file %s: %d", dagfile, n)
+	}
+
+	txMap, err = dag.getTransaction(tid_2)
+	if err == nil {
+		t.Errorf("getTransaction() did not fail for Tx ID %v in DAG file %s: %v", tid_2, dagfile, txMap)
+	}
+
+	for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+		if err := checkEndOfSync(dag, oid); err != nil {
+			t.Fatal(err)
+		}
+	}
+}
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 930df89..7c22050 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -412,7 +412,7 @@
 	}
 
 	// Insert the new log record into dag.
-	if err = l.s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey); err != nil {
+	if err = l.s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey, NoTxID); err != nil {
 		return err
 	}
 
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 76aa7da..13b8e76 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -309,7 +309,7 @@
 	vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
 	switch rec.RecType {
 	case NodeRec:
-		return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+		return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey, NoTxID)
 	case LinkRec:
 		return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
 	default:
@@ -617,7 +617,7 @@
 			// Add a new DAG node.
 			switch rec.RecType {
 			case NodeRec:
-				err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+				err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey, NoTxID)
 			case LinkRec:
 				err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
 			default:
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 468c882..769aff8 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -48,7 +48,7 @@
 	if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
 		t.Errorf("GetLogRec didn't fail")
 	}
-	if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey); err != nil {
+	if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey, NoTxID); err != nil {
 		t.Errorf("AddNode failed with err %v", err)
 	}
 	curRec, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers)
@@ -97,7 +97,7 @@
 		if err != nil {
 			t.Errorf("PutLogRec failed with err %v", err)
 		}
-		if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey); err != nil {
+		if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey, NoTxID); err != nil {
 			t.Errorf("AddNode failed with err %v", err)
 		}
 	}
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 02cd561..82bff82 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -192,7 +192,7 @@
 	for _, cmd := range cmds {
 		switch cmd.cmd {
 		case addLocal:
-			if err = dag.addNode(cmd.objID, cmd.version, false, cmd.parents, cmd.logrec); err != nil {
+			if err = dag.addNode(cmd.objID, cmd.version, false, cmd.parents, cmd.logrec, NoTxID); err != nil {
 				return fmt.Errorf("cannot add local node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
 			}
 			if err := dag.moveHead(cmd.objID, cmd.version); err != nil {
@@ -201,7 +201,7 @@
 			dag.flush()
 
 		case addRemote:
-			if err = dag.addNode(cmd.objID, cmd.version, true, cmd.parents, cmd.logrec); err != nil {
+			if err = dag.addNode(cmd.objID, cmd.version, true, cmd.parents, cmd.logrec, NoTxID); err != nil {
 				return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
 			}
 			dag.flush()
diff --git a/runtimes/google/vsync/testdata/local-init-02.sync b/runtimes/google/vsync/testdata/local-init-02.sync
new file mode 100644
index 0000000..94307d9
--- /dev/null
+++ b/runtimes/google/vsync/testdata/local-init-02.sync
@@ -0,0 +1,10 @@
+# Create DAGs for 3 objects locally.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>
+
+addl|12345|1|||logrec-a-01
+addl|12345|2|1||logrec-a-02
+
+addl|67890|1|||logrec-b-01
+addl|67890|2|1||logrec-b-02
+
+addl|222|1|||logrec-c-01
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 5699fbc..3507aad 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -163,7 +163,7 @@
 	if err != nil {
 		return err
 	}
-	if err := s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey); err != nil {
+	if err := s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey, NoTxID); err != nil {
 		return err
 	}
 	if err := s.dag.moveHead(rec.ObjID, rec.CurVers); err != nil {