blob: 302d83800aaef549e1f4e8c119d876ecc26f10e9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Tests for the Syncbase DAG.
import (
"errors"
"fmt"
"reflect"
"strconv"
"testing"
"v.io/v23/context"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/syncbase/store"
)
// TestSetNode tests setting and getting a DAG node.
func TestSetNode(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
oid, version := "1111", "1"
node, err := getNode(nil, st, oid, version)
if err == nil || node != nil {
t.Errorf("found non-existent object %s:%s: %v", oid, version, node)
}
if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
}
if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
t.Errorf("non-existent object %s:%s has a logrec: %v", oid, version, logrec)
}
node = &DagNode{Level: 15, Parents: []string{"444", "555"}, Logrec: "logrec-23"}
tx := st.NewTransaction()
if err = setNode(nil, tx, oid, version, node); err != nil {
t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
}
tx.Commit()
node2, err := getNode(nil, st, oid, version)
if err != nil || node2 == nil {
t.Errorf("cannot find stored object %s:%s: %v", oid, version, err)
}
if ok, err := hasNode(nil, st, oid, version); err != nil || !ok {
t.Errorf("hasNode() did not find object %s:%s", oid, version)
}
if !reflect.DeepEqual(node, node2) {
t.Errorf("object %s:%s has wrong data: %v instead of %v", oid, version, node2, node)
}
if logrec, err := getLogRecKey(nil, st, oid, version); err != nil || logrec != "logrec-23" {
t.Errorf("object %s:%s has wrong logrec: %s", oid, version, logrec)
}
}
// TestDelNode tests deleting a DAG node.
func TestDelNode(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
oid, version := "2222", "2"
node := &DagNode{Level: 123, Parents: []string{"333"}, Logrec: "logrec-789"}
tx := st.NewTransaction()
if err := setNode(nil, tx, oid, version, node); err != nil {
t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
}
tx.Commit()
tx = st.NewTransaction()
if err := delNode(nil, tx, oid, version); err != nil {
t.Fatalf("cannot delete object %s:%s: %v", oid, version, err)
}
tx.Commit()
node2, err := getNode(nil, st, oid, version)
if err == nil || node2 != nil {
t.Errorf("found deleted object %s:%s (%v)", oid, version, node2)
}
if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
t.Errorf("hasNode() found deleted object %s:%s", oid, version)
}
if logrec, err := getLogRecKey(nil, st, oid, version); err == nil || logrec != "" {
t.Errorf("deleted object %s:%s has logrec: %s", oid, version, logrec)
}
}
// TestAddParent tests adding parents to a DAG node.
func TestAddParent(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid, version := "c\xfefoo1", "7"
tx := st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, "haha", NoBatchId, nil); err == nil {
t.Errorf("addParent() did not fail for an unknown object %s:%s", oid, version)
}
tx.Abort()
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
node := &DagNode{Level: 15, Logrec: "logrec-22"}
tx = st.NewTransaction()
if err := setNode(nil, tx, oid, version, node); err != nil {
t.Fatalf("cannot set object %s:%s (%v): %v", oid, version, node, err)
}
tx.Commit()
graft := newGraft(st)
tx = st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, version, NoBatchId, graft); err == nil {
t.Errorf("addParent() did not fail on a self-parent for object %s:%s", oid, version)
}
tx.Abort()
remote := true
expParents := []string{"4", "5", "6"}
for _, parent := range expParents {
tx = st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, parent, NoBatchId, graft); err == nil {
t.Errorf("addParent() did not reject invalid parent %s for object %s:%s",
parent, oid, version)
}
tx.Abort()
pnode := &DagNode{Level: 11, Logrec: fmt.Sprintf("logrec-%s", parent), Parents: []string{"3"}}
tx = st.NewTransaction()
if err := setNode(nil, tx, oid, parent, pnode); err != nil {
t.Fatalf("cannot set parent object %s:%s (%v): %v", oid, parent, pnode, err)
}
tx.Commit()
var g *graftMap
if remote {
g = graft
}
// addParent() twice to verify it is idempotent.
for i := 0; i < 2; i++ {
tx = st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, parent, NoBatchId, g); err != nil {
t.Errorf("addParent() failed on parent %s, remote %t (i=%d) for %s:%s: %v",
parent, remote, i, oid, version, err)
}
tx.Commit()
}
remote = !remote
}
node2, err := getNode(nil, st, oid, version)
if err != nil || node2 == nil {
t.Errorf("cannot find object %s:%s: %v", oid, version, err)
}
if !reflect.DeepEqual(node2.Parents, expParents) {
t.Errorf("invalid parents for object %s:%s: %v instead of %v",
oid, version, node2.Parents, expParents)
}
// Creating cycles should fail.
for v := 1; v < 7; v++ {
ver := fmt.Sprintf("%d", v)
tx = st.NewTransaction()
if err = s.addParent(nil, tx, oid, ver, version, NoBatchId, nil); err == nil {
t.Errorf("addParent() failed to reject a cycle for %s: from ancestor %s to node %s",
oid, ver, version)
}
tx.Abort()
}
}
// TestSetHead tests setting and getting a DAG head node.
func TestSetHead(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
oid := "3333"
version, err := getHead(nil, st, oid)
if err == nil {
t.Errorf("found non-existent object head %s:%s", oid, version)
}
for i := 0; i < 2; i++ {
version = fmt.Sprintf("v%d", 555+i)
tx := st.NewTransaction()
if err = setHead(nil, tx, oid, version); err != nil {
t.Fatalf("cannot set object head %s:%s (i=%d)", oid, version, i)
}
tx.Commit()
version2, err := getHead(nil, st, oid)
if err != nil {
t.Errorf("cannot find stored object head %s (i=%d)", oid, i)
}
if version != version2 {
t.Errorf("object %s has wrong head data (i=%d): %s instead of %s",
oid, i, version2, version)
}
}
}
// TestLocalUpdates tests the sync handling of initial local updates: an object
// is created (v1) and updated twice (v2, v3) on this device. The DAG should
// show: v1 -> v2 -> v3 and the head should point to v3.
func TestLocalUpdates(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
// The head must have moved to v3 and the parent map shows the updated DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("invalid object %s head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, nil)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
tx := st.NewTransaction()
// Make sure a new node cannot have more than 2 parents.
if err := s.addNode(nil, tx, oid, "4", "c\xfefoo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
t.Errorf("addNode() did not fail when given 3 parents")
}
// Make sure a new node cannot have an invalid parent.
if err := s.addNode(nil, tx, oid, "4", "c\xfefoo", false, []string{"1", "555"}, NoBatchId, nil); err == nil {
t.Errorf("addNode() did not fail when using an invalid parent")
}
// Make sure a new root node (no parents) can be added once a root exists.
// For the parents array, check both the "nil" and the empty array as input.
if err := s.addNode(nil, tx, oid, "6789", "c\xfefoo", false, nil, NoBatchId, nil); err != nil {
t.Errorf("cannot add another root node (nil parents) for object %s: %v", oid, err)
}
if err := s.addNode(nil, tx, oid, "9999", "c\xfefoo", false, []string{}, NoBatchId, nil); err != nil {
t.Errorf("cannot add another root node (empty parents) for object %s: %v", oid, err)
}
tx.Abort()
}
// TestRemoteUpdates tests the sync handling of initial remote updates:
// an object is created (v1) and updated twice (v2, v3) on another device and
// we learn about it during sync. The updated DAG should show: v1 -> v2 -> v3
// and report no conflicts with the new head pointing at v3.
func TestRemoteUpdates(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
graft, err := s.dagReplayCommands(nil, "remote-init-00.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still undefined) and the parent
// map shows the newly grafted DAG fragment.
if head, err := getHead(nil, st, oid); err == nil {
t.Errorf("object %s head found: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "3" && oldHead == NoVersion && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
// Make sure an unknown node cannot become the new head.
tx := st.NewTransaction()
if err := moveHead(nil, tx, oid, "55"); err == nil {
t.Errorf("moveHead() did not fail on an invalid node")
}
tx.Abort()
// Then move the head.
tx = st.NewTransaction()
if err := moveHead(nil, tx, oid, newHead); err != nil {
t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
}
tx.Commit()
}
// TestRemoteNoConflict tests sync of remote updates on top of a local initial
// state without conflict. An object is created locally and updated twice
// (v1 -> v2 -> v3). Another device, having gotten this info, makes 3 updates
// on top of that (v3 -> v4 -> v5 -> v6) and sends this info in a later sync.
// The updated DAG should show (v1 -> v2 -> v3 -> v4 -> v5 -> v6) and report
// no conflicts with the new head pointing at v6. It should also report v3 as
// the graft point on which the new fragment (v4 -> v5 -> v6) gets attached.
func TestRemoteNoConflict(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-noconf-00.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"3"}, "5": {"4"}, "6": {"5"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"3": 2}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
// Then move the head.
tx := st.NewTransaction()
if err := moveHead(nil, tx, oid, newHead); err != nil {
t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
}
tx.Commit()
// Verify that hasConflict() fails without graft data.
isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, nil)
if errConflict == nil {
t.Errorf("hasConflict() on %s did not fail w/o graft data: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
}
// TestRemoteConflict tests sync handling of remote updates that build on the
// local initial state and trigger a conflict. An object is created locally
// and updated twice (v1 -> v2 -> v3). Another device, having only gotten
// the v1 -> v2 history, makes 3 updates on top of v2 (v2 -> v4 -> v5 -> v6)
// and sends this info during a later sync. Separately, the local device
// makes a conflicting (concurrent) update v2 -> v3. The updated DAG should
// show the branches: (v1 -> v2 -> v3) and (v1 -> v2 -> v4 -> v5 -> v6) and
// report the conflict between v3 and v6 (current and new heads). It should
// also report v2 as the graft point and the common ancestor in the conflict.
// The conflict is resolved locally by creating v7 that is derived from both
// v3 and v6 and it becomes the new head.
func TestRemoteConflict(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-conf-00.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"4"}, "6": {"5"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"2": 1}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be a conflict between v3 and v6 with v2 as ancestor.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "y\xfel\xfed\xfe10\xfe2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
// Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
t.Fatal(err)
}
// Verify that the head moved to v7 and the parent map shows the resolution.
if head, err := getHead(nil, st, oid); err != nil || head != "7" {
t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
}
exp["7"] = []string{"3", "6"}
pmap = getParentMap(nil, st, oid, nil)
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
oid, pmap, exp)
}
}
// TestRemoteConflictTwoGrafts tests sync handling of remote updates that build
// on the local initial state and trigger a conflict with 2 graft points.
// An object is created locally and updated twice (v1 -> v2 -> v3). Another
// device, first learns about v1 and makes it own conflicting update v1 -> v4.
// That remote device later learns about v2 and resolves the v2/v4 confict by
// creating v5. Then it makes a last v5 -> v6 update -- which will conflict
// with v3 but it doesn't know that.
// Now the sync order is reversed and the local device learns all of what
// happened on the remote device. The local DAG should get be augmented by
// a subtree with 2 graft points: v1 and v2. It receives this new branch:
// v1 -> v4 -> v5 -> v6. Note that v5 is also derived from v2 as a remote
// conflict resolution. This should report a conflict between v3 and v6
// (current and new heads), with v1 and v2 as graft points, and v2 as the
// most-recent common ancestor for that conflict. The conflict is resolved
// locally by creating v7, derived from both v3 and v6, becoming the new head.
func TestRemoteConflictTwoGrafts(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-conf-01.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1"}, "5": {"2", "4"}, "6": {"5"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"1": 0, "2": 1}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be a conflict between v3 and v6 with v2 as ancestor.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe2" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "y\xfel\xfed\xfe10\xfe2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
// Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
t.Fatal(err)
}
// Verify that the head moved to v7 and the parent map shows the resolution.
if head, err := getHead(nil, st, oid); err != nil || head != "7" {
t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
}
exp["7"] = []string{"3", "6"}
pmap = getParentMap(nil, st, oid, nil)
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
oid, pmap, exp)
}
}
// TestRemoteConflictNoAncestor tests sync handling of remote updates that create
// the same object independently from local initial state (no common past) and
// trigger a conflict with no common ancestors (no graft points). An object is
// created locally and updated twice (v1 -> v2 -> v3). Another device creates
// the same object from scratch and updates it twice (v4 -> v5 -> v6). When
// the local device learns of what happened on the remote device, it should
// detect a conflict between v3 and v6 with no common ancestor. The conflict
// is resolved locally by creating v7, derived from both v3 and v6, becoming
// the new head.
func TestRemoteConflictNoAncestor(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-conf-03.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": nil, "5": {"4"}, "6": {"5"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be a conflict between v3 and v6 with no ancestor.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "6" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
// Resolve the conflict by adding a new local v7 derived from v3 and v6 (this replay moves the head).
if _, err := s.dagReplayCommands(nil, "local-resolve-00.sync"); err != nil {
t.Fatal(err)
}
// Verify that the head moved to v7 and the parent map shows the resolution.
if head, err := getHead(nil, st, oid); err != nil || head != "7" {
t.Errorf("object %s has wrong head after conflict resolution: %s", oid, head)
}
exp["7"] = []string{"3", "6"}
pmap = getParentMap(nil, st, oid, nil)
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map after conflict resolution: (%v) instead of (%v)",
oid, pmap, exp)
}
}
// TestAncestorIterator checks that the iterator goes over the correct set
// of ancestor nodes for an object given a starting node. It should traverse
// reconvergent DAG branches only visiting each ancestor once:
// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
// |--> v4 ---| |
// +--> v7 ---------------+
// - Starting at v1 it should only cover v1.
// - Starting at v3 it should only cover v1-v3.
// - Starting at v6 it should only cover v1-v6.
// - Starting at v9 it should cover all nodes (v1-v9).
func TestAncestorIterator(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "1234"
if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
t.Fatal(err)
}
// Loop checking the iteration behavior for different starting nodes.
for _, start := range []int{1, 3, 6, 9} {
visitCount := make(map[string]int)
vstart := fmt.Sprintf("%d", start)
forEachAncestor(nil, st, oid, []string{vstart}, func(v string, nd *DagNode) error {
visitCount[v]++
return nil
})
// Check that all prior nodes are visited only once.
for i := 1; i < (start + 1); i++ {
vv := fmt.Sprintf("%d", i)
if visitCount[vv] != 1 {
t.Errorf("wrong visit count on object %s:%s starting from %s: %d instead of 1",
oid, vv, vstart, visitCount[vv])
}
}
}
// Make sure an error in the callback is returned.
cbErr := errors.New("callback error")
err := forEachAncestor(nil, st, oid, []string{"9"}, func(v string, nd *DagNode) error {
if v == "1" {
return cbErr
}
return nil
})
if err != cbErr {
t.Errorf("wrong error returned from callback: %v instead of %v", err, cbErr)
}
}
// TestPruning tests sync pruning of the DAG for an object with 3 concurrent
// updates (i.e. 2 conflict resolution convergent points). The pruning must
// get rid of the DAG branches across the reconvergence points:
// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
// |--> v4 ---| |
// +--> v7 ---------------+
// By pruning at v1, nothing is deleted.
// Then by pruning at v2, only v1 is deleted.
// Then by pruning at v6, v2-v5 are deleted leaving v6 and "v7 -> v8 -> v9".
// Then by pruning at v8, v6-v7 are deleted leaving "v8 -> v9".
// Then by pruning at v9, v8 is deleted leaving v9 as the head.
// Then by pruning again at v9 nothing changes.
func TestPruning(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "1234"
if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
t.Fatal(err)
}
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
// Loop pruning at an invalid version (333) then at different valid versions.
// The last version used (NoVersion) deletes all remaining nodes for the
// object, including the head node.
testVersions := []string{"333", "1", "2", "6", "8", "9", "9", NoVersion}
delCounts := []int{0, 0, 1, 4, 2, 1, 0, 1}
for i, version := range testVersions {
batches := newBatchPruning()
tx := st.NewTransaction()
del := 0
err := prune(nil, tx, oid, version, batches,
func(ctx *context.T, tx store.Transaction, lr string) error {
del++
return nil
})
tx.Commit()
if i == 0 && err == nil {
t.Errorf("pruning non-existent object %s:%s did not fail", oid, version)
} else if i > 0 && err != nil {
t.Errorf("pruning object %s:%s failed: %v", oid, version, err)
}
if del != delCounts[i] {
t.Errorf("pruning object %s:%s deleted %d log records instead of %d",
oid, version, del, delCounts[i])
}
head, err := getHead(nil, st, oid)
if version != NoVersion {
if err != nil || head != "9" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
} else if err == nil {
t.Errorf("found head %s for object %s after pruning all versions", head, oid)
}
tx = st.NewTransaction()
err = pruneDone(nil, tx, batches)
if err != nil {
t.Errorf("pruneDone() failed: %v", err)
}
tx.Commit()
// Remove pruned nodes from the expected parent map used to validate
// and set the parents of the pruned node to nil.
if version == NoVersion {
exp = make(map[string][]string)
} else {
intVersion, err := strconv.ParseInt(version, 10, 32)
if err != nil {
t.Errorf("invalid version: %s", version)
}
if intVersion < 10 {
for j := int64(0); j < intVersion; j++ {
delete(exp, fmt.Sprintf("%d", j))
}
exp[version] = nil
}
}
pmap := getParentMap(nil, st, oid, nil)
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
}
}
// TestPruningCallbackError tests sync pruning of the DAG when the callback
// function returns an error. The pruning must try to delete as many nodes
// and log records as possible and properly adjust the parent pointers of
// the pruning node. The object DAG is:
// v1 -> v2 -> v3 -> v5 -> v6 -> v8 -> v9
// |--> v4 ---| |
// +--> v7 ---------------+
// By pruning at v9 and having the callback function fail for v4, all other
// nodes must be deleted and only v9 remains as the head.
func TestPruningCallbackError(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "1234"
if _, err := s.dagReplayCommands(nil, "local-init-01.sync"); err != nil {
t.Fatal(err)
}
exp := map[string][]string{"9": nil}
// Prune at v9 with a callback function that fails for v4.
del, expDel := 0, 8
version := "9"
batches := newBatchPruning()
tx := st.NewTransaction()
err := prune(nil, tx, oid, version, batches,
func(ctx *context.T, tx store.Transaction, lr string) error {
del++
if lr == "logrec-03" {
return fmt.Errorf("refuse to delete %s", lr)
}
return nil
})
tx.Commit()
if err == nil {
t.Errorf("pruning object %s:%s did not fail", oid, version)
}
if del != expDel {
t.Errorf("pruning object %s:%s deleted %d log records instead of %d", oid, version, del, expDel)
}
tx = st.NewTransaction()
err = pruneDone(nil, tx, batches)
if err != nil {
t.Errorf("pruneDone() failed: %v", err)
}
tx.Commit()
if head, err := getHead(nil, st, oid); err != nil || head != version {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, nil)
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Invalid pruning without a batch set.
tx = st.NewTransaction()
err = prune(nil, tx, oid, version, nil, func(ctx *context.T, tx store.Transaction, lr string) error {
return nil
})
if err == nil {
t.Errorf("pruning object %s:%s without a batch set did not fail", oid, version)
}
tx.Abort()
}
// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
// linked nodes (conflict resolution by selecting an existing version) on top of
// a local initial state without conflict. An object is created locally and
// updated twice (v1 -> v2 -> v3). Another device learns about v1, then creates
// (v1 -> v4), then learns about (v1 -> v2) and resolves the (v2/v4) conflict by
// selecting v2 over v4. It sends that new info (v4 and the v2/v4 link) back to
// the original (local) device. Instead of a v3/v4 conflict, the device sees
// that v2 was chosen over v4 and resolves it as a no-conflict case.
func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-noconf-link-00.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v3) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1", "4"}, "3": {"2"}, "4": {"1"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"1": 0, "4": 1}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "3" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
// Verify that hasConflict() fails with a nil or empty graft map.
isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, nil)
if errConflict == nil {
t.Errorf("hasConflict() on %v did not fail with a nil graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, newGraft(st))
if errConflict == nil {
t.Errorf("hasConflict() on %v did not fail with an empty graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
}
// TestRemoteLinkedConflict tests sync of remote updates that contain linked
// nodes (conflict resolution by selecting an existing version) on top of a
// local initial state triggering a local conflict. An object is created locally
// and updated twice (v1 -> v2 -> v3). Another device has along the way learned
// about v1, created (v1 -> v4), then learned about (v1 -> v2) and resolved that
// conflict by selecting v4 over v2. Now it sends that new info (v4 and the
// v4/v2 link) back to the original (local) device which sees a v3/v4 conflict.
func TestRemoteLinkedConflict(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-conf-link.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v2) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1", "2"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "4": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"1": 0, "2": 1}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be a conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(isConflict && newHead == "4" && oldHead == "3" && ancestor == "2" && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
}
// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
// linked nodes (conflict resolution by selecting an existing version) on top of
// a local initial state without conflict, but moves the head node to a new one.
// An object is created locally and updated twice (v1 -> v2 -> v3). Another
// device has along the way learned about v1, created (v1 -> v4), then learned
// about (v1 -> v2 -> v3) and resolved that conflict by selecting v4 over v3.
// Now it sends that new info (v4 and the v4/v3 link) back to the original
// (local) device. The device sees that the new head v4 is "derived" from v3
// thus no conflict.
func TestRemoteLinkedConflictNewHead(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-noconf-link-01.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v2) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"1", "3"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"4": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"1": 0, "3": 2}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "4" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
}
// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
// contain linked nodes (conflict resolution by selecting an existing version)
// on top of a local initial state without conflict, but moves the head node
// to a new one that overtook the linked node.
//
// An object is created locally and updated twice (v1 -> v2 -> v3). Another
// device has along the way learned about v1, created (v1 -> v4), then learned
// about (v1 -> v2 -> v3) and resolved that conflict by selecting v3 over v4.
// Then it creates a new update v5 from v3 (v3 -> v5). Now it sends that new
// info (v4, the v3/v4 link, and v5) back to the original (local) device.
// The device sees that the new head v5 is "derived" from v3 thus no conflict.
func TestRemoteLinkedConflictNewHeadOvertake(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
oid := "c\xfefoo1"
if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
t.Fatal(err)
}
graft, err := s.dagReplayCommands(nil, "remote-noconf-link-02.log.sync")
if err != nil {
t.Fatal(err)
}
// The head must not have moved (i.e. still at v2) and the parent map
// shows the newly grafted DAG fragment on top of the prior DAG.
if head, err := getHead(nil, st, oid); err != nil || head != "3" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
pmap := getParentMap(nil, st, oid, graft)
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2", "4"}, "4": {"1"}, "5": {"3"}}
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
// Verify the grafting of remote nodes.
g := graft.objGraft[oid]
expNewHeads := map[string]bool{"5": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts := map[string]uint64{"1": 0, "3": 2, "4": 1}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
// There should be no conflict.
isConflict, newHead, oldHead, ancestor, errConflict := hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "5" && oldHead == "3" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
// Move the head.
tx := st.NewTransaction()
if err = moveHead(nil, tx, oid, newHead); err != nil {
t.Errorf("object %s cannot move head to %s: %v", oid, newHead, err)
}
tx.Commit()
// Now new info comes from another device repeating the v2/v3 link.
// Verify that it is a NOP (no changes).
graft, err = s.dagReplayCommands(nil, "remote-noconf-link-repeat.log.sync")
if err != nil {
t.Fatal(err)
}
if head, err := getHead(nil, st, oid); err != nil || head != "5" {
t.Errorf("object %s has wrong head: %s", oid, head)
}
g = graft.objGraft[oid]
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
}
expGrafts = map[string]uint64{}
if !reflect.DeepEqual(g.graftNodes, expGrafts) {
t.Errorf("invalid object %s graft: (%v) instead of (%v)", oid, g.graftNodes, expGrafts)
}
isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, graft)
if !(!isConflict && newHead == "5" && oldHead == "5" && ancestor == NoVersion && errConflict == nil) {
t.Errorf("object %s: wrong conflict info: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
}
// TestAddNodeBatch tests adding multiple DAG nodes grouped within a batch.
func TestAddNodeBatch(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
if _, err := s.dagReplayCommands(nil, "local-init-02.sync"); err != nil {
t.Fatal(err)
}
oid_a, oid_b, oid_c := "1234", "6789", "2222"
tx := st.NewTransaction()
// Verify NoBatchId is reported as an error.
if err := s.endBatch(nil, tx, NoBatchId, 0); err == nil {
t.Errorf("endBatch() did not fail for invalid 'NoBatchId' value")
}
if _, err := getBatch(nil, st, NoBatchId); err == nil {
t.Errorf("getBatch() did not fail for invalid 'NoBatchId' value")
}
if err := setBatch(nil, tx, NoBatchId, nil); err == nil {
t.Errorf("setBatch() did not fail for invalid 'NoBatchId' value")
}
if err := delBatch(nil, tx, NoBatchId); err == nil {
t.Errorf("delBatch() did not fail for invalid 'NoBatchId' value")
}
// Mutate 2 objects within a batch.
btid_1 := s.startBatch(nil, st, NoBatchId)
if btid_1 == NoBatchId {
t.Fatal("cannot start 1st DAG batch")
}
if err := s.endBatch(nil, tx, btid_1, 0); err == nil {
t.Errorf("endBatch() did not fail for a zero-count batch")
}
info := s.batches[btid_1]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_1)
}
if n := len(info.Objects); n != 0 {
t.Errorf("batch info map for ID %v has length %d instead of 0", btid_1, n)
}
if err := s.addNode(nil, tx, oid_a, "3", "logrec-a-03", false, []string{"2"}, btid_1, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_1, err)
}
if id := s.startBatch(nil, st, btid_1); id != btid_1 {
t.Fatalf("restarting batch failed: got %v instead of %v", id, btid_1)
}
if err := s.addNode(nil, tx, oid_b, "3", "logrec-b-03", false, []string{"2"}, btid_1, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_1, err)
}
// At the same time mutate the 3rd object in another batch.
btid_2 := s.startBatch(nil, st, NoBatchId)
if btid_2 == NoBatchId {
t.Fatal("cannot start 2nd DAG batch")
}
info = s.batches[btid_2]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_2)
}
if n := len(info.Objects); n != 0 {
t.Errorf("batch info map for ID %v has length %d instead of 0", btid_2, n)
}
if err := s.addNode(nil, tx, oid_c, "2", "logrec-c-02", false, []string{"1"}, btid_2, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_c, btid_2, err)
}
// Verify the in-memory batch sets constructed.
info = s.batches[btid_1]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_1)
}
expInfo := &BatchInfo{map[string]string{oid_a: "3", oid_b: "3"}, make(map[string]string), 0}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_1, info, expInfo)
}
info = s.batches[btid_2]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_2)
}
expInfo = &BatchInfo{map[string]string{oid_c: "2"}, make(map[string]string), 0}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
}
// Verify failing to use a batch ID not returned by startBatch().
bad_btid := btid_1 + 1
for bad_btid == NoBatchId || bad_btid == btid_2 {
bad_btid++
}
if err := s.addNode(nil, tx, oid_c, "3", "logrec-c-03", false, []string{"2"}, bad_btid, nil); err == nil {
t.Errorf("addNode() did not fail on object %s for a bad batch ID %v", oid_c, bad_btid)
}
if err := s.endBatch(nil, tx, bad_btid, 1); err == nil {
t.Errorf("endBatch() did not fail for a bad batch ID %v", bad_btid)
}
// End the 1st batch and verify the in-memory and in-store data.
if err := s.endBatch(nil, tx, btid_1, 2); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_1, err)
}
tx.Commit()
if info = s.batches[btid_1]; info != nil {
t.Errorf("batch info for ID %v still exists", btid_1)
}
info, err := getBatch(nil, st, btid_1)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_1, err)
}
expInfo = &BatchInfo{map[string]string{oid_a: "3", oid_b: "3"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
btid_1, info, expInfo)
}
info = s.batches[btid_2]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_2)
}
expInfo = &BatchInfo{map[string]string{oid_c: "2"}, make(map[string]string), 0}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
}
// End the 2nd batch and re-verify the in-memory and in-store data.
tx = st.NewTransaction()
if err := s.endBatch(nil, tx, btid_2, 1); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_2, err)
}
tx.Commit()
if info = s.batches[btid_2]; info != nil {
t.Errorf("batch info for ID %v still exists", btid_2)
}
info, err = getBatch(nil, st, btid_2)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
}
expInfo = &BatchInfo{map[string]string{oid_c: "2"}, nil, 1}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v", btid_2, info, expInfo)
}
if n := len(s.batches); n != 0 {
t.Errorf("batches set in-memory: %d entries found, should be empty", n)
}
// Test incrementally filling up a batch.
btid_3 := uint64(100)
if s.batches[btid_3] != nil {
t.Errorf("batch info for ID %v found", btid_3)
}
if id := s.startBatch(nil, st, btid_3); id != btid_3 {
t.Fatalf("cannot start batch %v", btid_3)
}
info = s.batches[btid_3]
if info == nil {
t.Errorf("batches state for ID %v not found", btid_3)
}
if n := len(info.Objects); n != 0 {
t.Errorf("batch info map for ID %v has length %d instead of 0", btid_3, n)
}
tx = st.NewTransaction()
if err := s.addNode(nil, tx, oid_a, "4", "logrec-a-04", false, []string{"3"}, btid_3, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_3, err)
}
if err := s.endBatch(nil, tx, btid_3, 2); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_3, err)
}
tx.Commit()
if s.batches[btid_3] != nil {
t.Errorf("batch info for ID %v still exists", btid_3)
}
info, err = getBatch(nil, st, btid_3)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_3, err)
}
expInfo = &BatchInfo{map[string]string{oid_a: "4"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
btid_3, info, expInfo)
}
if id := s.startBatch(nil, st, btid_3); id != btid_3 {
t.Fatalf("cannot start batch %v", btid_3)
}
info = s.batches[btid_3]
if info == nil {
t.Errorf("batch state for ID %v not found", btid_3)
}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
btid_3, info, expInfo)
}
tx = st.NewTransaction()
if err := s.addNode(nil, tx, oid_b, "4", "logrec-b-04", false, []string{"3"}, btid_3, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_3, err)
}
if err := s.endBatch(nil, tx, btid_3, 3); err == nil {
t.Errorf("endBatch() didn't fail for ID %v: %v", btid_3, err)
}
if err := s.endBatch(nil, tx, btid_3, 2); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_3, err)
}
tx.Commit()
info, err = getBatch(nil, st, btid_3)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_3, err)
}
expInfo = &BatchInfo{map[string]string{oid_a: "4", oid_b: "4"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch state from DAG storage for ID %v: %v instead of %v",
btid_3, info, expInfo)
}
// Get the 3 new nodes from the DAG and verify their batch IDs.
type nodeTest struct {
oid string
version string
btid uint64
}
tests := []nodeTest{
{oid_a, "3", btid_1},
{oid_a, "4", btid_3},
{oid_b, "3", btid_1},
{oid_b, "4", btid_3},
{oid_c, "2", btid_2},
}
for _, test := range tests {
node, err := getNode(nil, st, test.oid, test.version)
if err != nil {
t.Errorf("cannot find object %s:%s: %v", test.oid, test.version, err)
}
if node.BatchId != test.btid {
t.Errorf("invalid batch ID for object %s:%s: %v instead of %v",
test.oid, test.version, node.BatchId, test.btid)
}
}
}
// TestPruningBatches tests pruning DAG nodes grouped within batches.
func TestPruningBatches(t *testing.T) {
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
s := svc.sync
if _, err := s.dagReplayCommands(nil, "local-init-02.sync"); err != nil {
t.Fatal(err)
}
oid_a, oid_b, oid_c := "1234", "6789", "2222"
// Mutate objects in 2 batches then add non-batch mutations to act as
// the pruning points. Before pruning the DAG is:
// a1 -- a2 -- (a3) --- a4
// b1 -- b2 -- (b3) -- (b4) -- b5
// c1 ---------------- (c2)
// Now by pruning at (a4, b5, c2), the new DAG should be:
// a4
// b5
// (c2)
// Batch 1 (a3, b3) gets deleted, but batch 2 (b4, c2) still has (c2)
// dangling waiting for a future pruning.
btid_1 := s.startBatch(nil, st, NoBatchId)
if btid_1 == NoBatchId {
t.Fatal("cannot start 1st DAG addNode() batch")
}
tx := st.NewTransaction()
if err := s.addNode(nil, tx, oid_a, "3", "logrec-a-03", false, []string{"2"}, btid_1, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_a, btid_1, err)
}
if err := s.addNode(nil, tx, oid_b, "3", "logrec-b-03", false, []string{"2"}, btid_1, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_1, err)
}
if err := s.endBatch(nil, tx, btid_1, 2); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_1, err)
}
tx.Commit()
btid_2 := s.startBatch(nil, st, NoBatchId)
if btid_2 == NoBatchId {
t.Fatal("cannot start 2nd DAG addNode() batch")
}
tx = st.NewTransaction()
if err := s.addNode(nil, tx, oid_b, "4", "logrec-b-04", false, []string{"3"}, btid_2, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_b, btid_2, err)
}
if err := s.addNode(nil, tx, oid_c, "2", "logrec-c-02", false, []string{"1"}, btid_2, nil); err != nil {
t.Errorf("cannot addNode() on object %s and batch ID %v: %v", oid_c, btid_2, err)
}
if err := s.endBatch(nil, tx, btid_2, 2); err != nil {
t.Errorf("cannot endBatch() for ID %v: %v", btid_2, err)
}
tx.Commit()
tx = st.NewTransaction()
if err := s.addNode(nil, tx, oid_a, "4", "logrec-a-04", false, []string{"3"}, NoBatchId, nil); err != nil {
t.Errorf("cannot addNode() on object %s: %v", oid_a, err)
}
if err := s.addNode(nil, tx, oid_b, "5", "logrec-b-05", false, []string{"4"}, NoBatchId, nil); err != nil {
t.Errorf("cannot addNode() on object %s: %v", oid_b, err)
}
if err := moveHead(nil, tx, oid_a, "4"); err != nil {
t.Errorf("object %s cannot move head: %v", oid_a, err)
}
if err := moveHead(nil, tx, oid_b, "5"); err != nil {
t.Errorf("object %s cannot move head: %v", oid_b, err)
}
if err := moveHead(nil, tx, oid_c, "2"); err != nil {
t.Errorf("object %s cannot move head: %v", oid_c, err)
}
tx.Commit()
// Verify the batch sets.
info, err := getBatch(nil, st, btid_1)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_1, err)
}
expInfo := &BatchInfo{map[string]string{oid_a: "3", oid_b: "3"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
btid_1, info, expInfo)
}
info, err = getBatch(nil, st, btid_2)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
}
expInfo = &BatchInfo{map[string]string{oid_b: "4", oid_c: "2"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info from DAG storage for ID %v: %v instead of %v",
btid_2, info, expInfo)
}
// Prune the 3 objects at their head nodes.
batches := newBatchPruning()
tx = st.NewTransaction()
for _, oid := range []string{oid_a, oid_b, oid_c} {
head, err := getHead(nil, st, oid)
if err != nil {
t.Errorf("cannot getHead() on object %s: %v", oid, err)
}
err = prune(nil, tx, oid, head, batches,
func(ctx *context.T, itx store.Transaction, lr string) error {
return nil
})
if err != nil {
t.Errorf("cannot prune() on object %s: %v", oid, err)
}
}
if err = pruneDone(nil, tx, batches); err != nil {
t.Errorf("pruneDone() failed: %v", err)
}
tx.Commit()
// Verify that batch-1 was deleted and batch-2 still has c2 in it.
info, err = getBatch(nil, st, btid_1)
if err == nil {
t.Errorf("getBatch() did not fail for ID %v: %v", btid_1, info)
}
info, err = getBatch(nil, st, btid_2)
if err != nil {
t.Errorf("cannot getBatch() for ID %v: %v", btid_2, err)
}
expInfo = &BatchInfo{map[string]string{oid_c: "2"}, nil, 2}
if !reflect.DeepEqual(info, expInfo) {
t.Errorf("invalid batch info for ID %v: %v instead of %v", btid_2, info, expInfo)
}
// Add c3 as a new head and prune at that point. This should GC batch-2.
tx = st.NewTransaction()
if err := s.addNode(nil, tx, oid_c, "3", "logrec-c-03", false, []string{"2"}, NoBatchId, nil); err != nil {
t.Errorf("cannot addNode() on object %s: %v", oid_c, err)
}
if err = moveHead(nil, tx, oid_c, "3"); err != nil {
t.Errorf("object %s cannot move head: %v", oid_c, err)
}
batches = newBatchPruning()
err = prune(nil, tx, oid_c, "3", batches,
func(ctx *context.T, itx store.Transaction, lr string) error {
return nil
})
if err != nil {
t.Errorf("cannot prune() on object %s: %v", oid_c, err)
}
if err = pruneDone(nil, tx, batches); err != nil {
t.Errorf("pruneDone() #2 failed: %v", err)
}
tx.Commit()
info, err = getBatch(nil, st, btid_2)
if err == nil {
t.Errorf("getBatch() did not fail for ID %v: %v", btid_2, info)
}
}