veyron/runtimes/google/vsync:
* Track delete mutations from store while maintaining unique version
numbers in the DAG for delete records.
* Update testfiles and tests to not use NoVersion (i.e. 0) as a valid
version number.
Change-Id: I668f933679feb52e464c61f8c61ac0e2096d8674
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0a50cdf..f4d4a42 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -1214,11 +1214,11 @@
// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
// linked nodes (conflict resolution by selecting an existing version) on top of
// a local initial state without conflict. An object is created locally and
-// updated twice (v0 -> v1 -> v2). Another device has learned about v0, created
-// (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by selecting
-// v1 over v3. Now it sends that new info (v3 and the v1/v3 link) back to the
-// original (local) device. Instead of a v2/v3 conflict, the device sees that
-// v1 was chosen over v3 and resolves it as a no-conflict case.
+// updated twice (v1 -> v2 -> v3). Another device has learned about v1, created
+// (v1 -> v4), then learned about (v1 -> v2) and resolved that conflict by selecting
+// v2 over v4. Now it sends that new info (v4 and the v2/v4 link) back to the
+// original (local) device. Instead of a v3/v4 conflict, the device sees that
+// v2 was chosen over v4 and resolves it as a no-conflict case.
func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1228,27 +1228,27 @@
t.Fatalf("Cannot open new DAG file %s", dagfile)
}
- if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ if err = dagReplayCommands(dag, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
if err = dagReplayCommands(dag, "remote-noconf-link-00.log.sync"); err != nil {
t.Fatal(err)
}
- // The head must not have moved (i.e. still at v2) and the parent map
+ // The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
oid, err := strToObjID("12345")
if err != nil {
t.Fatal(err)
}
- if head, e := dag.getHead(oid); e != nil || head != 2 {
+ if head, e := dag.getHead(oid); e != nil || head != 3 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
pmap := dag.getParentMap(oid)
- exp := map[storage.Version][]storage.Version{0: nil, 1: {0, 3}, 2: {1}, 3: {0}}
+ exp := map[storage.Version][]storage.Version{1: nil, 2: {1, 4}, 3: {2}, 4: {1}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
@@ -1257,19 +1257,19 @@
// Verify the grafting of remote nodes.
newHeads, grafts := dag.getGraftNodes(oid)
- expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+ expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
if !reflect.DeepEqual(newHeads, expNewHeads) {
t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
- expgrafts := map[storage.Version]uint64{0: 0, 3: 1}
+ expgrafts := map[storage.Version]uint64{1: 0, 4: 1}
if !reflect.DeepEqual(grafts, expgrafts) {
t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
- if !(!isConflict && newHead == 2 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ if !(!isConflict && newHead == 3 && oldHead == 3 && ancestor == storage.NoVersion && errConflict == nil) {
t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -1290,10 +1290,10 @@
// TestRemoteLinkedConflict tests sync of remote updates that contain linked
// nodes (conflict resolution by selecting an existing version) on top of a local
// initial state triggering a local conflict. An object is created locally and
-// updated twice (v0 -> v1 -> v2). Another device has along the way learned about v0,
-// created (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by
-// selecting v3 over v1. Now it sends that new info (v3 and the v3/v1 link) back
-// to the original (local) device. The device sees a v2/v3 conflict.
+// updated twice (v1 -> v2 -> v3). Another device has along the way learned about v1,
+// created (v1 -> v4), then learned about (v1 -> v2) and resolved that conflict by
+// selecting v4 over v2. Now it sends that new info (v4 and the v4/v2 link) back
+// to the original (local) device. The device sees a v3/v4 conflict.
func TestRemoteLinkedConflict(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1303,7 +1303,7 @@
t.Fatalf("Cannot open new DAG file %s", dagfile)
}
- if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ if err = dagReplayCommands(dag, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
if err = dagReplayCommands(dag, "remote-conf-link.log.sync"); err != nil {
@@ -1317,13 +1317,13 @@
t.Fatal(err)
}
- if head, e := dag.getHead(oid); e != nil || head != 2 {
+ if head, e := dag.getHead(oid); e != nil || head != 3 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
pmap := dag.getParentMap(oid)
- exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 1}}
+ exp := map[storage.Version][]storage.Version{1: nil, 2: {1}, 3: {2}, 4: {1, 2}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
@@ -1332,19 +1332,19 @@
// Verify the grafting of remote nodes.
newHeads, grafts := dag.getGraftNodes(oid)
- expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 3: struct{}{}}
+ expNewHeads := map[storage.Version]struct{}{3: struct{}{}, 4: struct{}{}}
if !reflect.DeepEqual(newHeads, expNewHeads) {
t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
- expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
+ expgrafts := map[storage.Version]uint64{1: 0, 2: 1}
if !reflect.DeepEqual(grafts, expgrafts) {
t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
}
- // There should be no conflict.
+ // There should be a conflict.
isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
- if !(isConflict && newHead == 3 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
+ if !(isConflict && newHead == 4 && oldHead == 3 && ancestor == 2 && errConflict == nil) {
t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -1365,11 +1365,11 @@
// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
// linked nodes (conflict resolution by selecting an existing version) on top of
// a local initial state without conflict, but moves the head node to a new one.
-// An object is created locally and updated twice (v0 -> v1 -> v2). Another device
-// has along the way learned about v0, created (v0 -> v3), then learned about
-// (v0 -> v1 -> v2) and resolved that conflict by selecting v3 over v2. Now it
-// sends that new info (v3 and the v3/v2 link) back to the original (local) device.
-// The device sees that the new head v3 is "derived" from v2 thus no conflict.
+// An object is created locally and updated twice (v1 -> v2 -> v3). Another device
+// has along the way learned about v1, created (v1 -> v4), then learned about
+// (v1 -> v2 -> v3) and resolved that conflict by selecting v4 over v3. Now it
+// sends that new info (v4 and the v4/v3 link) back to the original (local) device.
+// The device sees that the new head v4 is "derived" from v3 thus no conflict.
func TestRemoteLinkedConflictNewHead(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1379,7 +1379,7 @@
t.Fatalf("Cannot open new DAG file %s", dagfile)
}
- if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ if err = dagReplayCommands(dag, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
if err = dagReplayCommands(dag, "remote-noconf-link-01.log.sync"); err != nil {
@@ -1393,13 +1393,13 @@
t.Fatal(err)
}
- if head, e := dag.getHead(oid); e != nil || head != 2 {
+ if head, e := dag.getHead(oid); e != nil || head != 3 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
pmap := dag.getParentMap(oid)
- exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 2}}
+ exp := map[storage.Version][]storage.Version{1: nil, 2: {1}, 3: {2}, 4: {1, 3}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
@@ -1408,19 +1408,19 @@
// Verify the grafting of remote nodes.
newHeads, grafts := dag.getGraftNodes(oid)
- expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
+ expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
if !reflect.DeepEqual(newHeads, expNewHeads) {
t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
- expgrafts := map[storage.Version]uint64{0: 0, 2: 2}
+ expgrafts := map[storage.Version]uint64{1: 0, 3: 2}
if !reflect.DeepEqual(grafts, expgrafts) {
t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
- if !(!isConflict && newHead == 3 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ if !(!isConflict && newHead == 4 && oldHead == 3 && ancestor == storage.NoVersion && errConflict == nil) {
t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -1442,12 +1442,12 @@
// contain linked nodes (conflict resolution by selecting an existing version)
// on top of a local initial state without conflict, but moves the head node
// to a new one that overtook the linked node.
-// An object is created locally and updated twice (v0 -> v1 -> v2). Another
-// device has along the way learned about v0, created (v0 -> v3), then learned
-// about (v0 -> v1 -> v2) and resolved that conflict by selecting v2 over v3.
-// Then it creates a new update v4 from v2 (v2 -> v4). Now it sends that new
-// info (v3, the v2/v3 link, and v4) back to the original (local) device.
-// The device sees that the new head v4 is "derived" from v2 thus no conflict.
+// An object is created locally and updated twice (v1 -> v2 -> v3). Another
+// device has along the way learned about v1, created (v1 -> v4), then learned
+// about (v1 -> v2 -> v3) and resolved that conflict by selecting v3 over v4.
+// Then it creates a new update v5 from v3 (v3 -> v5). Now it sends that new
+// info (v4, the v3/v4 link, and v5) back to the original (local) device.
+// The device sees that the new head v5 is "derived" from v3 thus no conflict.
func TestRemoteLinkedConflictNewHeadOvertake(t *testing.T) {
dagfile := dagFilename()
defer os.Remove(dagfile)
@@ -1457,7 +1457,7 @@
t.Fatalf("Cannot open new DAG file %s", dagfile)
}
- if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ if err = dagReplayCommands(dag, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
if err = dagReplayCommands(dag, "remote-noconf-link-02.log.sync"); err != nil {
@@ -1471,13 +1471,13 @@
t.Fatal(err)
}
- if head, e := dag.getHead(oid); e != nil || head != 2 {
+ if head, e := dag.getHead(oid); e != nil || head != 3 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
pmap := dag.getParentMap(oid)
- exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1, 3}, 3: {0}, 4: {2}}
+ exp := map[storage.Version][]storage.Version{1: nil, 2: {1}, 3: {2, 4}, 4: {1}, 5: {3}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
@@ -1486,19 +1486,19 @@
// Verify the grafting of remote nodes.
newHeads, grafts := dag.getGraftNodes(oid)
- expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
+ expNewHeads := map[storage.Version]struct{}{5: struct{}{}}
if !reflect.DeepEqual(newHeads, expNewHeads) {
t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
- expgrafts := map[storage.Version]uint64{0: 0, 2: 2, 3: 1}
+ expgrafts := map[storage.Version]uint64{1: 0, 3: 2, 4: 1}
if !reflect.DeepEqual(grafts, expgrafts) {
t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
- if !(!isConflict && newHead == 4 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ if !(!isConflict && newHead == 5 && oldHead == 3 && ancestor == storage.NoVersion && errConflict == nil) {
t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -1522,7 +1522,7 @@
t.Fatal(err)
}
- if head, e := dag.getHead(oid); e != nil || head != 4 {
+ if head, e := dag.getHead(oid); e != nil || head != 5 {
t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
}
@@ -1537,7 +1537,7 @@
}
isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
- if !(!isConflict && newHead == 4 && oldHead == 4 && ancestor == 0 && errConflict == nil) {
+ if !(!isConflict && newHead == 5 && oldHead == 5 && ancestor == storage.NoVersion && errConflict == nil) {
t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index f7c46f2..ae539a7 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -386,19 +386,42 @@
}
// processWatchRecord processes new object versions obtained from the local store.
-func (l *iLog) processWatchRecord(objID storage.ID, vers storage.Version, par []storage.Version, val *LogValue, txID TxID) error {
+func (l *iLog) processWatchRecord(objID storage.ID, vers, parent storage.Version, val *LogValue, txID TxID) error {
if l.db == nil {
return errInvalidLog
}
vlog.VI(2).Infof("processWatchRecord:: adding object %v %v", objID, vers)
- // Check if the object version already exists in the DAG. if so return.
- if l.s.dag.hasNode(objID, vers) {
- return nil
+
+ if vers != storage.NoVersion {
+ // Check if the object's vers already exists in the DAG.
+ if l.s.dag.hasNode(objID, vers) {
+ return nil
+ }
+ } else {
+ // Check if the parent version has a deleted
+ // descendant already in the DAG.
+ if l.s.dag.hasDeletedDescendant(objID, parent) {
+ return nil
+ }
+ }
+
+ var pars []storage.Version
+ if parent != storage.NoVersion {
+ pars = []storage.Version{parent}
+ }
+
+ // If the current version is a deletion, generate a new version number.
+ if val.Delete {
+ if vers != storage.NoVersion {
+ return fmt.Errorf("deleted vers is %v", vers)
+ }
+ vers = storage.NewVersion()
+ val.Mutation.Version = vers
}
// Create a log record from Watch's Change Record.
- rec, err := l.createLocalLogRec(objID, vers, par, val)
+ rec, err := l.createLocalLogRec(objID, vers, pars, val)
if err != nil {
return err
}
diff --git a/runtimes/google/vsync/ilog_test.go b/runtimes/google/vsync/ilog_test.go
index b6ae29a..f027c90 100644
--- a/runtimes/google/vsync/ilog_test.go
+++ b/runtimes/google/vsync/ilog_test.go
@@ -133,7 +133,7 @@
t.Errorf("CreateLocalGeneration did not fail on a closed log: %v", err)
}
- err = log.processWatchRecord(storage.NewID(), 2, []storage.Version{0, 1}, &LogValue{}, NoTxID)
+ err = log.processWatchRecord(storage.NewID(), 2, storage.Version(999), &LogValue{}, NoTxID)
if err == nil || err != errInvalidLog {
t.Errorf("ProcessWatchRecord did not fail on a closed log: %v", err)
}
@@ -774,7 +774,7 @@
}
// TestProcessWatchRecord tests that local updates are correctly handled.
-// Commands are in file testdata/local-init-00.sync.
+// Commands are in file testdata/local-init-00.log.sync.
func TestProcessWatchRecord(t *testing.T) {
logfile := getFileName()
defer os.Remove(logfile)
@@ -792,7 +792,7 @@
t.Fatalf("Cannot open new log file %s, err %v", logfile, err)
}
- if _, err = logReplayCommands(log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
@@ -819,7 +819,7 @@
}
// Verify DAG state.
- if head, err := log.s.dag.getHead(objid); err != nil || head != 2 {
+ if head, err := log.s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index d6c786e..c3d8f4f 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -544,11 +544,24 @@
// to the store are needed. If the remote version is
// picked, we put it in the store.
if st.resolvVal.Mutation.Version != st.oldHead {
- // Append to mutations.
st.resolvVal.Mutation.PriorVersion = st.oldHead
- vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
- st.resolvVal.Mutation, obj)
- m = append(m, st.resolvVal.Mutation)
+
+ // Convert resolvVal.Mutation into a mutation for the Store.
+ stMutation, err := i.storeMutation(obj, st.resolvVal)
+ if err != nil {
+ return err
+ }
+
+ vlog.VI(2).Infof("updateStoreAndSync:: Try to append mutation %v (%v) for obj %v (nh %v, oh %v)",
+ st.resolvVal.Mutation, stMutation, obj, st.newHead, st.oldHead)
+
+ // Append to mutations, skipping a delete following a delete mutation.
+ if stMutation.Version != storage.NoVersion ||
+ stMutation.PriorVersion != storage.NoVersion {
+ vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
+ stMutation, obj)
+ m = append(m, stMutation)
+ }
}
}
@@ -590,6 +603,41 @@
return nil
}
+// storeMutation converts a resolved mutation generated by syncd to
+// one that can be sent to the store. To send to the store, it
+// converts the version numbers corresponding to object deletions to
+// storage.NoVersion when required.
+func (i *syncInitiator) storeMutation(obj storage.ID, resolvVal *LogValue) (raw.Mutation, error) {
+ curDelete := resolvVal.Delete
+ priorDelete := false
+ if resolvVal.Mutation.PriorVersion != storage.NoVersion {
+ oldRec, err := i.getLogRec(obj, resolvVal.Mutation.PriorVersion)
+ if err != nil {
+ return raw.Mutation{}, err
+ }
+ priorDelete = oldRec.Value.Delete
+ }
+
+ // Neither the current version nor the prior version is a delete.
+ if !curDelete && !priorDelete {
+ return resolvVal.Mutation, nil
+ }
+
+ // Creating a new copy of the mutation to adjust version
+ // numbers when handling deletions.
+ stMutation := resolvVal.Mutation
+ // Adjust the current version if this is a deletion.
+ if curDelete {
+ stMutation.Version = storage.NoVersion
+ }
+ // Adjust the prior version if it is a deletion.
+ if priorDelete {
+ stMutation.PriorVersion = storage.NoVersion
+ }
+
+ return stMutation, nil
+}
+
// getLogRec returns the log record corresponding to a given object and its version.
func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
logKey, err := i.syncd.dag.getLogrec(obj, vers)
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 987d4d7..d9db4bb 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -112,7 +112,8 @@
}
// TODO(hpucha): Add more tests around retrying failed puts in the next pass (processUpdatedObjects).
-// TestLogStreamRemoteOnly tests processing of a remote log stream.
+// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are in file
+// testdata/remote-init-00.log.sync.
func TestLogStreamRemoteOnly(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -169,7 +170,7 @@
t.Errorf("Data mismatch in log record %v", curRec)
}
// Verify DAG state.
- if _, err := s.dag.getNode(objid, storage.Version(i)); err != nil {
+ if _, err := s.dag.getNode(objid, storage.Version(i+1)); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
}
}
@@ -183,7 +184,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ if st.newHead != 3 || st.oldHead != storage.NoVersion {
t.Errorf("Conflict detection didn't succeed %v", st)
}
if err := s.hdlInitiator.resolveConflicts(); err != nil {
@@ -192,14 +193,14 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -265,7 +266,7 @@
t.Errorf("Data mismatch in log record %v", curRec)
}
// Verify DAG state.
- if _, err := s.dag.getNode(objid, storage.Version(i)); err != nil {
+ if _, err := s.dag.getNode(objid, storage.Version(i+1)); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
}
}
@@ -280,7 +281,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ if st.newHead != 3 || st.oldHead != storage.NoVersion {
t.Errorf("Conflict detection didn't succeed %v", st)
}
if err := s.hdlInitiator.resolveConflicts(); err != nil {
@@ -289,19 +290,21 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
-// TestLogStreamRemoteWithTx tests processing of a remote log stream that contains transactions.
+// TestLogStreamRemoteWithTx tests processing of a remote log stream
+// that contains transactions. Commands are in file
+// testdata/remote-init-02.log.sync.
func TestLogStreamRemoteWithTx(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -335,13 +338,13 @@
// Verify transaction state.
objs := []string{"123", "456", "789"}
objids := make([]storage.ID, 3)
- maxVers := []storage.Version{2, 1, 3}
+ maxVers := []storage.Version{3, 2, 4}
txVers := map[string]struct{}{
- "123-1": struct{}{},
"123-2": struct{}{},
- "456-0": struct{}{},
+ "123-3": struct{}{},
"456-1": struct{}{},
- "789-0": struct{}{},
+ "456-2": struct{}{},
+ "789-1": struct{}{},
}
for pos, o := range objs {
var err error
@@ -349,7 +352,7 @@
if err != nil {
t.Errorf("Could not create objid %v", err)
}
- for i := storage.Version(0); i <= storage.Version(maxVers[pos]); i++ {
+ for i := storage.Version(1); i <= storage.Version(maxVers[pos]); i++ {
node, err := s.dag.getNode(objids[pos], i)
if err != nil {
t.Errorf("cannot find dag node for object %d %v: %s", objids[pos], i, err)
@@ -366,7 +369,7 @@
}
// Verify transaction state for the first transaction.
- node, err := s.dag.getNode(objids[0], storage.Version(1))
+ node, err := s.dag.getNode(objids[0], storage.Version(2))
if err != nil {
t.Errorf("cannot find dag node for object %d v1: %s", objids[0], err)
}
@@ -378,9 +381,9 @@
t.Errorf("cannot find transaction for id %v: %s", node.TxID, err)
}
expTxMap := dagTxMap{
- objids[0]: storage.Version(1),
- objids[1]: storage.Version(0),
- objids[2]: storage.Version(0),
+ objids[0]: storage.Version(2),
+ objids[1]: storage.Version(1),
+ objids[2]: storage.Version(1),
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
@@ -388,7 +391,7 @@
}
// Verify transaction state for the second transaction.
- node, err = s.dag.getNode(objids[0], storage.Version(2))
+ node, err = s.dag.getNode(objids[0], storage.Version(3))
if err != nil {
t.Errorf("cannot find dag node for object %d v1: %s", objids[0], err)
}
@@ -400,8 +403,8 @@
t.Errorf("cannot find transaction for id %v: %s", node.TxID, err)
}
expTxMap = dagTxMap{
- objids[0]: storage.Version(2),
- objids[1]: storage.Version(1),
+ objids[0]: storage.Version(3),
+ objids[1]: storage.Version(2),
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
@@ -422,10 +425,247 @@
}
}
+// TestLogStreamRemoteWithDel tests processing of a remote log stream
+// that contains object deletion. Commands are in file
+// testdata/remote-init-03.log.sync.
+func TestLogStreamRemoteWithDel(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ stream, err := createReplayStream("remote-init-03.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+
+ // Check minGens.
+ expVec := GenVector{"VeyronPhone": 1}
+ if !reflect.DeepEqual(expVec, minGens) {
+ t.Errorf("Data mismatch for minGens: %v instead of %v",
+ minGens, expVec)
+ }
+
+ // Check generation metadata.
+ curVal, err := s.log.getGenMetadata("VeyronPhone", 1)
+ if err != nil || curVal == nil {
+ t.Fatalf("GetGenMetadata() can not find object in log file err %v", err)
+ }
+ expVal := &genMetadata{Pos: 0, Count: 3, MaxLSN: 2}
+ if !reflect.DeepEqual(expVal, curVal) {
+ t.Errorf("Data mismatch for generation metadata: %v instead of %v",
+ curVal, expVal)
+ }
+
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ // Check all log records.
+ for i := LSN(0); i < 3; i++ {
+ curRec, err := s.log.getLogRec("VeyronPhone", GenID(1), i)
+ if err != nil || curRec == nil {
+ t.Fatalf("GetLogRec() can not find object %d in log file err %v",
+ i, err)
+ }
+ if curRec.ObjID != objid {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
+ // Verify DAG state.
+ if _, err := s.dag.getNode(objid, storage.Version(i+1)); err != nil {
+ t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
+ }
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 3 || st.oldHead != storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 3 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ m, err := s.hdlInitiator.storeMutation(objid, st.resolvVal)
+ if err != nil {
+ t.Errorf("Could not translate mutation %v", err)
+ }
+ if m.Version != storage.NoVersion || m.PriorVersion != storage.NoVersion {
+ t.Errorf("Data mismatch in mutation translation %v", m)
+ }
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+ node, err := s.dag.getNode(objid, storage.Version(3))
+ if err != nil {
+ t.Errorf("cannot find dag node for object %d v3: %s", objid, err)
+ }
+ if !node.Deleted {
+ t.Errorf("deleted node not found for object %d v3", objid)
+ }
+ if !s.dag.hasDeletedDescendant(objid, storage.Version(2)) {
+ t.Errorf("link to deleted node not found for object %d from v2", objid)
+ }
+ if !s.dag.hasDeletedDescendant(objid, storage.Version(1)) {
+ t.Errorf("link to deleted node not found for object %d from v1", objid)
+ }
+}
+
+// TestLogStreamDel2Objs tests that a local and a remote log stream
+// can be correctly applied when there is a local and a remote delete on
+// 2 different objects. Commands are in files
+// testdata/<local-init-01.log.sync,remote-2obj-del.log.sync>.
+func TestLogStreamDel2Objs(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-01.log.sync"); err != nil {
+ t.Error(err)
+ }
+
+ stream, err := createReplayStream("remote-2obj-del.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+
+ // Check minGens.
+ expVec := GenVector{"VeyronPhone": 1}
+ if !reflect.DeepEqual(expVec, minGens) {
+ t.Errorf("Data mismatch for minGens: %v instead of %v",
+ minGens, expVec)
+ }
+ // Check generation metadata.
+ curVal, err := s.log.getGenMetadata("VeyronPhone", 1)
+ if err != nil || curVal == nil {
+ t.Fatalf("GetGenMetadata() can not find object in log file for VeyronPhone err %v", err)
+ }
+ expVal := &genMetadata{Pos: 0, Count: 4, MaxLSN: 3}
+ if !reflect.DeepEqual(expVal, curVal) {
+ t.Errorf("Data mismatch for generation metadata: %v instead of %v",
+ curVal, expVal)
+ }
+
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ if len(s.hdlInitiator.updObjects) != 2 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+
+ objs := []string{"123", "456"}
+ newHeads := []storage.Version{6, 2}
+ conflicts := []bool{false, true}
+ for pos, o := range objs {
+ objid, err := strToObjID(o)
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+
+ st := s.hdlInitiator.updObjects[objid]
+
+ if st.isConflict != conflicts[pos] {
+ t.Errorf("Detected a wrong conflict %v", st)
+ }
+ if st.newHead != newHeads[pos] {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if pos == 1 {
+ // Force a blessing to remote version for testing.
+ st.resolvVal.Mutation.Version = st.newHead
+ st.resolvVal.Mutation.PriorVersion = st.oldHead
+ st.resolvVal.Delete = false
+ }
+ m, err := s.hdlInitiator.storeMutation(objid, st.resolvVal)
+ if err != nil {
+ t.Errorf("Could not translate mutation %v", err)
+ }
+
+ if pos == 0 {
+ if st.oldHead != 3 {
+ t.Errorf("Conflict detection didn't succeed for obj123 %v", st)
+ }
+ if m.Version != storage.NoVersion || m.PriorVersion != 3 {
+ t.Errorf("Data mismatch in mutation translation for obj123 %v", m)
+ }
+ // Test echo back from watch for these mutations.
+ if err := s.log.processWatchRecord(objid, 0, storage.Version(3), &LogValue{}, NoTxID); err != nil {
+ t.Errorf("Echo processing from watch failed %v", err)
+ }
+ }
+
+ if pos == 1 {
+ if st.oldHead == storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed for obj456 %v", st)
+ }
+ if m.Version != 2 || m.PriorVersion != storage.NoVersion {
+ t.Errorf("Data mismatch in mutation translation for obj456 %v", m)
+ }
+ // Test echo back from watch for these mutations.
+ if err := s.log.processWatchRecord(objid, storage.Version(2), 0, &LogValue{}, NoTxID); err != nil {
+ t.Errorf("Echo processing from watch failed %v", err)
+ }
+ }
+ }
+
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 6 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+}
+
// TestLogStreamNoConflict tests that a local and a remote log stream
// can be correctly applied (when there are no conflicts). Commands
// are in files
-// testdata/<local-init-00.sync,remote-noconf-00.log.sync>.
+// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
func TestLogStreamNoConflict(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -440,7 +680,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
@@ -478,7 +718,7 @@
}
// Check all log records.
for _, devid := range []DeviceID{"VeyronPhone", "VeyronTab"} {
- var v storage.Version
+ v := storage.Version(1)
for i := LSN(0); i < 3; i++ {
curRec, err := s.log.getLogRec(devid, GenID(1), i)
if err != nil || curRec == nil {
@@ -505,7 +745,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 5 || st.oldHead != 2 {
+ if st.newHead != 6 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
if err := s.hdlInitiator.resolveConflicts(); err != nil {
@@ -514,21 +754,21 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ if st.resolvVal.Mutation.PriorVersion != 3 || st.resolvVal.Mutation.Version != 6 {
t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 6 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
// TestLogStreamConflict tests that a local and a remote log stream
// can be correctly applied (when there are conflicts). Commands are
-// in files testdata/<local-init-00.sync,remote-conf-00.log.sync>.
+// in files testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
func TestLogStreamConflict(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -544,7 +784,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
@@ -592,7 +832,7 @@
lcount := []LSN{3, 4}
// Check all log records.
for index, devid := range []DeviceID{"VeyronPhone", "VeyronTab"} {
- var v storage.Version
+ v := storage.Version(1)
for i := LSN(0); i < lcount[index]; i++ {
curRec, err := s.log.getLogRec(devid, GenID(1), i)
if err != nil || curRec == nil {
@@ -619,10 +859,10 @@
if !st.isConflict {
t.Errorf("Didn't detect a conflict %v", st)
}
- if st.newHead != 5 || st.oldHead != 2 {
+ if st.newHead != 6 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
- if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ if st.resolvVal.Mutation.PriorVersion != 3 || st.resolvVal.Mutation.Version != 6 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
@@ -630,14 +870,14 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 6 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
// TestMultipleLogStream tests that a local and 2 remote log streams
// can be correctly applied (when there are conflicts). Commands are
-// in file testdata/<local-init-00.sync,remote-conf-01.log.sync>.
+// in files testdata/<local-init-00.log.sync,remote-conf-01.log.sync>.
func TestMultipleLogStream(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -653,7 +893,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
@@ -714,7 +954,7 @@
// Check all log records.
lcount := []LSN{2, 4, 1}
for index, devid := range []DeviceID{"VeyronPhone", "VeyronTab", "VeyronLaptop"} {
- var v storage.Version
+ v := storage.Version(1)
for i := LSN(0); i < lcount[index]; i++ {
curRec, err := s.log.getLogRec(devid, GenID(1), i)
if err != nil || curRec == nil {
@@ -741,10 +981,10 @@
if !st.isConflict {
t.Errorf("Didn't detect a conflict %v", st)
}
- if st.newHead != 5 || st.oldHead != 2 {
+ if st.newHead != 6 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
- if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ if st.resolvVal.Mutation.PriorVersion != 3 || st.resolvVal.Mutation.Version != 6 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
@@ -752,7 +992,7 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 6 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -761,7 +1001,7 @@
// record stream can be correctly applied, when the conflict is
// resolved by a blessing. In this test, local head of the object is
// unchanged at the end of replay. Commands are in files
-// testdata/<local-init-00.sync,remote-noconf-link-00.log.sync>.
+// testdata/<local-init-00.log.sync,remote-noconf-link-00.log.sync>.
func TestInitiatorBlessNoConf0(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -776,7 +1016,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
stream, err := createReplayStream("remote-noconf-link-00.log.sync")
@@ -803,7 +1043,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 2 || st.oldHead != 2 {
+ if st.newHead != 3 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
@@ -813,7 +1053,7 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 2 {
+ if st.resolvVal.Mutation.Version != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// No new log records should be added.
@@ -821,7 +1061,7 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -830,7 +1070,7 @@
// record stream can be correctly applied, when the conflict is
// resolved by a blessing. In this test, local head of the object is
// updated at the end of the replay. Commands are in files
-// testdata/<local-init-00.sync,remote-noconf-link-01.log.sync>.
+// testdata/<local-init-00.log.sync,remote-noconf-link-01.log.sync>.
func TestInitiatorBlessNoConf1(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -845,7 +1085,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
stream, err := createReplayStream("remote-noconf-link-01.log.sync")
@@ -872,7 +1112,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 3 || st.oldHead != 2 {
+ if st.newHead != 4 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
@@ -882,7 +1122,7 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// No new log records should be added.
@@ -890,7 +1130,7 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -901,7 +1141,7 @@
// updated at the end of the first replay. In the second replay, a
// conflict resolved locally is rediscovered since it was also
// resolved remotely. Commands are in files
-// testdata/<local-init-00.sync,remote-noconf-link-02.log.sync,
+// testdata/<local-init-00.log.sync,remote-noconf-link-02.log.sync,
// remote-noconf-link-repeat.log.sync>.
func TestInitiatorBlessNoConf2(t *testing.T) {
dir, err := createTempDir()
@@ -917,7 +1157,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
stream, err := createReplayStream("remote-noconf-link-02.log.sync")
@@ -944,7 +1184,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 4 || st.oldHead != 2 {
+ if st.newHead != 5 || st.oldHead != 3 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
@@ -954,7 +1194,7 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{"VeyronTab": 0}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 2 {
+ if st.resolvVal.Mutation.Version != 5 || st.resolvVal.Mutation.PriorVersion != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// No new log records should be added.
@@ -962,7 +1202,7 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
@@ -986,7 +1226,7 @@
if st.isConflict {
t.Errorf("Detected a conflict %v", st)
}
- if st.newHead != 4 || st.oldHead != 4 {
+ if st.newHead != 5 || st.oldHead != 5 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
@@ -996,7 +1236,7 @@
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronLaptop"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 4 {
+ if st.resolvVal.Mutation.Version != 5 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// No new log records should be added.
@@ -1004,7 +1244,7 @@
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -1012,7 +1252,7 @@
// TestInitiatorBlessConf tests that a local and a remote log record
// stream can be correctly applied, when the conflict is resolved by a
// blessing. Commands are in files
-// testdata/<local-init-00.sync,remote-conf-link.log.sync>.
+// testdata/<local-init-00.log.sync,remote-conf-link.log.sync>.
func TestInitiatorBlessConf(t *testing.T) {
dir, err := createTempDir()
if err != nil {
@@ -1027,7 +1267,7 @@
defer s.Close()
defer os.RemoveAll(dir)
- if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ if _, err = logReplayCommands(s.log, "local-init-00.log.sync"); err != nil {
t.Error(err)
}
stream, err := createReplayStream("remote-conf-link.log.sync")
@@ -1054,21 +1294,21 @@
if !st.isConflict {
t.Errorf("Didn't detect a conflict %v", st)
}
- if st.newHead != 3 || st.oldHead != 2 || st.ancestor != 1 {
+ if st.newHead != 4 || st.oldHead != 3 || st.ancestor != 2 {
t.Errorf("Conflict detection didn't succeed %v", st)
}
if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 3 {
+ if st.resolvVal.Mutation.Version != 4 {
t.Errorf("Mutation generation is not accurate %v", st)
}
if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 3 {
t.Errorf("Mutation generation is not accurate %v", st)
}
// New log records should be added.
@@ -1084,7 +1324,7 @@
t.Errorf("Data mismatch in log record %v", curRec)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
diff --git a/runtimes/google/vsync/testdata/local-init-00.log.sync b/runtimes/google/vsync/testdata/local-init-00.log.sync
new file mode 100644
index 0000000..3eb51de
--- /dev/null
+++ b/runtimes/google/vsync/testdata/local-init-00.log.sync
@@ -0,0 +1,6 @@
+# Create an object locally and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
+
+addl|12345|1|||logrec-00|false|false
+addl|12345|2|1||logrec-01|false|false
+addl|12345|3|2||logrec-02|false|false
diff --git a/runtimes/google/vsync/testdata/local-init-01.log.sync b/runtimes/google/vsync/testdata/local-init-01.log.sync
new file mode 100644
index 0000000..578f795
--- /dev/null
+++ b/runtimes/google/vsync/testdata/local-init-01.log.sync
@@ -0,0 +1,9 @@
+# Create objects locally and update one and delete another.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
+
+addl|123|1|||logrec-00|false|false
+addl|123|2|1||logrec-01|false|false
+addl|123|3|2||logrec-02|false|false
+
+addl|456|1|||logrec-00|false|false
+addl|456|0|1||logrec-00|false|true
\ No newline at end of file
diff --git a/runtimes/google/vsync/testdata/remote-2obj-del.log.sync b/runtimes/google/vsync/testdata/remote-2obj-del.log.sync
new file mode 100644
index 0000000..403da95
--- /dev/null
+++ b/runtimes/google/vsync/testdata/remote-2obj-del.log.sync
@@ -0,0 +1,7 @@
+# Update one object and delete another object remotely.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
+
+addr|123|4|3||VeyronPhone:1:0|false|false
+addr|123|5|4||VeyronPhone:1:1|false|false
+addr|123|6|5||VeyronPhone:1:2|false|true
+addr|456|2|1||VeyronPhone:1:3|false|false
\ No newline at end of file
diff --git a/runtimes/google/vsync/testdata/remote-conf-00.log.sync b/runtimes/google/vsync/testdata/remote-conf-00.log.sync
index 994a66f..a391c14 100644
--- a/runtimes/google/vsync/testdata/remote-conf-00.log.sync
+++ b/runtimes/google/vsync/testdata/remote-conf-00.log.sync
@@ -1,8 +1,8 @@
# Update an object remotely three times triggering one conflict after
-# it was created locally up to v2 (i.e. assume the remote sync received
-# it from the local sync at v1, then updated separately).
+# it was created locally up to v3 (i.e. assume the remote sync received
+# it from the local sync at v2, then updated separately).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|1||VeyronPhone:1:0|false|false
-addr|12345|4|3||VeyronPhone:1:1|false|false
-addr|12345|5|4||VeyronPhone:1:2|false|false
+addr|12345|4|2||VeyronPhone:1:0|false|false
+addr|12345|5|4||VeyronPhone:1:1|false|false
+addr|12345|6|5||VeyronPhone:1:2|false|false
diff --git a/runtimes/google/vsync/testdata/remote-conf-01.log.sync b/runtimes/google/vsync/testdata/remote-conf-01.log.sync
index 4fabf55..63117bf 100644
--- a/runtimes/google/vsync/testdata/remote-conf-01.log.sync
+++ b/runtimes/google/vsync/testdata/remote-conf-01.log.sync
@@ -1,10 +1,11 @@
# Update an object remotely three times triggering a conflict with
-# 2 graft points: v0 and v2. This assumes that the remote sync got
-# v0, made its own conflicting v3 that it resolved into v4 (against v1)
-# then made a v5 change. When the local sync gets back this info it
-# sees 2 graft points: v0-v3 and v1-v4.
+# 2 graft points: v1 and v2. This assumes that the remote sync got
+# v1, made its own conflicting v4 that it resolved into v5 (against v2)
+# then made a v6 change. When the local sync gets back this info it
+# sees 2 graft points: v1-v4 and v2-v5.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|0||VeyronLaptop:1:0|false|false
-addr|12345|4|1|3|VeyronPhone:1:0|false|false
-addr|12345|5|4||VeyronPhone:1:1|false|false
+addr|12345|4|1||VeyronLaptop:1:0|false|false
+addr|12345|5|2|4|VeyronPhone:1:0|false|false
+addr|12345|6|5||VeyronPhone:1:1|false|false
+
diff --git a/runtimes/google/vsync/testdata/remote-conf-link.log.sync b/runtimes/google/vsync/testdata/remote-conf-link.log.sync
index 65d0176..e6093a2 100644
--- a/runtimes/google/vsync/testdata/remote-conf-link.log.sync
+++ b/runtimes/google/vsync/testdata/remote-conf-link.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|0||VeyronPhone:1:0|false|false
-linkr|12345|3|1||VeyronPhone:1:1
+addr|12345|4|1||VeyronPhone:1:0|false|false
+linkr|12345|4|2||VeyronPhone:1:1
diff --git a/runtimes/google/vsync/testdata/remote-init-00.log.sync b/runtimes/google/vsync/testdata/remote-init-00.log.sync
index 0030417..5e12809 100644
--- a/runtimes/google/vsync/testdata/remote-init-00.log.sync
+++ b/runtimes/google/vsync/testdata/remote-init-00.log.sync
@@ -1,6 +1,6 @@
# Create an object remotely and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|0|||VeyronPhone:1:0|false|false
-addr|12345|1|0||VeyronPhone:1:1|false|false
-addr|12345|2|1||VeyronPhone:1:2|false|false
+addr|12345|1|||VeyronPhone:1:0|false|false
+addr|12345|2|1||VeyronPhone:1:1|false|false
+addr|12345|3|2||VeyronPhone:1:2|false|false
diff --git a/runtimes/google/vsync/testdata/remote-init-01.log.sync b/runtimes/google/vsync/testdata/remote-init-01.log.sync
index a357986..ad022b4 100644
--- a/runtimes/google/vsync/testdata/remote-init-01.log.sync
+++ b/runtimes/google/vsync/testdata/remote-init-01.log.sync
@@ -1,6 +1,6 @@
# Create an object remotely and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|0|||VeyronPhone:5:0|false|false
-addr|12345|1|0||VeyronPhone:5:1|false|false
-addr|12345|2|1||VeyronPhone:5:2|false|false
+addr|12345|1|||VeyronPhone:5:0|false|false
+addr|12345|2|1||VeyronPhone:5:1|false|false
+addr|12345|3|2||VeyronPhone:5:2|false|false
diff --git a/runtimes/google/vsync/testdata/remote-init-02.log.sync b/runtimes/google/vsync/testdata/remote-init-02.log.sync
index c0053ba..8885949 100644
--- a/runtimes/google/vsync/testdata/remote-init-02.log.sync
+++ b/runtimes/google/vsync/testdata/remote-init-02.log.sync
@@ -1,17 +1,17 @@
-# Create an object remotely and update it twice (linked-list).
+# Create objects and transactions remotely.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|123|0|||VeyronPhone:1:0|false|false
+addr|123|1|||VeyronPhone:1:0|false|false
-addr|123|1|0||VeyronPhone:1:1|true|false
-addr|456|0|||VeyronPhone:1:2|true|false
-addr|789|0|||VeyronPhone:1:3|false|false
+addr|123|2|1||VeyronPhone:1:1|true|false
+addr|456|1|||VeyronPhone:1:2|true|false
+addr|789|1|||VeyronPhone:1:3|false|false
-addr|789|1|0||VeyronPhone:1:4|false|false
+addr|789|2|1||VeyronPhone:1:4|false|false
-addr|789|2|0||VeyronTab:1:0|false|false
+addr|789|3|1||VeyronTab:1:0|false|false
-addr|789|3|1|2|VeyronPhone:2:0|false|false
+addr|789|4|2|3|VeyronPhone:2:0|false|false
-addr|123|2|1||VeyronPhone:2:1|true|false
-addr|456|1|0||VeyronPhone:2:2|false|false
+addr|123|3|2||VeyronPhone:2:1|true|false
+addr|456|2|1||VeyronPhone:2:2|false|false
diff --git a/runtimes/google/vsync/testdata/remote-init-03.log.sync b/runtimes/google/vsync/testdata/remote-init-03.log.sync
new file mode 100644
index 0000000..85083c7
--- /dev/null
+++ b/runtimes/google/vsync/testdata/remote-init-03.log.sync
@@ -0,0 +1,6 @@
+# Create an object remotely and delete it.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
+
+addr|12345|1|||VeyronPhone:1:0|false|false
+addr|12345|2|1||VeyronPhone:1:1|false|false
+addr|12345|3|2||VeyronPhone:1:2|false|true
diff --git a/runtimes/google/vsync/testdata/remote-noconf-00.log.sync b/runtimes/google/vsync/testdata/remote-noconf-00.log.sync
index c291155..c56208b 100644
--- a/runtimes/google/vsync/testdata/remote-noconf-00.log.sync
+++ b/runtimes/google/vsync/testdata/remote-noconf-00.log.sync
@@ -1,8 +1,8 @@
# Update an object remotely three times without triggering a conflict
-# after it was created locally up to v2 (i.e. assume the remote sync
+# after it was created locally up to v3 (i.e. assume the remote sync
# received it from the local sync first, then updated it).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|2||VeyronPhone:1:0|false|false
-addr|12345|4|3||VeyronPhone:1:1|false|false
-addr|12345|5|4||VeyronPhone:1:2|false|false
+addr|12345|4|3||VeyronPhone:1:0|false|false
+addr|12345|5|4||VeyronPhone:1:1|false|false
+addr|12345|6|5||VeyronPhone:1:2|false|false
diff --git a/runtimes/google/vsync/testdata/remote-noconf-link-00.log.sync b/runtimes/google/vsync/testdata/remote-noconf-link-00.log.sync
index ab1e2c4..1da37e3 100644
--- a/runtimes/google/vsync/testdata/remote-noconf-link-00.log.sync
+++ b/runtimes/google/vsync/testdata/remote-noconf-link-00.log.sync
@@ -1,5 +1,6 @@
# Update an object remotely, detect conflict, and bless the remote version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|0||VeyronPhone:1:0|false|false
-linkr|12345|1|3||VeyronPhone:1:1
+addr|12345|4|1||VeyronPhone:1:0|false|false
+linkr|12345|2|4||VeyronPhone:1:1
+
diff --git a/runtimes/google/vsync/testdata/remote-noconf-link-01.log.sync b/runtimes/google/vsync/testdata/remote-noconf-link-01.log.sync
index 4ee0b53..5530231 100644
--- a/runtimes/google/vsync/testdata/remote-noconf-link-01.log.sync
+++ b/runtimes/google/vsync/testdata/remote-noconf-link-01.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|0||VeyronPhone:1:0|false|false
-linkr|12345|3|2||VeyronPhone:1:1
+addr|12345|4|1||VeyronPhone:1:0|false|false
+linkr|12345|4|3||VeyronPhone:1:1
diff --git a/runtimes/google/vsync/testdata/remote-noconf-link-02.log.sync b/runtimes/google/vsync/testdata/remote-noconf-link-02.log.sync
index 53d9735..c628985 100644
--- a/runtimes/google/vsync/testdata/remote-noconf-link-02.log.sync
+++ b/runtimes/google/vsync/testdata/remote-noconf-link-02.log.sync
@@ -1,6 +1,7 @@
# Update an object remotely, detect conflict, and bless the remote version, and continue updating.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-addr|12345|3|0||VeyronPhone:1:0|false|false
-linkr|12345|2|3||VeyronPhone:1:1
-addr|12345|4|2||VeyronPhone:2:0|false|false
+addr|12345|4|1||VeyronPhone:1:0|false|false
+linkr|12345|3|4||VeyronPhone:1:1
+addr|12345|5|3||VeyronPhone:2:0|false|false
+
diff --git a/runtimes/google/vsync/testdata/remote-noconf-link-repeat.log.sync b/runtimes/google/vsync/testdata/remote-noconf-link-repeat.log.sync
index 2ca80bf..1ab977d 100644
--- a/runtimes/google/vsync/testdata/remote-noconf-link-repeat.log.sync
+++ b/runtimes/google/vsync/testdata/remote-noconf-link-repeat.log.sync
@@ -1,5 +1,5 @@
# Resolve the same conflict on two different devices.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<continued>|<deleted>
-linkr|12345|2|3||VeyronLaptop:1:0
+linkr|12345|3|4||VeyronLaptop:1:0
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 263ee30..d0bd35d 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -8,6 +8,8 @@
"time"
"veyron/services/store/raw"
+
+ "veyron2/storage"
)
// getFileName generates a filename for a temporary (per unit test) kvdb file.
@@ -92,7 +94,11 @@
for _, cmd := range cmds {
switch cmd.cmd {
case addLocal:
- err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{Mutation: raw.Mutation{Version: cmd.version}}, NoTxID)
+ parent := storage.NoVersion
+ if cmd.parents != nil {
+ parent = cmd.parents[0]
+ }
+ err = log.processWatchRecord(cmd.objID, cmd.version, parent, &LogValue{Mutation: raw.Mutation{Version: cmd.version}, Delete: cmd.deleted}, NoTxID)
if err != nil {
return nil, fmt.Errorf("cannot replay local log records %d:%s err %v",
cmd.objID, cmd.version, err)
@@ -149,6 +155,7 @@
Value: LogValue{
Mutation: raw.Mutation{Version: cmd.version},
Continued: cmd.continued,
+ Delete: cmd.deleted,
},
}
@@ -172,6 +179,7 @@
if err != nil {
return err
}
+
if err := s.dag.addNode(rec.ObjID, rec.CurVers, false, rec.Value.Delete, rec.Parents, logKey, NoTxID); err != nil {
return err
}
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 4d887c3..2dc6b17 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -17,7 +17,6 @@
"veyron2/ipc"
"veyron2/rt"
"veyron2/services/watch"
- "veyron2/storage"
"veyron2/vlog"
)
@@ -185,13 +184,8 @@
time = w.curTxSyncTime
}
val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == watch.DoesNotExist, Continued: ch.Continued}
- var parents []storage.Version
- if mu.PriorVersion != storage.NoVersion {
- parents = []storage.Version{mu.PriorVersion}
- }
-
vlog.VI(2).Infof("processChanges:: processing record %v, Tx %v", val, w.curTx)
- if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, parents, val, w.curTx); err != nil {
+ if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, mu.PriorVersion, val, w.curTx); err != nil {
return fmt.Errorf("cannot process mutation: %#v: %s", ch, err)
}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index dee40cf..0725b84 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -252,6 +252,42 @@
ResumeMarker: nil,
Continued: false,
},
+
+ // 3rd transaction: remove "/a/b" which updates "a" (its "Dir" field) and deletes "b".
+ watch.Change{
+ Name: "",
+ State: 0,
+ Value: &raw.Mutation{
+ ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
+ 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
+ PriorVersion: 0x365a858149c6e2d1,
+ Version: 0xa858149c6e2d1000,
+ Value: "value-a",
+ Dir: []storage.DEntry{
+ storage.DEntry{
+ Name: "c",
+ ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
+ 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
+ },
+ },
+ },
+ ResumeMarker: nil,
+ Continued: true,
+ },
+ watch.Change{
+ Name: "",
+ State: watch.DoesNotExist,
+ Value: &raw.Mutation{
+ ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
+ 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
+ PriorVersion: 0x55104dc76695721d,
+ Version: 0x0,
+ Value: "",
+ Dir: nil,
+ },
+ ResumeMarker: nil,
+ Continued: false,
+ },
}
return batch
@@ -321,6 +357,7 @@
// TestWatcherRecvError tests the watcher reacting to an error from the stream receive.
// It verifies that the watcher retries the Watch() RPC after a delay.
func TestWatcherRecvError(t *testing.T) {
+ rt.Init()
dir := initTestDir(t)
defer os.RemoveAll(dir)
@@ -339,6 +376,7 @@
// TestWatcherChanges tests the watcher applying changes received from store.
func TestWatcherChanges(t *testing.T) {
+ rt.Init()
dir := initTestDir(t)
defer os.RemoveAll(dir)
@@ -353,8 +391,8 @@
oidB := storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}
oidC := storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71}
- oids := []storage.ID{oidRoot, oidA, oidB, oidC}
- heads := []storage.Version{0x4d65822107fcfd52, 0x365a858149c6e2d1, 0x55104dc76695721d, 0x380704bb7b4d7c03}
+ oids := []storage.ID{oidRoot, oidA, oidC}
+ heads := []storage.Version{0x4d65822107fcfd52, 0xa858149c6e2d1000, 0x380704bb7b4d7c03}
for i, oid := range oids {
expHead := heads[i]
@@ -366,6 +404,15 @@
}
}
+ // Verify oidB.
+ headB, err := s.dag.getHead(oidB)
+ if err != nil {
+ t.Errorf("cannot find head node for object %d: %s", oidB, err)
+ }
+ if headB == storage.NoVersion || headB == storage.Version(0x55104dc76695721d) {
+ t.Errorf("wrong head for object B %d: %d ", oidB, headB)
+ }
+
// Verify transaction state for the first transaction.
node, err := s.dag.getNode(oidRoot, heads[0])
if err != nil {
@@ -381,7 +428,7 @@
expTxMap := dagTxMap{
oidRoot: heads[0],
oidA: storage.Version(0x57e9d1860d1d68d8),
- oidB: heads[2],
+ oidB: storage.Version(0x55104dc76695721d),
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
@@ -389,6 +436,27 @@
}
// Verify transaction state for the second transaction.
+ node, err = s.dag.getNode(oidC, heads[2])
+ if err != nil {
+ t.Errorf("cannot find dag node for object %d %v: %s", oidC, heads[2], err)
+ }
+ if node.TxID == NoTxID {
+ t.Errorf("expecting non nil txid for object %d:%v", oidC, heads[2])
+ }
+ txMap, err = s.dag.getTransaction(node.TxID)
+ if err != nil {
+ t.Errorf("cannot find transaction for id %v: %s", node.TxID, err)
+ }
+ expTxMap = dagTxMap{
+ oidA: storage.Version(0x365a858149c6e2d1),
+ oidC: heads[2],
+ }
+ if !reflect.DeepEqual(txMap, expTxMap) {
+ t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
+ node.TxID, txMap, expTxMap)
+ }
+
+ // Verify transaction state for the third transaction.
node, err = s.dag.getNode(oidA, heads[1])
if err != nil {
t.Errorf("cannot find dag node for object %d %v: %s", oidA, heads[1], err)
@@ -402,14 +470,26 @@
}
expTxMap = dagTxMap{
oidA: heads[1],
- oidC: heads[3],
+ oidB: headB,
}
if !reflect.DeepEqual(txMap, expTxMap) {
t.Errorf("Data mismatch for txid %v txmap %v instead of %v",
node.TxID, txMap, expTxMap)
}
- expResmark := []byte{2, 0, 0, 0, 0, 0, 0, 0}
+ // Verify deletion tracking.
+ node, err = s.dag.getNode(oidB, headB)
+ if err != nil {
+ t.Errorf("cannot find dag node for object %d %v: %s", oidB, headB, err)
+ }
+ if !node.Deleted {
+ t.Errorf("deleted node not found for object %d %v: %s", oidB, headB, err)
+ }
+ if !s.dag.hasDeletedDescendant(oidB, storage.Version(0x55104dc76695721d)) {
+ t.Errorf("link to deleted node not found for object %d %v: %s", oidB, headB, err)
+ }
+
+ expResmark := []byte{3, 0, 0, 0, 0, 0, 0, 0}
if bytes.Compare(s.devtab.head.Resmark, expResmark) != 0 {
t.Errorf("error in watch device table resume marker: %v instead of %v", s.devtab.head.Resmark, expResmark)