Track the transaction scope in the Sync DAG.
When new DAG nodes are added as part of a Store transaction, track their
grouping. This will be used to determine the scope of the conflict
resolution when one object inside a transaction has a conflict.
Change-Id: Id7025200b3e2ca3ec66be8405e756170f3f9d35c
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index ac1a342..7f97c99 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -68,13 +68,16 @@
// in the Veyron Store) have moved past some version for that object, the
// DAG for that object can be pruned, deleting all prior (ancestor) nodes.
//
-// The DAG DB contains two tables persisted to disk (nodes, heads) and
-// an in-memory (ephemeral) graft map:
+// The DAG DB contains three tables persisted to disk (nodes, heads, trans)
+// and three in-memory (ephemeral) maps (graft, txSet, txGC):
// * nodes: one entry per (object, version) with references to the
-// parent node(s) it is derived from, and a reference to the
-// log record identifying that change
+// parent node(s) it is derived from, a reference to the
+// log record identifying that change, and a reference to
+// its transaction set (or NoTxID if none)
// * heads: one entry per object pointing to its most recent version
-// in the nodes collection
+// in the nodes table
+// * trans: one entry per transaction ID containing the set of objects
+// that forms the transaction and their versions.
// * graft: during a sync operation, it tracks the nodes where the new
// DAG fragments are attached to the existing graph for each
// mutated object. This map is used to determine whether a
@@ -82,6 +85,14 @@
// recent common ancestor from these graft points to use when
// resolving the conflict. At the end of a sync operation the
// graft map is destroyed.
+// * txSet: used to incrementally construct the transaction sets that
+// are stored in the "trans" table once all the nodes of a
+// transaction have been added. Multiple transaction sets
+// can be constructed to support the concurrency between the
+// Sync Initiator and Watcher threads.
+// * txGC: used to track the transactions impacted by objects being
+// pruned. At the end of the pruning operation the records
+// of the "trans" table are updated from the txGC information.
//
// Note: for regular (no-conflict) changes, a node has a reference to
// one parent from which it was derived. When a conflict is resolved,
@@ -95,23 +106,37 @@
"container/list"
"errors"
"fmt"
+ "math/rand"
+ "time"
"veyron2/storage"
"veyron2/vlog"
)
+const (
+ NoTxID = TxID(0)
+)
+
+type TxID uint64
+type dagTxMap map[storage.ID]storage.Version
+
type dag struct {
fname string // file pathname
store *kvdb // underlying K/V store
heads *kvtable // pointer to "heads" table in the store
nodes *kvtable // pointer to "nodes" table in the store
+ trans *kvtable // pointer to "trans" table in the store
graft map[storage.ID]*graftInfo // in-memory state of DAG object grafting
+ txSet map[TxID]dagTxMap // in-memory construction of transaction sets
+ txGC map[TxID]dagTxMap // in-memory tracking of transaction sets to cleanup
+ txGen *rand.Rand // transaction ID random number generator
}
type dagNode struct {
Level uint64 // node distance from root
Parents []storage.Version // references to parent versions
Logrec string // reference to log record change
+ TxID TxID // ID of a transaction set
}
type graftInfo struct {
@@ -123,8 +148,8 @@
// openDAG opens or creates a DAG for the given filename.
func openDAG(filename string) (*dag, error) {
// Open the file and create it if it does not exist.
- // Also initialize the store and its two collections.
- db, tbls, err := kvdbOpen(filename, []string{"heads", "nodes"})
+ // Also initialize the store and its tables.
+ db, tbls, err := kvdbOpen(filename, []string{"heads", "nodes", "trans"})
if err != nil {
return nil, err
}
@@ -134,9 +159,13 @@
store: db,
heads: tbls[0],
nodes: tbls[1],
+ trans: tbls[2],
+ txGen: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+ txSet: make(map[TxID]dagTxMap),
}
d.clearGraft()
+ d.clearTxGC()
return d, nil
}
@@ -161,13 +190,14 @@
if d.store == nil {
return errors.New("invalid DAG")
}
- db, tbls, err := d.store.compact(d.fname, []string{"heads", "nodes"})
+ db, tbls, err := d.store.compact(d.fname, []string{"heads", "nodes", "trans"})
if err != nil {
return err
}
d.store = db
d.heads = tbls[0]
d.nodes = tbls[1]
+ d.trans = tbls[2]
return nil
}
@@ -178,6 +208,13 @@
}
}
+// clearTxGC clears the temporary in-memory transaction garbage collection maps.
+func (d *dag) clearTxGC() {
+ if d.store != nil {
+ d.txGC = make(map[TxID]dagTxMap)
+ }
+}
+
// getObjectGraft returns the graft structure for an object ID.
// The graftInfo struct for an object is ephemeral (in-memory) and it
// tracks the following information:
@@ -218,6 +255,52 @@
return graft
}
+// addNodeTxStart generates a transaction ID and returns it to the caller.
+// The transaction ID is purely internal to the DAG. It is used to track
+// DAG nodes that are part of the same transaction.
+func (d *dag) addNodeTxStart() TxID {
+ if d.store == nil {
+ return NoTxID
+ }
+
+ // Generate a random 64-bit transaction ID different than NoTxID.
+ // Also make sure the ID is not already being used.
+ tid := NoTxID
+ for (tid == NoTxID) || (d.txSet[tid] != nil) {
+ // Generate an unsigned 64-bit random value by combining a
+ // random 63-bit value and a random 1-bit value.
+ tid = (TxID(d.txGen.Int63()) << 1) | TxID(d.txGen.Int63n(2))
+ }
+
+ // Initialize the in-memory object/version map for that transaction ID.
+ d.txSet[tid] = make(dagTxMap)
+
+ return tid
+}
+
+// addNodeTxEnd marks the end of a given transaction.
+// The DAG completes its internal tracking of the transaction information.
+func (d *dag) addNodeTxEnd(tid TxID) error {
+ if d.store == nil {
+ return errors.New("invalid DAG")
+ }
+ if tid == NoTxID {
+ return fmt.Errorf("invalid TxID: %v", tid)
+ }
+
+ txMap, ok := d.txSet[tid]
+ if !ok {
+ return fmt.Errorf("unknown transaction ID: %v", tid)
+ }
+
+ if err := d.setTransaction(tid, txMap); err != nil {
+ return err
+ }
+
+ delete(d.txSet, tid)
+ return nil
+}
+
// addNode adds a new node for an object in the DAG, linking it to its parent nodes.
// It verifies that this node does not exist and that its parent nodes are valid.
// It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -228,10 +311,15 @@
// - If a parent node is not new, mark it as a DAG graft point.
// - Mark this version as a new node.
// - Update the new head node pointer of the grafted DAG.
-func (d *dag) addNode(oid storage.ID, version storage.Version, remote bool, parents []storage.Version, logrec string) error {
+//
+// If the transaction ID is set to NoTxID, this node is not part of a transaction.
+// Otherwise, track its membership in the given transaction ID.
+func (d *dag) addNode(oid storage.ID, version storage.Version, remote bool,
+ parents []storage.Version, logrec string, tid TxID) error {
if d.store == nil {
return errors.New("invalid DAG")
}
+
if parents != nil {
if len(parents) > 2 {
return fmt.Errorf("cannot have more than 2 parents, not %d", len(parents))
@@ -314,12 +402,22 @@
graft.newHeads[version] = struct{}{}
}
+ // If this node is part of a transaction, add it to that set.
+ if tid != NoTxID {
+ txMap, ok := d.txSet[tid]
+ if !ok {
+ return fmt.Errorf("unknown transaction ID: %v", tid)
+ }
+
+ txMap[oid] = version
+ }
+
// Insert the new node in the kvdb.
- node := &dagNode{Level: level, Parents: parents, Logrec: logrec}
+ node := &dagNode{Level: level, Parents: parents, Logrec: logrec, TxID: tid}
return d.setNode(oid, version, node)
}
-// hasNode returns true if the node (oid, version) exists in the DAG.
+// hasNode returns true if the node (oid, version) exists in the DAG DB.
func (d *dag) hasNode(oid storage.ID, version storage.Version) bool {
if d.store == nil {
return false
@@ -523,6 +621,9 @@
// node it calls the given callback function to delete its log record.
// This function should only be called when Sync determines that all devices
// that know about the object have gotten past this version.
+// Also track any transaction sets affected by deleting DAG objects that
+// have transaction IDs. This is later used to do garbage collection
+// on transaction sets when pruneDone() is called.
func (d *dag) prune(oid storage.ID, version storage.Version, delLogRec func(logrec string) error) error {
if d.store == nil {
return errors.New("invalid DAG")
@@ -548,8 +649,17 @@
// Delete all ancestor nodes and their log records.
// Delete as many as possible and track the error counts.
+ // Keep track of objects deleted from transaction in order
+ // to cleanup transaction sets when pruneDone() is called.
numNodeErrs, numLogErrs := 0, 0
err = d.ancestorIter(oid, iterVersions, func(oid storage.ID, v storage.Version, node *dagNode) error {
+ if tid := node.TxID; tid != NoTxID {
+ if d.txGC[tid] == nil {
+ d.txGC[tid] = make(dagTxMap)
+ }
+ d.txGC[tid][oid] = v
+ }
+
if err := delLogRec(node.Logrec); err != nil {
numLogErrs++
}
@@ -567,6 +677,40 @@
return nil
}
+// pruneDone is called when object pruning is finished within a single pass
+// of the Sync garbage collector. It updates the transaction sets affected
+// by the objects deleted by the prune() calls.
+func (d *dag) pruneDone() error {
+ if d.store == nil {
+ return errors.New("invalid DAG")
+ }
+
+ // Update transaction sets by removing from them the objects that
+ // were pruned. If the resulting set is empty, delete it.
+ for tid, txMapGC := range d.txGC {
+ txMap, err := d.getTransaction(tid)
+ if err != nil {
+ return err
+ }
+
+ for oid := range txMapGC {
+ delete(txMap, oid)
+ }
+
+ if len(txMap) > 0 {
+ err = d.setTransaction(tid, txMap)
+ } else {
+ err = d.delTransaction(tid)
+ }
+ if err != nil {
+ return err
+ }
+ }
+
+ d.clearTxGC()
+ return nil
+}
+
// getLogrec returns the log record information for a given object version.
func (d *dag) getLogrec(oid storage.ID, version storage.Version) (string, error) {
node, err := d.getNode(oid, version)
@@ -577,13 +721,13 @@
}
// objNodeKey returns the key used to access the object node (oid, version)
-// in the DAG.
+// in the DAG DB.
func objNodeKey(oid storage.ID, version storage.Version) string {
return fmt.Sprintf("%s:%d", oid.String(), version)
}
// setNode stores the dagNode structure for the object node (oid, version)
-// in the DAG.
+// in the DAG DB.
func (d *dag) setNode(oid storage.ID, version storage.Version, node *dagNode) error {
if d.store == nil {
return errors.New("invalid DAG")
@@ -593,7 +737,7 @@
}
// getNode retrieves the dagNode structure for the object node (oid, version)
-// from the DAG.
+// from the DAG DB.
func (d *dag) getNode(oid storage.ID, version storage.Version) (*dagNode, error) {
if d.store == nil {
return nil, errors.New("invalid DAG")
@@ -606,7 +750,7 @@
return &node, nil
}
-// delNode deletes the object node (oid, version) from the DAG.
+// delNode deletes the object node (oid, version) from the DAG DB.
func (d *dag) delNode(oid storage.ID, version storage.Version) error {
if d.store == nil {
return errors.New("invalid DAG")
@@ -615,12 +759,12 @@
return d.nodes.del(key)
}
-// objHeadKey returns the key used to access the object head in the DAG.
+// objHeadKey returns the key used to access the object head in the DAG DB.
func objHeadKey(oid storage.ID) string {
return oid.String()
}
-// setHead stores version as the object head in the DAG.
+// setHead stores version as the object head in the DAG DB.
func (d *dag) setHead(oid storage.ID, version storage.Version) error {
if d.store == nil {
return errors.New("invalid DAG")
@@ -629,7 +773,7 @@
return d.heads.set(key, version)
}
-// getHead retrieves the object head from the DAG.
+// getHead retrieves the object head from the DAG DB.
func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
var version storage.Version
if d.store == nil {
@@ -643,6 +787,51 @@
return version, err
}
+// dagTransactionKey returns the key used to access the transaction in the DAG DB.
+func dagTransactionKey(tid TxID) string {
+ return fmt.Sprintf("%v", tid)
+}
+
+// setTransaction stores the transaction object/version map in the DAG DB.
+func (d *dag) setTransaction(tid TxID, txMap dagTxMap) error {
+ if d.store == nil {
+ return errors.New("invalid DAG")
+ }
+ if tid == NoTxID {
+ return fmt.Errorf("invalid TxID: %v", tid)
+ }
+ key := dagTransactionKey(tid)
+ return d.trans.set(key, txMap)
+}
+
+// getTransaction retrieves the transaction object/version map from the DAG DB.
+func (d *dag) getTransaction(tid TxID) (dagTxMap, error) {
+ if d.store == nil {
+ return nil, errors.New("invalid DAG")
+ }
+ if tid == NoTxID {
+ return nil, fmt.Errorf("invalid TxID: %v", tid)
+ }
+ var txMap dagTxMap
+ key := dagTransactionKey(tid)
+ if err := d.trans.get(key, &txMap); err != nil {
+ return nil, err
+ }
+ return txMap, nil
+}
+
+// delTransaction deletes the transation object/version map from the DAG DB.
+func (d *dag) delTransaction(tid TxID) error {
+ if d.store == nil {
+ return errors.New("invalid DAG")
+ }
+ if tid == NoTxID {
+ return fmt.Errorf("invalid TxID: %v", tid)
+ }
+ key := dagTransactionKey(tid)
+ return d.trans.del(key)
+}
+
// getParentMap is a testing and debug helper function that returns for
// an object a map of all the object version in the DAG and their parents.
// The map represents the graph of the object version history.
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 659a586..a36bc4c 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -99,7 +99,7 @@
t.Error(err)
}
- err = dag.addNode(oid, 4, false, []storage.Version{2, 3}, "foobar")
+ err = dag.addNode(oid, 4, false, []storage.Version{2, 3}, "foobar", NoTxID)
if err == nil || err.Error() != "invalid DAG" {
t.Errorf("addNode() did not fail on a closed DAG: %v", err)
}
@@ -126,6 +126,11 @@
t.Errorf("prune() did not fail on a closed DAG: %v", err)
}
+ err = dag.pruneDone()
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("pruneDone() did not fail on a closed DAG: %v", err)
+ }
+
node := &dagNode{Level: 15, Parents: []storage.Version{444, 555}, Logrec: "logrec-23"}
err = dag.setNode(oid, 4, node)
if err == nil || err.Error() != "invalid DAG" {
@@ -162,8 +167,33 @@
t.Errorf("compact() did not fail on a closed DAG: %v", err)
}
+ if tid := dag.addNodeTxStart(); tid != NoTxID {
+ t.Errorf("addNodeTxStart() did not fail on a closed DAG: TxID %v", tid)
+ }
+
+ err = dag.addNodeTxEnd(1)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("addNodeTxEnd() did not fail on a closed DAG: %v", err)
+ }
+
+ err = dag.setTransaction(1, nil)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("setTransaction() did not fail on a closed DAG: %v", err)
+ }
+
+ _, err = dag.getTransaction(1)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("getTransaction() did not fail on a closed DAG: %v", err)
+ }
+
+ err = dag.delTransaction(1)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("delTransaction() did not fail on a closed DAG: %v", err)
+ }
+
// These calls should be harmless NOPs.
dag.clearGraft()
+ dag.clearTxGC()
dag.flush()
dag.close()
if dag.hasNode(oid, 4) {
@@ -421,12 +451,16 @@
// Clear grafting info; this happens at the end of a sync log replay.
d.clearGraft()
- // There should be no grafting info, and hasConflict() should fail.
+ // There should be no grafting or transaction info, and hasConflict() should fail.
newHeads, grafts := d.getGraftNodes(oid)
if newHeads != nil || grafts != nil {
return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
}
+ if n := len(d.txSet); n != 0 {
+ return fmt.Errorf("transaction set not empty: %d entries found", n)
+ }
+
isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
if errConflict == nil {
return fmt.Errorf("Object %d: conflict did not fail: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
@@ -472,26 +506,26 @@
}
// Make sure an existing node cannot be added again.
- if err = dag.addNode(oid, 1, false, []storage.Version{0, 2}, "foobar"); err == nil {
+ if err = dag.addNode(oid, 1, false, []storage.Version{0, 2}, "foobar", NoTxID); err == nil {
t.Errorf("addNode() did not fail when given an existing node")
}
// Make sure a new node cannot have more than 2 parents.
- if err = dag.addNode(oid, 3, false, []storage.Version{0, 1, 2}, "foobar"); err == nil {
+ if err = dag.addNode(oid, 3, false, []storage.Version{0, 1, 2}, "foobar", NoTxID); err == nil {
t.Errorf("addNode() did not fail when given 3 parents")
}
// Make sure a new node cannot have an invalid parent.
- if err = dag.addNode(oid, 3, false, []storage.Version{0, 555}, "foobar"); err == nil {
+ if err = dag.addNode(oid, 3, false, []storage.Version{0, 555}, "foobar", NoTxID); err == nil {
t.Errorf("addNode() did not fail when using an invalid parent")
}
// Make sure a new root node (no parents) cannot be added once a root exists.
// For the parents array, check both the "nil" and the empty array as input.
- if err = dag.addNode(oid, 6789, false, nil, "foobar"); err == nil {
+ if err = dag.addNode(oid, 6789, false, nil, "foobar", NoTxID); err == nil {
t.Errorf("Adding a 2nd root node (nil parents) for object %d in DAG file %s did not fail", oid, dagfile)
}
- if err = dag.addNode(oid, 6789, false, []storage.Version{}, "foobar"); err == nil {
+ if err = dag.addNode(oid, 6789, false, []storage.Version{}, "foobar", NoTxID); err == nil {
t.Errorf("Adding a 2nd root node (empty parents) for object %d in DAG file %s did not fail", oid, dagfile)
}
@@ -982,6 +1016,11 @@
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
+ err = dag.pruneDone()
+ if err != nil {
+ t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+ }
+
// Remove pruned nodes from the expected parent map used to validate
// and set the parents of the pruned node to nil.
if version < 10 {
@@ -1049,6 +1088,11 @@
t.Errorf("pruning object %d:%d deleted %d log records instead of %d", oid, version, del, expDel)
}
+ err = dag.pruneDone()
+ if err != nil {
+ t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+ }
+
if head, err := dag.getHead(oid); err != nil || head != 8 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
@@ -1499,3 +1543,376 @@
t.Fatal(err)
}
}
+
+// TestAddNodeTransactional tests adding multiple DAG nodes grouped within a transaction.
+func TestAddNodeTransactional(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-02.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ oid_a, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+ oid_b, err := strToObjID("67890")
+ if err != nil {
+ t.Fatal(err)
+ }
+ oid_c, err := strToObjID("222")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify NoTxID is reported as an error.
+ if err := dag.addNodeTxEnd(NoTxID); err == nil {
+ t.Errorf("addNodeTxEnd() did not fail for invalid 'NoTxID' value")
+ }
+ if _, err := dag.getTransaction(NoTxID); err == nil {
+ t.Errorf("getTransaction() did not fail for invalid 'NoTxID' value")
+ }
+ if err := dag.setTransaction(NoTxID, nil); err == nil {
+ t.Errorf("setTransaction() did not fail for invalid 'NoTxID' value")
+ }
+ if err := dag.delTransaction(NoTxID); err == nil {
+ t.Errorf("delTransaction() did not fail for invalid 'NoTxID' value")
+ }
+
+ // Mutate 2 objects within a transaction.
+ tid_1 := dag.addNodeTxStart()
+ if tid_1 == NoTxID {
+ t.Fatal("Cannot start 1st DAG addNode() transaction")
+ }
+
+ txMap, ok := dag.txSet[tid_1]
+ if !ok {
+ t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_1, dagfile)
+ }
+ if n := len(txMap); n != 0 {
+ t.Errorf("Transactions map for Tx ID %v has length %d instead of 0 in DAG file %s", tid_1, n, dagfile)
+ }
+
+ if err := dag.addNode(oid_a, 3, false, []storage.Version{2}, "logrec-a-03", tid_1); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+ }
+ if err := dag.addNode(oid_b, 3, false, []storage.Version{2}, "logrec-b-03", tid_1); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_1, dagfile, err)
+ }
+
+ // At the same time mutate the 3rd object in another transaction.
+ tid_2 := dag.addNodeTxStart()
+ if tid_2 == NoTxID {
+ t.Fatal("Cannot start 2nd DAG addNode() transaction")
+ }
+
+ txMap, ok = dag.txSet[tid_2]
+ if !ok {
+ t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+ }
+ if n := len(txMap); n != 0 {
+ t.Errorf("Transactions map for Tx ID %v has length %d instead of 0 in DAG file %s", tid_2, n, dagfile)
+ }
+
+ if err := dag.addNode(oid_c, 2, false, []storage.Version{1}, "logrec-c-02", tid_2); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_c, tid_2, dagfile, err)
+ }
+
+ // Verify the in-memory transaction sets constructed.
+ txMap, ok = dag.txSet[tid_1]
+ if !ok {
+ t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_1, dagfile)
+ }
+
+ expTxMap := dagTxMap{oid_a: 3, oid_b: 3}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_1, dagfile, txMap, expTxMap)
+ }
+
+ txMap, ok = dag.txSet[tid_2]
+ if !ok {
+ t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+ }
+
+ expTxMap = dagTxMap{oid_c: 2}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+ }
+
+ // Verify failing to use a Tx ID not returned by addNodeTxStart().
+ bad_tid := tid_1 + 1
+ for bad_tid == NoTxID || bad_tid == tid_2 {
+ bad_tid++
+ }
+
+ if err := dag.addNode(oid_c, 3, false, []storage.Version{2}, "logrec-c-03", bad_tid); err == nil {
+ t.Errorf("addNode() did not fail on object %d for a bad Tx ID %v in DAG file %s", oid_c, bad_tid, dagfile)
+ }
+ if err := dag.addNodeTxEnd(bad_tid); err == nil {
+ t.Errorf("addNodeTxEnd() did not fail for a bad Tx ID %v in DAG file %s", bad_tid, dagfile)
+ }
+
+ // End the 1st transaction and verify the in-memory and in-DAG data.
+ if err := dag.addNodeTxEnd(tid_1); err != nil {
+ t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+ }
+
+ if _, ok = dag.txSet[tid_1]; ok {
+ t.Errorf("Transactions map for Tx ID %v still exists in DAG file %s", tid_1, dagfile)
+ }
+
+ txMap, err = dag.getTransaction(tid_1)
+ if err != nil {
+ t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+ }
+
+ expTxMap = dagTxMap{oid_a: 3, oid_b: 3}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map from DAG storage for Tx ID %v in DAG file %s: %v instead of %v",
+ tid_1, dagfile, txMap, expTxMap)
+ }
+
+ txMap, ok = dag.txSet[tid_2]
+ if !ok {
+ t.Errorf("Transactions map for Tx ID %v not found in DAG file %s", tid_2, dagfile)
+ }
+
+ expTxMap = dagTxMap{oid_c: 2}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+ }
+
+ // End the 2nd transaction and re-verify the in-memory and in-DAG data.
+ if err := dag.addNodeTxEnd(tid_2); err != nil {
+ t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+ }
+
+ if _, ok = dag.txSet[tid_2]; ok {
+ t.Errorf("Transactions map for Tx ID %v still exists in DAG file %s", tid_2, dagfile)
+ }
+
+ txMap, err = dag.getTransaction(tid_2)
+ if err != nil {
+ t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+ }
+
+ expTxMap = dagTxMap{oid_c: 2}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+ }
+
+ if n := len(dag.txSet); n != 0 {
+ t.Errorf("Transaction sets in-memory: %d entries found, should be empty in DAG file %s", n, dagfile)
+ }
+
+ // Get the 3 new nodes from the DAG and verify their Tx IDs.
+ node, err := dag.getNode(oid_a, 3)
+ if err != nil {
+ t.Errorf("Cannot find object %d:3 in DAG file %s: %v", oid_a, dagfile, err)
+ }
+ if node.TxID != tid_1 {
+ t.Errorf("Invalid TxID for object %d:3 in DAG file %s: %v instead of %v", oid_a, dagfile, node.TxID, tid_1)
+ }
+ node, err = dag.getNode(oid_b, 3)
+ if err != nil {
+ t.Errorf("Cannot find object %d:3 in DAG file %s: %v", oid_b, dagfile, err)
+ }
+ if node.TxID != tid_1 {
+ t.Errorf("Invalid TxID for object %d:3 in DAG file %s: %v instead of %v", oid_b, dagfile, node.TxID, tid_1)
+ }
+ node, err = dag.getNode(oid_c, 2)
+ if err != nil {
+ t.Errorf("Cannot find object %d:2 in DAG file %s: %v", oid_c, dagfile, err)
+ }
+ if node.TxID != tid_2 {
+ t.Errorf("Invalid TxID for object %d:2 in DAG file %s: %v instead of %v", oid_c, dagfile, node.TxID, tid_2)
+ }
+
+ for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// TestPruningTransactions tests pruning DAG nodes grouped within transactions.
+func TestPruningTransactions(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-02.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ oid_a, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+ oid_b, err := strToObjID("67890")
+ if err != nil {
+ t.Fatal(err)
+ }
+ oid_c, err := strToObjID("222")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Mutate objects in 2 transactions then add non-transactional mutations
+ // to act as the pruning points. Before pruning the DAG is:
+ // a1 -- a2 -- (a3) --- a4
+ // b1 -- b2 -- (b3) -- (b4) -- b5
+ // c1 ---------------- (c2)
+ // Now by pruning at (a4, b5, c2), the new DAG should be:
+ // a4
+ // b5
+ // (c2)
+ // Transaction 1 (a3, b3) gets deleted, but transaction 2 (b4, c2) still
+ // has (c2) dangling waiting for a future pruning.
+ tid_1 := dag.addNodeTxStart()
+ if tid_1 == NoTxID {
+ t.Fatal("Cannot start 1st DAG addNode() transaction")
+ }
+ if err := dag.addNode(oid_a, 3, false, []storage.Version{2}, "logrec-a-03", tid_1); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+ }
+ if err := dag.addNode(oid_b, 3, false, []storage.Version{2}, "logrec-b-03", tid_1); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_1, dagfile, err)
+ }
+ if err := dag.addNodeTxEnd(tid_1); err != nil {
+ t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+ }
+
+ tid_2 := dag.addNodeTxStart()
+ if tid_2 == NoTxID {
+ t.Fatal("Cannot start 2nd DAG addNode() transaction")
+ }
+ if err := dag.addNode(oid_b, 4, false, []storage.Version{3}, "logrec-b-04", tid_2); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_2, dagfile, err)
+ }
+ if err := dag.addNode(oid_c, 2, false, []storage.Version{1}, "logrec-c-02", tid_2); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_c, tid_2, dagfile, err)
+ }
+ if err := dag.addNodeTxEnd(tid_2); err != nil {
+ t.Errorf("Cannot addNodeTxEnd() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+ }
+
+ if err := dag.addNode(oid_a, 4, false, []storage.Version{3}, "logrec-a-04", NoTxID); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_a, tid_1, dagfile, err)
+ }
+ if err := dag.addNode(oid_b, 5, false, []storage.Version{4}, "logrec-b-05", NoTxID); err != nil {
+ t.Errorf("Cannot addNode() on object %d and Tx ID %v in DAG file %s: %v", oid_b, tid_2, dagfile, err)
+ }
+
+ if err = dag.moveHead(oid_a, 4); err != nil {
+ t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_a, dagfile, err)
+ }
+ if err = dag.moveHead(oid_b, 5); err != nil {
+ t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_b, dagfile, err)
+ }
+ if err = dag.moveHead(oid_c, 2); err != nil {
+ t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_c, dagfile, err)
+ }
+
+ // Verify the transaction sets.
+ txMap, err := dag.getTransaction(tid_1)
+ if err != nil {
+ t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_1, dagfile, err)
+ }
+
+ expTxMap := dagTxMap{oid_a: 3, oid_b: 3}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map from DAG storage for Tx ID %v in DAG file %s: %v instead of %v",
+ tid_1, dagfile, txMap, expTxMap)
+ }
+
+ txMap, err = dag.getTransaction(tid_2)
+ if err != nil {
+ t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+ }
+
+ expTxMap = dagTxMap{oid_b: 4, oid_c: 2}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+ }
+
+ // Prune the 3 objects at their head nodes.
+ for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+ head, err := dag.getHead(oid)
+ if err != nil {
+ t.Errorf("Cannot getHead() on object %d in DAG file %s: %v", oid, dagfile, err)
+ }
+ err = dag.prune(oid, head, func(lr string) error {
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Cannot prune() on object %d in DAG file %s: %v", oid, dagfile, err)
+ }
+ }
+
+ if err = dag.pruneDone(); err != nil {
+ t.Errorf("pruneDone() failed in DAG file %s: %v", dagfile, err)
+ }
+
+ if n := len(dag.txGC); n != 0 {
+ t.Errorf("Transaction GC map not empty after pruneDone() in DAG file %s: %d", dagfile, n)
+ }
+
+ // Verify that Tx-1 was deleted and Tx-2 still has c2 in it.
+ txMap, err = dag.getTransaction(tid_1)
+ if err == nil {
+ t.Errorf("getTransaction() did not fail for Tx ID %v in DAG file %s: %v", tid_1, dagfile, txMap)
+ }
+
+ txMap, err = dag.getTransaction(tid_2)
+ if err != nil {
+ t.Errorf("Cannot getTransaction() for Tx ID %v in DAG file %s: %v", tid_2, dagfile, err)
+ }
+
+ expTxMap = dagTxMap{oid_c: 2}
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Invalid transaction map for Tx ID %v in DAG file %s: %v instead of %v", tid_2, dagfile, txMap, expTxMap)
+ }
+
+ // Add c3 as a new head and prune at that point. This should GC Tx-2.
+ if err := dag.addNode(oid_c, 3, false, []storage.Version{2}, "logrec-c-03", NoTxID); err != nil {
+ t.Errorf("Cannot addNode() on object %d in DAG file %s: %v", oid_c, dagfile, err)
+ }
+ if err = dag.moveHead(oid_c, 3); err != nil {
+ t.Errorf("Object %d cannot move head in DAG file %s: %v", oid_c, dagfile, err)
+ }
+
+ err = dag.prune(oid_c, 3, func(lr string) error {
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Cannot prune() on object %d in DAG file %s: %v", oid_c, dagfile, err)
+ }
+ if err = dag.pruneDone(); err != nil {
+ t.Errorf("pruneDone() #2 failed in DAG file %s: %v", dagfile, err)
+ }
+ if n := len(dag.txGC); n != 0 {
+ t.Errorf("Transaction GC map not empty after pruneDone() in DAG file %s: %d", dagfile, n)
+ }
+
+ txMap, err = dag.getTransaction(tid_2)
+ if err == nil {
+ t.Errorf("getTransaction() did not fail for Tx ID %v in DAG file %s: %v", tid_2, dagfile, txMap)
+ }
+
+ for _, oid := range []storage.ID{oid_a, oid_b, oid_c} {
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 930df89..7c22050 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -412,7 +412,7 @@
}
// Insert the new log record into dag.
- if err = l.s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey); err != nil {
+ if err = l.s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey, NoTxID); err != nil {
return err
}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 76aa7da..13b8e76 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -309,7 +309,7 @@
vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
switch rec.RecType {
case NodeRec:
- return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+ return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey, NoTxID)
case LinkRec:
return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
default:
@@ -617,7 +617,7 @@
// Add a new DAG node.
switch rec.RecType {
case NodeRec:
- err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+ err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey, NoTxID)
case LinkRec:
err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
default:
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 468c882..769aff8 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -48,7 +48,7 @@
if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
t.Errorf("GetLogRec didn't fail")
}
- if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey); err != nil {
+ if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey, NoTxID); err != nil {
t.Errorf("AddNode failed with err %v", err)
}
curRec, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers)
@@ -97,7 +97,7 @@
if err != nil {
t.Errorf("PutLogRec failed with err %v", err)
}
- if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey); err != nil {
+ if err = s.dag.addNode(objID, expRec.CurVers, false, expRec.Parents, logKey, NoTxID); err != nil {
t.Errorf("AddNode failed with err %v", err)
}
}
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 02cd561..82bff82 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -192,7 +192,7 @@
for _, cmd := range cmds {
switch cmd.cmd {
case addLocal:
- if err = dag.addNode(cmd.objID, cmd.version, false, cmd.parents, cmd.logrec); err != nil {
+ if err = dag.addNode(cmd.objID, cmd.version, false, cmd.parents, cmd.logrec, NoTxID); err != nil {
return fmt.Errorf("cannot add local node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
}
if err := dag.moveHead(cmd.objID, cmd.version); err != nil {
@@ -201,7 +201,7 @@
dag.flush()
case addRemote:
- if err = dag.addNode(cmd.objID, cmd.version, true, cmd.parents, cmd.logrec); err != nil {
+ if err = dag.addNode(cmd.objID, cmd.version, true, cmd.parents, cmd.logrec, NoTxID); err != nil {
return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
}
dag.flush()
diff --git a/runtimes/google/vsync/testdata/local-init-02.sync b/runtimes/google/vsync/testdata/local-init-02.sync
new file mode 100644
index 0000000..94307d9
--- /dev/null
+++ b/runtimes/google/vsync/testdata/local-init-02.sync
@@ -0,0 +1,10 @@
+# Create DAGs for 3 objects locally.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>
+
+addl|12345|1|||logrec-a-01
+addl|12345|2|1||logrec-a-02
+
+addl|67890|1|||logrec-b-01
+addl|67890|2|1||logrec-b-02
+
+addl|222|1|||logrec-c-01
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 5699fbc..3507aad 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -163,7 +163,7 @@
if err != nil {
return err
}
- if err := s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey); err != nil {
+ if err := s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Parents, logKey, NoTxID); err != nil {
return err
}
if err := s.dag.moveHead(rec.ObjID, rec.CurVers); err != nil {