syncbase/vsync: DAG improvements & more tests.
* Support conflict detection where the DAG fragments have no
common ancestor (evolved from disjoint pasts).
* Switch sync tests to using leveldb instead of memstore.
* Guard the random number generator with a mutex.
* Add more DAG tests.
Change-Id: Ic46ce0109cde04d8b4eaa93ae001ee28b2a65d12
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index a84d916..5a432c7 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -283,15 +283,6 @@
return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
}
- // A new root node (no parents) is allowed only for new objects.
- // TODO(rdaoud): remove that limitation, we now can have key-collision.
- if parents == nil {
- if _, err := getHead(ctx, tx, oid); err == nil {
- return verror.New(verror.ErrInternal, ctx,
- "cannot add another root node for this object", oid, version)
- }
- }
-
// Verify the parents and determine the node level. Also save the
// levels of the parent nodes for later use in this function's graft
// updates.
parentLevels := make(map[string]uint64)
@@ -338,9 +329,9 @@
// During a sync operation, each mutated object gets new nodes added in
// its DAG. These new nodes are either derived from nodes that were
// previously known on this device (i.e. their parent nodes are pre-
- // existing), or they are derived from other new DAG nodes being
- // discovered during this sync (i.e. their parent nodes were also just
- // added to the DAG).
+ // existing) or have no parents (new root nodes), or they are
+ // derived from other new DAG nodes being discovered during this
+ // sync (i.e. their parent nodes were also just added to the DAG).
//
// To detect a conflict and find the most recent common ancestor to
// pass to the conflict resolver, the DAG graft info keeps track of the
@@ -462,8 +453,9 @@
// hasConflict determines if an object has a conflict between its new and old
// head nodes.
-// - Yes: return (true, newHead, oldHead, ancestor)
-// - No: return (false, newHead, oldHead, NoVersion)
+// - Yes: return (true, newHead, oldHead, ancestor) -- from a common past
+// - Yes: return (true, newHead, oldHead, NoVersion) -- from disjoint pasts
+// - No: return (false, newHead, oldHead, NoVersion) -- no conflict
// A conflict exists when there are two new-head nodes in the graft structure.
// It means the newly added object versions are not derived in part from this
// device's current knowledge. If there is a single new-head, the object
@@ -475,6 +467,11 @@
ancestor = NoVersion
err = nil
+ if graft == nil {
+ err = verror.New(verror.ErrInternal, ctx, "no DAG graft map given")
+ return
+ }
+
info := graft[oid]
if info == nil {
err = verror.New(verror.ErrInternal, ctx, "node", oid, "has no DAG graft info")
@@ -519,6 +516,10 @@
// conflict resolver function is assumed to be convergent. However it
// is nicer to make that selection deterministic so all devices see the
// same choice: the version number is used as a tie-breaker.
+ // Note: for the case of a conflict from disjoint pasts, there are no
+ // graft nodes (empty set) and thus no common ancestor because the two
+ // DAG fragments were created from distinct root nodes. The "NoVersion"
+ // value is returned as the ancestor.
isConflict = true
var maxLevel uint64
for node, level := range info.graftNodes {
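To make the new ancestor-selection behavior concrete: the loop above picks the deepest graft node as the common ancestor, and when the DAG fragments evolved from disjoint pasts the graft set is empty, so the loop body never executes and NoVersion falls through. A minimal, self-contained sketch under simplified types (pickAncestor is a hypothetical name, and the exact tie-break direction is an assumption; the code above only promises that the version number breaks level ties):

package main

import "fmt"

// NoVersion stands in for the package's "no version" sentinel.
const NoVersion = ""

// pickAncestor mirrors the selection in hasConflict: take the deepest
// graft node as the common ancestor, breaking level ties on the version
// string so all devices make the same deterministic choice.
func pickAncestor(graftNodes map[string]uint64) string {
	ancestor := NoVersion
	var maxLevel uint64
	for node, level := range graftNodes {
		if level > maxLevel || (level == maxLevel && node > ancestor) {
			ancestor, maxLevel = node, level
		}
	}
	// Disjoint pasts: an empty graft set means the loop body never
	// runs, so NoVersion falls through as the ancestor.
	return ancestor
}

func main() {
	fmt.Println(pickAncestor(map[string]uint64{"1": 0, "2": 1})) // 2
	fmt.Println(pickAncestor(nil) == NoVersion)                  // true: disjoint pasts
}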
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 1b899df..c3f135a 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -21,6 +21,7 @@
// TestSetNode tests setting and getting a DAG node.
func TestSetNode(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
oid, version := "1111", "1"
@@ -67,6 +68,7 @@
// TestDelNode tests deleting a DAG node.
func TestDelNode(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
oid, version := "2222", "2"
@@ -102,6 +104,7 @@
// TestAddParent tests adding parents to a DAG node.
func TestAddParent(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -194,6 +197,7 @@
// TestSetHead tests setting and getting a DAG head node.
func TestSetHead(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
oid := "3333"
@@ -228,6 +232,7 @@
// show: v1 -> v2 -> v3 and the head should point to v3.
func TestLocalUpdates(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -266,14 +271,15 @@
t.Errorf("addNode() did not fail when using an invalid parent")
}
- // Make sure a new root node (no parents) cannot be added once a root exists.
+ // Make sure a new root node (no parents) can be added even when a root already exists.
// For the parents array, check both the "nil" and the empty array as input.
- if err := s.addNode(nil, tx, oid, "6789", "foo", false, nil, NoBatchId, nil); err == nil {
- t.Errorf("adding a 2nd root node (nil parents) for object %s did not fail", oid)
+ if err := s.addNode(nil, tx, oid, "6789", "foo", false, nil, NoBatchId, nil); err != nil {
+ t.Errorf("cannot add another root node (nil parents) for object %s: %v", oid, err)
}
- if err := s.addNode(nil, tx, oid, "6789", "foo", false, []string{}, NoBatchId, nil); err == nil {
- t.Errorf("adding a 2nd root node (empty parents) for object %s did not fail", oid)
+ if err := s.addNode(nil, tx, oid, "9999", "foo", false, []string{}, NoBatchId, nil); err != nil {
+ t.Errorf("cannot add another root node (empty parents) for object %s: %v", oid, err)
}
+
tx.Abort()
}
@@ -283,6 +289,7 @@
// and report no conflicts with the new head pointing at v3.
func TestRemoteUpdates(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -324,7 +331,7 @@
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "3" && oldHead == NoVersion && ancestor == NoVersion && errConflict == nil) {
- t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -356,6 +363,7 @@
// the graft point on which the new fragment (v4 -> v5 -> v6) gets attached.
func TestRemoteNoConflict(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -399,7 +407,7 @@
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
- t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -425,7 +433,7 @@
}
}
-// TestRemoteConflict tests sync handling remote updates that build on the
+// TestRemoteConflict tests sync handling of remote updates that build on the
// local initial state and trigger a conflict. An object is created locally
// and updated twice (v1 -> v2 -> v3). Another device, having only gotten
// the v1 -> v2 history, makes 3 updates on top of v2 (v2 -> v4 -> v5 -> v6)
@@ -438,6 +446,7 @@
// v3 and v6 and it becomes the new head.
func TestRemoteConflict(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -481,7 +490,7 @@
// There should be a conflict between v3 and v6 with v2 as ancestor.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
- t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -513,7 +522,7 @@
}
}
-// TestRemoteConflictTwoGrafts tests sync handling remote updates that build
+// TestRemoteConflictTwoGrafts tests sync handling of remote updates that build
// on the local initial state and trigger a conflict with 2 graft points.
// An object is created locally and updated twice (v1 -> v2 -> v3). Another
// device, first learns about v1 and makes its own conflicting update v1 -> v4.
@@ -530,6 +539,7 @@
// locally by creating v7, derived from both v3 and v6, becoming the new head.
func TestRemoteConflictTwoGrafts(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -573,7 +583,7 @@
// There should be a conflict between v3 and v6 with v2 as ancestor.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
- t.Errorf("object %s wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
@@ -605,6 +615,90 @@
}
}
+// TestRemoteConflictNoAncestor tests sync handling of remote updates that create
+// the same object independently of the local initial state (no common past) and
+// trigger a conflict with no common ancestor (no graft points). An object is
+// created locally and updated twice (v1 -> v2 -> v3). Another device creates
+// the same object from scratch and updates it twice (v4 -> v5 -> v6). When
+// the local device learns of what happened on the remote device, it should
+// detect a conflict between v3 and v6 with no common ancestor. The conflict
+// is resolved locally by creating v7, derived from both v3 and v6, becoming
+// the new head.
+func TestRemoteConflictNoAncestor(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-conf-03.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the new remote DAG fragment alongside the prior DAG (there is
+ // no attachment point between the two).
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": nil, "5": {"4"}, "6": {"5"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true, "6": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be a conflict between v3 and v6 with no ancestor.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+ t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
+ }
+ if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+ t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
+ }
+
+ // Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
+ if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify that the head moved to v7 and the parent map shows the resolution.
+ if head, err := getHead(nil, st, oid); err != nil || head != "7" {
+ t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
+ }
+
+ exp["7"] = []string{"3", "6"}
+ pmap = getParentMap(nil, st, oid, nil)
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
+ oid, pmap, exp)
+ }
+}
+
// TestAncestorIterator checks that the iterator goes over the correct set
// of ancestor nodes for an object given a starting node. It should traverse
// reconvergent DAG branches, visiting each ancestor only once:
@@ -617,6 +711,7 @@
// - Starting at v9 it should cover all nodes (v1-v9).
func TestAncestorIterator(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -672,6 +767,7 @@
// Then by pruning again at v9 nothing changes.
func TestPruning(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -757,6 +853,7 @@
// nodes must be deleted and only v9 remains as the head.
func TestPruningCallbackError(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
s := svc.sync
@@ -807,3 +904,729 @@
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
}
+
+// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict. An object is created locally and
+// updated twice (v1 -> v2 -> v3). Another device learns about v1, then creates
+// (v1 -> v4), then learns about (v1 -> v2) and resolves the (v2/v4) conflict by
+// selecting v2 over v4. It sends that new info (v4 and the v2/v4 link) back to
+// the original (local) device. Instead of a v3/v4 conflict, the device sees
+// that v2 was chosen over v4 and resolves it as a no-conflict case.
+func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-noconf-link-00.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1", "4"}, "3": {"2"}, "4": {"1"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"1": 0, "4": 1}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "3" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Verify that hasConflict() fails with a nil or empty graft map.
+ isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, nil)
+ if errConflict == nil {
+ t.Errorf("hasConflict() on %v did not fail with a nil graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+ isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, newGraft())
+ if errConflict == nil {
+ t.Errorf("hasConflict() on %v did not fail with an empty graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+}
+
+// TestRemoteLinkedConflict tests sync of remote updates that contain linked
+// nodes (conflict resolution by selecting an existing version) on top of a local
+// initial state triggering a local conflict. An object is created locally and
+// updated twice (v1 -> v2 -> v3). Another device has, along the way, learned
+// about v1, created (v1 -> v4), then learned about (v1 -> v2) and resolved that
+// conflict by selecting v4 over v2. Now it sends that new info (v4 and the
+// v4/v2 link) back to the original (local) device which sees a v3/v4 conflict.
+func TestRemoteLinkedConflict(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-conf-link.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1", "2"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"3": true, "4": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"1": 0, "2": 1}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be a conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(isConflict && newHead == "4" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict, but moves the head node to a new one.
+// An object is created locally and updated twice (v1 -> v2 -> v3). Another
+// device has, along the way, learned about v1, created (v1 -> v4), then learned
+// about (v1 -> v2 -> v3) and resolved that conflict by selecting v4 over v3.
+// Now it sends that new info (v4 and the v4/v3 link) back to the original
+// (local) device. The device sees that the new head v4 is "derived" from v3,
+// thus no conflict.
+func TestRemoteLinkedNoConflictNewHead(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-noconf-link-01.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1", "3"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"4": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"1": 0, "3": 2}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "4" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
+// contain linked nodes (conflict resolution by selecting an existing version)
+// on top of a local initial state without conflict, but moves the head node
+// to a new one that overtook the linked node.
+//
+// An object is created locally and updated twice (v1 -> v2 -> v3). Another
+// device has, along the way, learned about v1, created (v1 -> v4), then learned
+// about (v1 -> v2 -> v3) and resolved that conflict by selecting v3 over v4.
+// Then it creates a new update v5 from v3 (v3 -> v5). Now it sends that new
+// info (v4, the v3/v4 link, and v5) back to the original (local) device.
+// The device sees that the new head v5 is "derived" from v3, thus no conflict.
+func TestRemoteLinkedNoConflictNewHeadOvertake(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ oid := "1234"
+
+ if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+ graft, err := s.dagReplayCommands(nil, "remote-noconf-link-02.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v3) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ if head, err := getHead(nil, st, oid); err != nil || head != "3" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ pmap := getParentMap(nil, st, oid, graft)
+
+ exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2", "4"}, "4": {"1"}, "5": {"3"}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ g := graft[oid]
+
+ expNewHeads := map[string]bool{"5": true}
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts := map[string]uint64{"1": 0, "3": 2, "4": 1}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "5" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Move the head.
+ tx := st.NewTransaction()
+ if err = moveHead(nil, tx, oid, newHead); err != nil {
+ t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
+ }
+ tx.Commit()
+
+ // Now new info comes from another device repeating the v3/v4 link.
+ // Verify that it is a NOP (no changes).
+ graft, err = s.dagReplayCommands(nil, "remote-noconf-link-repeat.log.sync")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, err := getHead(nil, st, oid); err != nil || head != "5" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+
+ g = graft[oid]
+
+ if !reflect.DeepEqual(g.newHeads, expNewHeads) {
+ t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
+ }
+
+ expGrafts = map[string]uint64{}
+ if !reflect.DeepEqual(g.graftNodes, expGrafts) {
+ t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
+ }
+
+ isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, graft)
+ if !(!isConflict && newHead == "5" && oldHead == "5" && ancestor == NoVersion && errConflict == nil) {
+ t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+}
+
+// TestAddNodeBatch tests adding multiple DAG nodes grouped within a batch.
+func TestAddNodeBatch(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ if _, err := s.dagReplayCommands(nil, "local-init-02.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ oid_a, oid_b, oid_c := "1234", "6789", "2222"
+
+ tx := st.NewTransaction()
+
+ // Verify NoBatchId is reported as an error.
+ if err := s.endBatch(nil, tx, NoBatchId, 0); err == nil {
+ t.Errorf("endBatch() did not fail for invalid 'NoBatchId' value")
+ }
+ if _, err := getBatch(nil, st, NoBatchId); err == nil {
+ t.Errorf("getBatch() did not fail for invalid 'NoBatchId' value")
+ }
+ if err := setBatch(nil, tx, NoBatchId, nil); err == nil {
+ t.Errorf("setBatch() did not fail for invalid 'NoBatchId' value")
+ }
+ if err := delBatch(nil, tx, NoBatchId); err == nil {
+ t.Errorf("delBatch() did not fail for invalid 'NoBatchId' value")
+ }
+
+ // Mutate 2 objects within a batch.
+ btid_1 := s.startBatch(nil, st, NoBatchId)
+ if btid_1 == NoBatchId {
+ t.Fatal("cannot start 1st DAG batch")
+ }
+ if err := s.endBatch(nil, tx, btid_1, 0); err == nil {
+ t.Errorf("endBatch() did not fail for a zero-count batch")
+ }
+
+ info := s.batches[btid_1]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_1)
+ }
+ if n := len(info.Objects); n != 0 {
+ t.Errorf("batch info map for ID %v has length %d instead of 0", btid_1, n)
+ }
+
+ if err := s.addNode(nil, tx, oid_a, "3", "logrec-a-03", false, []string{"2"}, btid_1, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_1, err)
+ }
+
+ if id := s.startBatch(nil, st, btid_1); id != btid_1 {
+ t.Fatalf("restarting batch failed: got %v instead of %v", id, btid_1)
+ }
+
+ if err := s.addNode(nil, tx, oid_b, "3", "logrec-b-03", false, []string{"2"}, btid_1, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_1, err)
+ }
+
+ // At the same time mutate the 3rd object in another batch.
+ btid_2 := s.startBatch(nil, st, NoBatchId)
+ if btid_2 == NoBatchId {
+ t.Fatal("cannot start 2nd DAG batch")
+ }
+
+ info = s.batches[btid_2]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_2)
+ }
+ if n := len(info.Objects); n != 0 {
+ t.Errorf("batch info map for ID %v has length %d instead of 0", btid_2, n)
+ }
+
+ if err := s.addNode(nil, tx, oid_c, "2", "logrec-c-02", false, []string{"1"}, btid_2, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_c, btid_2, err)
+ }
+
+ // Verify the in-memory batch sets constructed.
+ info = s.batches[btid_1]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_1)
+ }
+
+ expInfo := &batchInfo{map[string]string{oid_a: "3", oid_b: "3"}, 0}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_1, info, expInfo)
+ }
+
+ info = s.batches[btid_2]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_2)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_c: "2"}, 0}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
+ }
+
+ // Verify failing to use a batch ID not returned by startBatch().
+ bad_btid := btid_1 + 1
+ for bad_btid == NoBatchId || bad_btid == btid_2 {
+ bad_btid++
+ }
+
+ if err := s.addNode(nil, tx, oid_c, "3", "logrec-c-03", false, []string{"2"}, bad_btid, nil); err == nil {
+ t.Errorf("addNode() did not fail on object %s for a bad batch ID %v", oid_c, bad_btid)
+ }
+ if err := s.endBatch(nil, tx, bad_btid, 1); err == nil {
+ t.Errorf("endBatch() did not fail for a bad batch ID %v", bad_btid)
+ }
+
+ // End the 1st batch and verify the in-memory and in-store data.
+ if err := s.endBatch(nil, tx, btid_1, 2); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_1, err)
+ }
+ tx.Commit()
+
+ if info = s.batches[btid_1]; info != nil {
+ t.Errorf("batch info for ID %v still exists", btid_1)
+ }
+
+ info, err := getBatch(nil, st, btid_1)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_1, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_a: "3", oid_b: "3"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
+ btid_1, info, expInfo)
+ }
+
+ info = s.batches[btid_2]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_2)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_c: "2"}, 0}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
+ }
+
+ // End the 2nd batch and re-verify the in-memory and in-store data.
+ tx = st.NewTransaction()
+ if err := s.endBatch(nil, tx, btid_2, 1); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_2, err)
+ }
+ tx.Commit()
+
+ if info = s.batches[btid_2]; info != nil {
+ t.Errorf("batch info for ID %v still exists", btid_2)
+ }
+
+ info, err = getBatch(nil, st, btid_2)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_c: "2"}, 1}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v", btid_2, info, expInfo)
+ }
+
+ if n := len(s.batches); n != 0 {
+ t.Errorf("batches set in-memory: %d entries found, should be empty", n)
+ }
+
+ // Test incrementally filling up a batch.
+ btid_3 := uint64(100)
+ if s.batches[btid_3] != nil {
+ t.Errorf("batch info for ID %v found", btid_3)
+ }
+
+ if id := s.startBatch(nil, st, btid_3); id != btid_3 {
+ t.Fatalf("cannot start batch %v", btid_3)
+ }
+
+ info = s.batches[btid_3]
+ if info == nil {
+ t.Errorf("batches state for ID %v not found", btid_3)
+ }
+ if n := len(info.Objects); n != 0 {
+ t.Errorf("batch info map for ID %v has length %d instead of 0", btid_3, n)
+ }
+
+ tx = st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_a, "4", "logrec-a-04", false, []string{"3"}, btid_3, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_3, err)
+ }
+
+ if err := s.endBatch(nil, tx, btid_3, 2); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_3, err)
+ }
+ tx.Commit()
+
+ if s.batches[btid_3] != nil {
+ t.Errorf("batch info for ID %v still exists", btid_3)
+ }
+
+ info, err = getBatch(nil, st, btid_3)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_3, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_a: "4"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
+ btid_3, info, expInfo)
+ }
+
+ if id := s.startBatch(nil, st, btid_3); id != btid_3 {
+ t.Fatalf("cannot start batch %v", btid_3)
+ }
+
+ info = s.batches[btid_3]
+ if info == nil {
+ t.Errorf("batch state for ID %v not found", btid_3)
+ }
+
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
+ btid_3, info, expInfo)
+ }
+
+ tx = st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_b, "4", "logrec-b-04", false, []string{"3"}, btid_3, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_3, err)
+ }
+
+ if err := s.endBatch(nil, tx, btid_3, 3); err == nil {
+ t.Errorf("endBatch() didn't fail for ID %v: %v", btid_3, err)
+ }
+
+ if err := s.endBatch(nil, tx, btid_3, 2); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_3, err)
+ }
+ tx.Commit()
+
+ info, err = getBatch(nil, st, btid_3)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_3, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_a: "4", oid_b: "4"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch state from DAG storage for ID %v: %v instead of %v",
+ btid_3, info, expInfo)
+ }
+
+ // Get the 3 new nodes from the DAG and verify their batch IDs.
+ type nodeTest struct {
+ oid string
+ version string
+ btid uint64
+ }
+ tests := []nodeTest{
+ {oid_a, "3", btid_1},
+ {oid_a, "4", btid_3},
+ {oid_b, "3", btid_1},
+ {oid_b, "4", btid_3},
+ {oid_c, "2", btid_2},
+ }
+
+ for _, test := range tests {
+ node, err := getNode(nil, st, test.oid, test.version)
+ if err != nil {
+ t.Errorf("cannot find object %s:%s: %v", test.oid, test.version, err)
+ }
+ if node.BatchId != test.btid {
+ t.Errorf("invalid batch ID for object %s:%s: %v instead of %v",
+ test.oid, test.version, node.BatchId, test.btid)
+ }
+ }
+}
+
+// TestPruningBatches tests pruning DAG nodes grouped within batches.
+func TestPruningBatches(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ if _, err := s.dagReplayCommands(nil, "local-init-02.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ oid_a, oid_b, oid_c := "1234", "6789", "2222"
+
+ // Mutate objects in 2 batches then add non-batch mutations to act as
+ // the pruning points. Before pruning the DAG is:
+ // a1 -- a2 -- (a3) --- a4
+ // b1 -- b2 -- (b3) -- (b4) -- b5
+ // c1 ---------------- (c2)
+ // Now by pruning at (a4, b5, c2), the new DAG should be:
+ // a4
+ // b5
+ // (c2)
+ // Batch 1 (a3, b3) gets deleted, but batch 2 (b4, c2) still has (c2)
+ // dangling, waiting for a future pruning.
+ btid_1 := s.startBatch(nil, st, NoBatchId)
+ if btid_1 == NoBatchId {
+ t.Fatal("cannot start 1st DAG addNode() batch")
+ }
+
+ tx := st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_a, "3", "logrec-a-03", false, []string{"2"}, btid_1, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_1, err)
+ }
+ if err := s.addNode(nil, tx, oid_b, "3", "logrec-b-03", false, []string{"2"}, btid_1, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_1, err)
+ }
+ if err := s.endBatch(nil, tx, btid_1, 2); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_1, err)
+ }
+ tx.Commit()
+
+ btid_2 := s.startBatch(nil, st, NoBatchId)
+ if btid_2 == NoBatchId {
+ t.Fatal("cannot start 2nd DAG addNode() batch")
+ }
+
+ tx = st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_b, "4", "logrec-b-04", false, []string{"3"}, btid_2, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_2, err)
+ }
+ if err := s.addNode(nil, tx, oid_c, "2", "logrec-c-02", false, []string{"1"}, btid_2, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_c, btid_2, err)
+ }
+ if err := s.endBatch(nil, tx, btid_2, 2); err != nil {
+ t.Errorf("cannot endBatch() for ID %v: %v", btid_2, err)
+ }
+ tx.Commit()
+
+ tx = st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_a, "4", "logrec-a-04", false, []string{"3"}, NoBatchId, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s: %v", oid_a, err)
+ }
+ if err := s.addNode(nil, tx, oid_b, "5", "logrec-b-05", false, []string{"4"}, NoBatchId, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s: %v", oid_b, err)
+ }
+
+ if err := moveHead(nil, tx, oid_a, "4"); err != nil {
+ t.Errorf("object %s cannot move head: %v", oid_a, err)
+ }
+ if err := moveHead(nil, tx, oid_b, "5"); err != nil {
+ t.Errorf("object %s cannot move head: %v", oid_b, err)
+ }
+ if err := moveHead(nil, tx, oid_c, "2"); err != nil {
+ t.Errorf("object %s cannot move head: %v", oid_c, err)
+ }
+ tx.Commit()
+
+ // Verify the batch sets.
+ info, err := getBatch(nil, st, btid_1)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_1, err)
+ }
+
+ expInfo := &batchInfo{map[string]string{oid_a: "3", oid_b: "3"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
+ btid_1, info, expInfo)
+ }
+
+ info, err = getBatch(nil, st, btid_2)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_b: "4", oid_c: "2"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
+ btid_2, info, expInfo)
+ }
+
+ // Prune the 3 objects at their head nodes.
+ batches := newBatchPruning()
+ tx = st.NewTransaction()
+ for _, oid := range []string{oid_a, oid_b, oid_c} {
+ head, err := getHead(nil, st, oid)
+ if err != nil {
+ t.Errorf("cannot getHead() on object %s: %v", oid, err)
+ }
+ err = prune(nil, tx, oid, head, batches,
+ func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+ return nil
+ })
+ if err != nil {
+ t.Errorf("cannot prune() on object %s: %v", oid, err)
+ }
+ }
+
+ if err = pruneDone(nil, tx, batches); err != nil {
+ t.Errorf("pruneDone() failed: %v", err)
+ }
+ tx.Commit()
+
+ // Verify that batch-1 was deleted and batch-2 still has c2 in it.
+ info, err = getBatch(nil, st, btid_1)
+ if err == nil {
+ t.Errorf("getBatch() did not fail for ID %v: %v", btid_1, info)
+ }
+
+ info, err = getBatch(nil, st, btid_2)
+ if err != nil {
+ t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
+ }
+
+ expInfo = &batchInfo{map[string]string{oid_c: "2"}, 2}
+ if !reflect.DeepEqual(info, expInfo) {
+ t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
+ }
+
+ // Add c3 as a new head and prune at that point. This should GC batch-2.
+ tx = st.NewTransaction()
+ if err := s.addNode(nil, tx, oid_c, "3", "logrec-c-03", false, []string{"2"}, NoBatchId, nil); err != nil {
+ t.Errorf("cannot addNode() on object %s: %v", oid_c, err)
+ }
+ if err = moveHead(nil, tx, oid_c, "3"); err != nil {
+ t.Errorf("object %s cannot move head: %v", oid_c, err)
+ }
+
+ batches = newBatchPruning()
+ err = prune(nil, tx, oid_c, "3", batches,
+ func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+ return nil
+ })
+ if err != nil {
+ t.Errorf("cannot prune() on object %s: %v", oid_c, err)
+ }
+ if err = pruneDone(nil, tx, batches); err != nil {
+ t.Errorf("pruneDone() #2 failed: %v", err)
+ }
+ tx.Commit()
+
+ info, err = getBatch(nil, st, btid_2)
+ if err == nil {
+ t.Errorf("getBatch() did not fail for ID %v: %v", btid_2, info)
+ }
+}
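The batch tests above all exercise the same lifecycle. Condensed from TestAddNodeBatch, using the same signatures that appear in this diff (error handling elided), the intended call order is:

	// Start a batch, mutate objects under it inside one transaction,
	// then end the batch declaring the total object count and commit.
	btid := s.startBatch(nil, st, NoBatchId) // NoBatchId requests a fresh ID
	tx := st.NewTransaction()
	s.addNode(nil, tx, oid_a, "3", "logrec-a-03", false, []string{"2"}, btid, nil)
	s.addNode(nil, tx, oid_b, "3", "logrec-b-03", false, []string{"2"}, btid, nil)
	s.endBatch(nil, tx, btid, 2) // the count must match the batch total
	tx.Commit()

A batch may also be filled incrementally: calling startBatch with an existing ID reloads its persisted state, and endBatch fails if the declared count disagrees with what the batch has accumulated.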
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index 50fbf14..470d086 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -147,9 +147,7 @@
return nil, fmt.Errorf("cannot add local node %s:%s: %v",
cmd.oid, cmd.version, err)
}
- tx.Commit()
- tx = st.NewTransaction()
if err = moveHead(ctx, tx, cmd.oid, cmd.version); err != nil {
return nil, fmt.Errorf("cannot move head to %s:%s: %v",
cmd.oid, cmd.version, err)
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 92d3d23..3d3563c 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -69,13 +69,16 @@
}
var (
- rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
- _ interfaces.SyncServerMethods = (*syncService)(nil)
- _ util.Layer = (*syncService)(nil)
+ rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+ rngLock sync.Mutex
+ _ interfaces.SyncServerMethods = (*syncService)(nil)
+ _ util.Layer = (*syncService)(nil)
)
// rand64 generates an unsigned 64-bit pseudo-random number.
func rand64() uint64 {
+ rngLock.Lock()
+ defer rngLock.Unlock()
return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
}
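For context on rngLock: a *rand.Rand is not safe for concurrent use, and rand64() may be called from multiple goroutines. A self-contained sketch of the guarded pattern, mirroring the code above (the goroutine count is arbitrary):

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var (
	rng     = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
	rngLock sync.Mutex
)

// rand64 builds an unsigned 64-bit number from two draws; the mutex
// serializes access because *rand.Rand is not goroutine-safe.
func rand64() uint64 {
	rngLock.Lock()
	defer rngLock.Unlock()
	return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(rand64()) // safe under concurrent callers
		}()
	}
	wg.Wait()
}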
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 1b89adb..f41436c 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -40,6 +40,7 @@
// TestAddSyncGroup tests adding SyncGroups.
func TestAddSyncGroup(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
checkSGStats(t, svc, "add-1", 0, 0)
@@ -170,6 +171,7 @@
// TestInvalidAddSyncGroup tests adding invalid SyncGroups.
func TestInvalidAddSyncGroup(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *interfaces.SyncGroup, msg string) {
@@ -203,6 +205,7 @@
// TestDeleteSyncGroup tests deleting a SyncGroup.
func TestDeleteSyncGroup(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
sgName := "foobar"
@@ -288,6 +291,7 @@
// TestMultiSyncGroups tests creating multiple SyncGroups.
func TestMultiSyncGroups(t *testing.T) {
svc := createService(t)
+ defer destroyService(t, svc)
st := svc.St()
sgName1, sgName2 := "foo", "bar"
diff --git a/services/syncbase/vsync/testdata/local-init-02.sync b/services/syncbase/vsync/testdata/local-init-02.sync
new file mode 100644
index 0000000..cb60a79
--- /dev/null
+++ b/services/syncbase/vsync/testdata/local-init-02.sync
@@ -0,0 +1,10 @@
+# Create DAGs for 3 objects locally.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addl|1234|1|||logrec-a-01|0|1|false
+addl|1234|2|1||logrec-a-02|0|1|false
+
+addl|6789|1|||logrec-b-01|0|1|false
+addl|6789|2|1||logrec-b-02|0|1|false
+
+addl|2222|1|||logrec-c-01|0|1|false
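For reference, how one of these replay lines decomposes into fields, as a runnable sketch (illustrative only; the actual parsing lives in replay_test.go, which feeds the fields to addNode and friends):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
	line := "addl|1234|2|1||logrec-a-02|0|1|false"
	f := strings.Split(line, "|")
	cmd, oid, version, logrec := f[0], f[1], f[2], f[5]

	// Empty parent fields mean a root node (no parents).
	var parents []string
	for _, p := range f[3:5] {
		if p != "" {
			parents = append(parents, p)
		}
	}
	fmt.Println(cmd, oid, version, parents, logrec)
	// Output: addl 1234 2 [1] logrec-a-02
}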
diff --git a/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
new file mode 100644
index 0000000..c293656
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -0,0 +1,6 @@
+# Create the same object remotely from scratch and update it twice (linked-list).
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|||VeyronPhone:10:1:0|0|1|false
+addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
+addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-link.log.sync b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
new file mode 100644
index 0000000..a324e4f
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
@@ -0,0 +1,5 @@
+# Update an object remotely, detect conflict, and bless the local version.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
+linkr|1234|4|2||VeyronPhone:10:1:1
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
new file mode 100644
index 0000000..6945bb2
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
@@ -0,0 +1,5 @@
+# Update an object remotely, detect conflict, and bless the remote version.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
+linkr|1234|2|4||VeyronPhone:10:1:1
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
new file mode 100644
index 0000000..0c6969e
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
@@ -0,0 +1,5 @@
+# Update an object remotely, detect conflict, and bless the local version.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
+linkr|1234|4|3||VeyronPhone:10:1:1
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
new file mode 100644
index 0000000..df9e128
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
@@ -0,0 +1,6 @@
+# Update an object remotely, detect conflict, bless the remote version, and continue updating.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
+linkr|1234|3|4||VeyronPhone:10:1:1
+addr|1234|5|3||VeyronPhone:10:2:0|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
new file mode 100644
index 0000000..82e11c6
--- /dev/null
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
@@ -0,0 +1,4 @@
+# Resolve the same conflict on two different devices.
+# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
+
+linkr|1234|3|4||VeyronLaptop:10:1:0
diff --git a/services/syncbase/vsync/util_test.go b/services/syncbase/vsync/util_test.go
index 986132f..cbaad6c 100644
--- a/services/syncbase/vsync/util_test.go
+++ b/services/syncbase/vsync/util_test.go
@@ -7,11 +7,14 @@
// Utilities for testing sync.
import (
+ "fmt"
+ "os"
"testing"
+ "time"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/syncbase/x/ref/services/syncbase/store/memstore"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
@@ -21,8 +24,10 @@
// mockService emulates a Syncbase service that includes store and sync.
// It is used to access a mock application.
type mockService struct {
- st store.Store
- sync *syncService
+ engine string
+ path string
+ st store.Store
+ sync *syncService
}
func (s *mockService) St() store.Store {
@@ -108,14 +113,30 @@
}
// createService creates a mock Syncbase service used for testing sync functionality.
-// At present it relies on the underlying Memstore engine.
func createService(t *testing.T) *mockService {
- var err error
+ engine := "leveldb"
+ path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
+
+ st, err := util.OpenStore(engine, path)
+ if err != nil {
+ t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
+ }
+
s := &mockService{
- st: memstore.New(),
+ st: st,
+ engine: engine,
+ path: path,
}
if s.sync, err = New(nil, nil, s); err != nil {
+ util.DestroyStore(engine, path)
t.Fatalf("cannot create sync service: %v", err)
}
return s
}
+
+// destroyService cleans up the mock Syncbase service.
+func destroyService(t *testing.T, s *mockService) {
+ if err := util.DestroyStore(s.engine, s.path); err != nil {
+ t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
+ }
+}
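Since the leveldb engine leaves files under os.TempDir(), every test in this change pairs the two helpers. The pattern, condensed (TestExample is a placeholder name):

func TestExample(t *testing.T) {
	svc := createService(t)
	defer destroyService(t, svc) // deferred so the store is destroyed even if the test fails
	st := svc.St()
	_ = st // ... exercise sync against st ...
}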