package vsync

// Tests for the Veyron Sync DAG component.

import (
	"errors"
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"testing"
	"time"

	"veyron2/storage"
)

// dagFilename generates a filename for a temporary (per unit test) DAG file.
// Do not replace this function with TempFile because TempFile creates the new
// file and the tests must verify that the DAG can create a file that does not
// already exist.
func dagFilename() string {
	return fmt.Sprintf("%s/sync_dag_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
}

// fileSize returns the size of a file, or -1 if the file cannot be stat-ed
// (e.g. it does not exist).
func fileSize(fname string) int64 {
	finfo, err := os.Stat(fname)
	if err != nil {
		return -1
	}
	return finfo.Size()
}

// TestDAGOpen tests the creation of a DAG, closing and re-opening it. It also
// verifies that its backing file is created and that a 2nd close is safe.
func TestDAGOpen(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	fsize := fileSize(dagfile)
	if fsize < 0 {
		t.Fatalf("DAG file %s not created", dagfile)
	}

	dag.flush()
	oldfsize := fsize
	fsize = fileSize(dagfile)
	if fsize <= oldfsize {
		t.Fatalf("DAG file %s not flushed", dagfile)
	}

	dag.close()

	dag, err = openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot re-open existing DAG file %s", dagfile)
	}

	oldfsize = fsize
	fsize = fileSize(dagfile)
	if fsize != oldfsize {
		t.Fatalf("DAG file %s size changed across re-open", dagfile)
	}

	dag.close()
	dag.close() // multiple closes should be a safe NOP

	fsize = fileSize(dagfile)
	if fsize != oldfsize {
		t.Fatalf("DAG file %s size changed across close", dagfile)
	}

	// Fail opening a DAG in a non-existent directory.
	_, err = openDAG("/not/really/there/junk.dag")
	if err == nil {
		t.Fatalf("openDAG() did not fail when using a bad pathname")
	}
}

// TestInvalidDAG tests using DAG methods on an invalid (closed) DAG.
func TestInvalidDAG(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}
	dag.close()

	oid, err := strToObjID("6789")
	if err != nil {
		t.Error(err)
	}
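
	// Every accessor and mutator below must fail with the "invalid DAG" error
	// because the DAG was closed above.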
	err = dag.addNode(oid, 4, false, []storage.Version{2, 3}, "foobar")
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("addNode() did not fail on a closed DAG: %v", err)
	}

	err = dag.moveHead(oid, 4)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("moveHead() did not fail on a closed DAG: %v", err)
	}

	_, _, _, _, err = dag.hasConflict(oid)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("hasConflict() did not fail on a closed DAG: %v", err)
	}

	_, err = dag.getLogrec(oid, 4)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("getLogrec() did not fail on a closed DAG: %v", err)
	}

	err = dag.prune(oid, 4, func(lr string) error {
		return nil
	})
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("prune() did not fail on a closed DAG: %v", err)
	}

	node := &dagNode{Level: 15, Parents: []storage.Version{444, 555}, Logrec: "logrec-23"}
	err = dag.setNode(oid, 4, node)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("setNode() did not fail on a closed DAG: %v", err)
	}

	_, err = dag.getNode(oid, 4)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("getNode() did not fail on a closed DAG: %v", err)
	}

	err = dag.delNode(oid, 4)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("delNode() did not fail on a closed DAG: %v", err)
	}

	err = dag.setHead(oid, 4)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("setHead() did not fail on a closed DAG: %v", err)
	}

	_, err = dag.getHead(oid)
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("getHead() did not fail on a closed DAG: %v", err)
	}

	err = dag.compact()
	if err == nil || err.Error() != "invalid DAG" {
		t.Errorf("compact() did not fail on a closed DAG: %v", err)
	}

	// These calls should be harmless NOPs.
	dag.clearGraft()
	dag.flush()
	dag.close()

	if dag.hasNode(oid, 4) {
		t.Errorf("hasNode() found an object on a closed DAG")
	}
	if pmap := dag.getParentMap(oid); len(pmap) != 0 {
		t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
	}
	if head, gmap := dag.getGraftNodes(oid); head != 0 || len(gmap) != 0 {
		t.Errorf("getGraftNodes() found data on a closed DAG: head: %d, map: %v", head, gmap)
	}
}

// TestSetNode tests setting and getting a DAG node across DAG open/close/reopen.
func TestSetNode(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	version := storage.Version(0)
	oid, err := strToObjID("111")
	if err != nil {
		t.Fatal(err)
	}

	node, err := dag.getNode(oid, version)
	if err == nil || node != nil {
		t.Errorf("Found non-existent object %d:%d in DAG file %s: %v", oid, version, dagfile, node)
	}
	if dag.hasNode(oid, version) {
		t.Errorf("hasNode() found non-existent object %d:%d in DAG file %s", oid, version, dagfile)
	}
	if logrec, err := dag.getLogrec(oid, version); err == nil || logrec != "" {
		t.Errorf("Non-existent object %d:%d has a logrec in DAG file %s: %v", oid, version, dagfile, logrec)
	}

	node = &dagNode{Level: 15, Parents: []storage.Version{444, 555}, Logrec: "logrec-23"}
	if err = dag.setNode(oid, version, node); err != nil {
		t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
	}
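
	// Check the node twice: from the in-memory DAG (i=0), then again after a
	// flush, close and re-open of the DAG file (i=1) to verify persistence.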
	for i := 0; i < 2; i++ {
		node2, err := dag.getNode(oid, version)
		if err != nil || node2 == nil {
			t.Errorf("Cannot find stored object %d:%d (i=%d) in DAG file %s", oid, version, i, dagfile)
		}
		if !dag.hasNode(oid, version) {
			t.Errorf("hasNode() did not find object %d:%d (i=%d) in DAG file %s", oid, version, i, dagfile)
		}
		if !reflect.DeepEqual(node, node2) {
			t.Errorf("Object %d:%d has wrong data (i=%d) in DAG file %s: %v instead of %v",
				oid, version, i, dagfile, node2, node)
		}
		if logrec, err := dag.getLogrec(oid, version); err != nil || logrec != "logrec-23" {
			t.Errorf("Object %d:%d has wrong logrec (i=%d) in DAG file %s: %v",
				oid, version, i, dagfile, logrec)
		}

		if i == 0 {
			dag.flush()
			dag.close()
			dag, err = openDAG(dagfile)
			if err != nil {
				t.Fatalf("Cannot re-open DAG file %s", dagfile)
			}
		}
	}

	dag.close()
}

// TestDelNode tests deleting a DAG node across DAG open/close/reopen.
func TestDelNode(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	version := storage.Version(1)
	oid, err := strToObjID("222")
	if err != nil {
		t.Fatal(err)
	}

	node := &dagNode{Level: 123, Parents: []storage.Version{333}, Logrec: "logrec-789"}
	if err = dag.setNode(oid, version, node); err != nil {
		t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
	}
	dag.flush()

	err = dag.delNode(oid, version)
	if err != nil {
		t.Fatalf("Cannot delete object %d:%d in DAG file %s", oid, version, dagfile)
	}
	dag.flush()
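
	// Check the deletion twice: from the in-memory DAG (i=0) and again after
	// a close/re-open of the DAG file (i=1).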
	for i := 0; i < 2; i++ {
		node2, err := dag.getNode(oid, version)
		if err == nil || node2 != nil {
			t.Errorf("Found deleted object %d:%d (%v) (i=%d) in DAG file %s", oid, version, node2, i, dagfile)
		}
		if dag.hasNode(oid, version) {
			t.Errorf("hasNode() found deleted object %d:%d (i=%d) in DAG file %s", oid, version, i, dagfile)
		}
		if logrec, err := dag.getLogrec(oid, version); err == nil || logrec != "" {
			t.Errorf("Deleted object %d:%d (i=%d) has logrec in DAG file %s: %v", oid, version, i, dagfile, logrec)
		}

		if i == 0 {
			dag.close()
			dag, err = openDAG(dagfile)
			if err != nil {
				t.Fatalf("Cannot re-open DAG file %s", dagfile)
			}
		}
	}

	dag.close()
}

// TestSetHead tests setting and getting a DAG head node across DAG open/close/reopen.
func TestSetHead(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	oid, err := strToObjID("333")
	if err != nil {
		t.Fatal(err)
	}

	version, err := dag.getHead(oid)
	if err == nil {
		t.Errorf("Found non-existent object head %d in DAG file %s: %d", oid, dagfile, version)
	}

	version = 555
	if err = dag.setHead(oid, version); err != nil {
		t.Fatalf("Cannot set object head %d (%d) in DAG file %s", oid, version, dagfile)
	}
	dag.flush()
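
	// Check the head 3 times: from the in-memory DAG (i=0), after a
	// close/re-open of the DAG file (i=1), and after the head is moved to a
	// new version (i=2).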
	for i := 0; i < 3; i++ {
		version2, err := dag.getHead(oid)
		if err != nil {
			t.Errorf("Cannot find stored object head %d (i=%d) in DAG file %s", oid, i, dagfile)
		}
		if version != version2 {
			t.Errorf("Object %d has wrong head data (i=%d) in DAG file %s: %d instead of %d",
				oid, i, dagfile, version2, version)
		}

		if i == 0 {
			dag.close()
			dag, err = openDAG(dagfile)
			if err != nil {
				t.Fatalf("Cannot re-open DAG file %s", dagfile)
			}
		} else if i == 1 {
			version = 888
			if err = dag.setHead(oid, version); err != nil {
				t.Fatalf("Cannot set new object head %d (%d) in DAG file %s", oid, version, dagfile)
			}
			dag.flush()
		}
	}

	dag.close()
}

// checkEndOfSync simulates and checks the end-of-sync operations: clear the
// node grafting metadata and verify that it is empty and that hasConflict()
// detects this case and fails, then close the DAG.
func checkEndOfSync(d *dag, oid storage.ID) error {
	// Clear grafting info; this happens at the end of a sync log replay.
	d.clearGraft()

	// There should be no grafting info, and hasConflict() should fail.
	newHead, grafts := d.getGraftNodes(oid)
	if newHead != 0 || grafts != nil {
		return fmt.Errorf("Object %d: graft info not cleared: newhead (%d), grafts (%v)", oid, newHead, grafts)
	}

	isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
	if errConflict == nil {
		return fmt.Errorf("Object %d: conflict did not fail: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	d.close()
	return nil
}

// TestLocalUpdates tests the sync handling of initial local updates: an object
// is created (v0) and updated twice (v1, v2) on this device. The DAG should
// show: v0 -> v1 -> v2 and the head should point to v2.
func TestLocalUpdates(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
		t.Fatal(err)
	}

	// The head must have moved to "v2" and the parent map shows the updated DAG.
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	if head, e := dag.getHead(oid); e != nil || head != 2 {
		t.Errorf("Invalid object %d head in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}}
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	// Make sure an existing node cannot be added again.
	if err = dag.addNode(oid, 1, false, []storage.Version{0, 2}, "foobar"); err == nil {
		t.Errorf("addNode() did not fail when given an existing node")
	}

	// Make sure a new node cannot have more than 2 parents.
	if err = dag.addNode(oid, 3, false, []storage.Version{0, 1, 2}, "foobar"); err == nil {
		t.Errorf("addNode() did not fail when given 3 parents")
	}

	// Make sure a new node cannot have an invalid parent.
	if err = dag.addNode(oid, 3, false, []storage.Version{0, 555}, "foobar"); err == nil {
		t.Errorf("addNode() did not fail when using an invalid parent")
	}

	// Make sure a new root node (no parents) cannot be added once a root exists.
	// For the parents array, check both the "nil" and the empty array as input.
	if err = dag.addNode(oid, 6789, false, nil, "foobar"); err == nil {
		t.Errorf("Adding a 2nd root node (nil parents) for object %d in DAG file %s did not fail", oid, dagfile)
	}
	if err = dag.addNode(oid, 6789, false, []storage.Version{}, "foobar"); err == nil {
		t.Errorf("Adding a 2nd root node (empty parents) for object %d in DAG file %s did not fail", oid, dagfile)
	}

	if err := checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestRemoteUpdates tests the sync handling of initial remote updates:
// an object is created (v0) and updated twice (v1, v2) on another device and
// we learn about it during sync. The updated DAG should show: v0 -> v1 -> v2
// and report no conflicts with the new head pointing at v2.
func TestRemoteUpdates(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "remote-init-00.sync"); err != nil {
		t.Fatal(err)
	}

	// The head must not have moved (i.e. still undefined) and the parent
	// map shows the newly grafted DAG fragment.
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	if head, e := dag.getHead(oid); e == nil {
		t.Errorf("Object %d head found in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}}
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	// Verify the grafting of remote nodes.
	newHead, grafts := dag.getGraftNodes(oid)
	if newHead != 2 {
		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
	}
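
	// The graft map is empty because the object was unknown locally: the
	// remote fragment is the object's entire history and does not attach to
	// any pre-existing local node.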
	expgrafts := map[storage.Version]uint64{}
	if !reflect.DeepEqual(grafts, expgrafts) {
		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
	}

	// There should be no conflict.
	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
	if !(!isConflict && newHead == 2 && oldHead == 0 && ancestor == 0 && errConflict == nil) {
		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	if logrec, e := dag.getLogrec(oid, newHead); e != nil || logrec != "logrec-02" {
		t.Errorf("Invalid logrec for newhead object %d:%d in DAG file %s: %v", oid, newHead, dagfile, logrec)
	}

	// Make sure an unknown node cannot become the new head.
	if err = dag.moveHead(oid, 55); err == nil {
		t.Errorf("moveHead() did not fail on an invalid node")
	}

	// Then we can move the head and clear the grafting data.
	if err = dag.moveHead(oid, newHead); err != nil {
		t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
	}

	if err := checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestRemoteNoConflict tests sync of remote updates on top of a local initial
// state without conflict. An object is created locally and updated twice
// (v0 -> v1 -> v2). Another device, having gotten this info, makes 3 updates
// on top of that (v2 -> v3 -> v4 -> v5) and sends this info in a later sync.
// The updated DAG should show (v0 -> v1 -> v2 -> v3 -> v4 -> v5) and report
// no conflicts with the new head pointing at v5. It should also report v2 as
// the graft point on which the new fragment (v3 -> v4 -> v5) gets attached.
func TestRemoteNoConflict(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
		t.Fatal(err)
	}
	if err = dagReplayCommands(dag, "remote-noconf-00.sync"); err != nil {
		t.Fatal(err)
	}

	// The head must not have moved (i.e. still at v2) and the parent map
	// shows the newly grafted DAG fragment on top of the prior DAG.
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	if head, e := dag.getHead(oid); e != nil || head != 2 {
		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {2}, 4: {3}, 5: {4}}
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	// Verify the grafting of remote nodes.
	newHead, grafts := dag.getGraftNodes(oid)
	if newHead != 5 {
		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
	}
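
	// v2 (at level 2) is the graft point where the remote fragment
	// (v3 -> v4 -> v5) attaches to the existing local DAG.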
	expgrafts := map[storage.Version]uint64{2: 2}
	if !reflect.DeepEqual(grafts, expgrafts) {
		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
	}

	// There should be no conflict.
	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
	if !(!isConflict && newHead == 5 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	if logrec, e := dag.getLogrec(oid, oldHead); e != nil || logrec != "logrec-02" {
		t.Errorf("Invalid logrec for oldhead object %d:%d in DAG file %s: %v", oid, oldHead, dagfile, logrec)
	}
	if logrec, e := dag.getLogrec(oid, newHead); e != nil || logrec != "logrec-05" {
		t.Errorf("Invalid logrec for newhead object %d:%d in DAG file %s: %v", oid, newHead, dagfile, logrec)
	}

	// Then we can move the head and clear the grafting data.
	if err = dag.moveHead(oid, newHead); err != nil {
		t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
	}

	// Clear the grafting data and verify that hasConflict() fails without it.
	dag.clearGraft()
	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
	if errConflict == nil {
		t.Errorf("Object %d: hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	if err := checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestRemoteConflict tests sync handling remote updates that build on the
// local initial state and trigger a conflict. An object is created locally
// and updated twice (v0 -> v1 -> v2). Another device, having only gotten
// the v0 -> v1 history, makes 3 updates on top of v1 (v1 -> v3 -> v4 -> v5)
// and sends this info during a later sync. Separately, the local device
// makes a conflicting (concurrent) update v1 -> v2. The updated DAG should
// show the branches: (v0 -> v1 -> v2) and (v0 -> v1 -> v3 -> v4 -> v5) and
// report the conflict between v2 and v5 (current and new heads). It should
// also report v1 as the graft point and the common ancestor in the conflict.
// The conflict is resolved locally by creating v6 that is derived from both
// v2 and v5 and it becomes the new head.
func TestRemoteConflict(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
		t.Fatal(err)
	}
	if err = dagReplayCommands(dag, "remote-conf-00.sync"); err != nil {
		t.Fatal(err)
	}

	// The head must not have moved (i.e. still at v2) and the parent map
	// shows the newly grafted DAG fragment on top of the prior DAG.
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	if head, e := dag.getHead(oid); e != nil || head != 2 {
		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {1}, 4: {3}, 5: {4}}
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	// Verify the grafting of remote nodes.
	newHead, grafts := dag.getGraftNodes(oid)
	if newHead != 5 {
		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
	}
	expgrafts := map[storage.Version]uint64{1: 1}
	if !reflect.DeepEqual(grafts, expgrafts) {
		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
	}

	// There should be a conflict between v2 and v5 with v1 as ancestor.
	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
	if !(isConflict && newHead == 5 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	if logrec, e := dag.getLogrec(oid, oldHead); e != nil || logrec != "logrec-02" {
		t.Errorf("Invalid logrec for oldhead object %d:%d in DAG file %s: %v", oid, oldHead, dagfile, logrec)
	}
	if logrec, e := dag.getLogrec(oid, newHead); e != nil || logrec != "logrec-05" {
		t.Errorf("Invalid logrec for newhead object %d:%d in DAG file %s: %v", oid, newHead, dagfile, logrec)
	}
	if logrec, e := dag.getLogrec(oid, ancestor); e != nil || logrec != "logrec-01" {
		t.Errorf("Invalid logrec for ancestor object %d:%d in DAG file %s: %v", oid, ancestor, dagfile, logrec)
	}

	// Resolve the conflict by adding a new local v6 derived from v2 and v5 (this replay moves the head).
	if err = dagReplayCommands(dag, "local-resolve-00.sync"); err != nil {
		t.Fatal(err)
	}

	// Verify that the head moved to v6 and the parent map shows the resolution.
	if head, e := dag.getHead(oid); e != nil || head != 6 {
		t.Errorf("Object %d has wrong head after conflict resolution in DAG file %s: %d", oid, dagfile, head)
	}

	exp[6] = []storage.Version{2, 5}
	pmap = dag.getParentMap(oid)
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map after conflict resolution in DAG file %s: (%v) instead of (%v)",
			oid, dagfile, pmap, exp)
	}

	if err := checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestRemoteConflictTwoGrafts tests sync handling remote updates that build
// on the local initial state and trigger a conflict with 2 graft points.
// An object is created locally and updated twice (v0 -> v1 -> v2). Another
// device first learns about v0 and makes its own conflicting update v0 -> v3.
// That remote device later learns about v1 and resolves the v1/v3 conflict by
// creating v4. Then it makes a last v4 -> v5 update -- which will conflict
// with v2 but it doesn't know that.
// Now the sync order is reversed and the local device learns all of what
// happened on the remote device. The local DAG should be augmented by
// a subtree with 2 graft points: v0 and v1. It receives this new branch:
// v0 -> v3 -> v4 -> v5. Note that v4 is also derived from v1 as a remote
// conflict resolution. This should report a conflict between v2 and v5
// (current and new heads), with v0 and v1 as graft points, and v1 as the
// most-recent common ancestor for that conflict. The conflict is resolved
// locally by creating v6, derived from both v2 and v5, becoming the new head.
func TestRemoteConflictTwoGrafts(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
		t.Fatal(err)
	}
	if err = dagReplayCommands(dag, "remote-conf-01.sync"); err != nil {
		t.Fatal(err)
	}

	// The head must not have moved (i.e. still at v2) and the parent map
	// shows the newly grafted DAG fragment on top of the prior DAG.
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	if head, e := dag.getHead(oid); e != nil || head != 2 {
		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0}, 4: {1, 3}, 5: {4}}
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	// Verify the grafting of remote nodes.
	newHead, grafts := dag.getGraftNodes(oid)
	if newHead != 5 {
		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
	}
	expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
	if !reflect.DeepEqual(grafts, expgrafts) {
		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
	}

	// There should be a conflict between v2 and v5 with v1 as ancestor.
	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
	if !(isConflict && newHead == 5 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
			oid, isConflict, newHead, oldHead, ancestor, errConflict)
	}

	if logrec, e := dag.getLogrec(oid, oldHead); e != nil || logrec != "logrec-02" {
		t.Errorf("Invalid logrec for oldhead object %d:%d in DAG file %s: %v", oid, oldHead, dagfile, logrec)
	}
	if logrec, e := dag.getLogrec(oid, newHead); e != nil || logrec != "logrec-05" {
		t.Errorf("Invalid logrec for newhead object %d:%d in DAG file %s: %v", oid, newHead, dagfile, logrec)
	}
	if logrec, e := dag.getLogrec(oid, ancestor); e != nil || logrec != "logrec-01" {
		t.Errorf("Invalid logrec for ancestor object %d:%d in DAG file %s: %v", oid, ancestor, dagfile, logrec)
	}

	// Resolve the conflict by adding a new local v6 derived from v2 and v5 (this replay moves the head).
	if err = dagReplayCommands(dag, "local-resolve-00.sync"); err != nil {
		t.Fatal(err)
	}

	// Verify that the head moved to v6 and the parent map shows the resolution.
	if head, e := dag.getHead(oid); e != nil || head != 6 {
		t.Errorf("Object %d has wrong head after conflict resolution in DAG file %s: %d", oid, dagfile, head)
	}

	exp[6] = []storage.Version{2, 5}
	pmap = dag.getParentMap(oid)
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map after conflict resolution in DAG file %s: (%v) instead of (%v)",
			oid, dagfile, pmap, exp)
	}

	if err := checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestAncestorIterator checks that the iterator goes over the correct set
// of ancestor nodes for an object given a starting node. It should traverse
// reconvergent DAG branches only visiting each ancestor once:
// v0 -> v1 -> v2 -> v4 -> v5 -> v7 -> v8
//        |--> v3 ---|           |
//        +--> v6 ---------------+
// - Starting at v0 it should only cover v0.
// - Starting at v2 it should only cover v0-v2.
// - Starting at v5 it should only cover v0-v5.
// - Starting at v8 it should cover all nodes (v0-v8).
func TestAncestorIterator(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-01.sync"); err != nil {
		t.Fatal(err)
	}
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	// Loop checking the iteration behavior for different starting nodes.
	for _, start := range []storage.Version{0, 2, 5, 8} {
		visitCount := make(map[storage.Version]int)
		err = dag.ancestorIter(oid, []storage.Version{start},
			func(oid storage.ID, v storage.Version, node *dagNode) error {
				visitCount[v]++
				return nil
			})

		// Check that all prior nodes are visited only once.
		for i := storage.Version(0); i < (start + 1); i++ {
			if visitCount[i] != 1 {
				t.Errorf("wrong visit count for iter on object %d node %d starting from node %d: %d instead of 1",
					oid, i, start, visitCount[i])
			}
		}
	}

	// Make sure an error in the callback is returned through the iterator.
	cbErr := errors.New("callback error")
	err = dag.ancestorIter(oid, []storage.Version{8}, func(oid storage.ID, v storage.Version, node *dagNode) error {
		if v == 0 {
			return cbErr
		}
		return nil
	})
	if err != cbErr {
		t.Errorf("wrong error returned from callback: %v instead of %v", err, cbErr)
	}

	if err = checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestPruning tests sync pruning of the DAG for an object with 3 concurrent
// updates (i.e. 2 conflict resolution convergent points). The pruning must
// get rid of the DAG branches across the reconvergence points:
// v0 -> v1 -> v2 -> v4 -> v5 -> v7 -> v8
//        |--> v3 ---|           |
//        +--> v6 ---------------+
// By pruning at v0, nothing is deleted.
// Then by pruning at v1, only v0 is deleted.
// Then by pruning at v5, v1-v4 are deleted leaving v5 and "v6 -> v7 -> v8".
// Then by pruning at v7, v5-v6 are deleted leaving "v7 -> v8".
// Then by pruning at v8, v7 is deleted leaving v8 as the head.
// Then by pruning again at v8 nothing changes.
func TestPruning(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-01.sync"); err != nil {
		t.Fatal(err)
	}
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {1}, 4: {2, 3}, 5: {4}, 6: {1}, 7: {5, 6}, 8: {7}}

	// Loop pruning at an invalid version (333) then at v0, v1, v5, v7, v8 and
	// again at v8.
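	// Each entry in delCounts is the number of log records that prune() is
	// expected to pass to the delete callback for the matching entry in
	// testVersions.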
	testVersions := []storage.Version{333, 0, 1, 5, 7, 8, 8}
	delCounts := []int{0, 0, 1, 4, 2, 1, 0}

	for i, version := range testVersions {
		del := 0
		err = dag.prune(oid, version, func(lr string) error {
			del++
			return nil
		})

		if i == 0 && err == nil {
			t.Errorf("pruning non-existent object %d:%d did not fail in DAG file %s", oid, version, dagfile)
		} else if i > 0 && err != nil {
			t.Errorf("pruning object %d:%d failed in DAG file %s: %v", oid, version, dagfile, err)
		}
		if del != delCounts[i] {
			t.Errorf("pruning object %d:%d deleted %d log records instead of %d", oid, version, del, delCounts[i])
		}
		if head, err := dag.getHead(oid); err != nil || head != 8 {
			t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
		}

		// Remove pruned nodes from the expected parent map used to validate,
		// and set the parents of the pruned node to nil.
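		// Skip this adjustment for the invalid version (333), which leaves
		// the DAG unchanged.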
		if version < 10 {
			for j := storage.Version(0); j < version; j++ {
				delete(exp, j)
			}
			exp[version] = nil
		}

		pmap := dag.getParentMap(oid)
		if !reflect.DeepEqual(pmap, exp) {
			t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
		}
	}

	if err = checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestPruningCallbackError tests sync pruning of the DAG when the callback
// function returns an error. The pruning must try to delete as many nodes
// and log records as possible and properly adjust the parent pointers of
// the node at which the pruning is done. The object DAG is:
// v0 -> v1 -> v2 -> v4 -> v5 -> v7 -> v8
//        |--> v3 ---|           |
//        +--> v6 ---------------+
// By pruning at v8 and having the callback function fail for v3, all other
// nodes must be deleted and only v8 remains as the head.
func TestPruningCallbackError(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	if err = dagReplayCommands(dag, "local-init-01.sync"); err != nil {
		t.Fatal(err)
	}
	oid, err := strToObjID("12345")
	if err != nil {
		t.Fatal(err)
	}

	exp := map[storage.Version][]storage.Version{8: nil}

	// Prune at v8 with a callback function that fails for v3.
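	// The callback is invoked for all 8 ancestors of v8 (v0-v7); the failure
	// reported for v3's log record does not stop the pruning of the DAG
	// nodes, so only v8 should remain afterwards.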
	del, expDel := 0, 8
	version := storage.Version(8)
	err = dag.prune(oid, version, func(lr string) error {
		del++
		if lr == "logrec-03" {
			return fmt.Errorf("refuse to delete %s", lr)
		}
		return nil
	})

	if err == nil {
		t.Errorf("pruning object %d:%d did not fail in DAG file %s", oid, version, dagfile)
	}
	if del != expDel {
		t.Errorf("pruning object %d:%d deleted %d log records instead of %d", oid, version, del, expDel)
	}
	if head, err := dag.getHead(oid); err != nil || head != 8 {
		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
	}

	pmap := dag.getParentMap(oid)
	if !reflect.DeepEqual(pmap, exp) {
		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
	}

	if err = checkEndOfSync(dag, oid); err != nil {
		t.Fatal(err)
	}
}

// TestDAGCompact tests compacting of dag's kvdb file.
func TestDAGCompact(t *testing.T) {
	dagfile := dagFilename()
	defer os.Remove(dagfile)

	dag, err := openDAG(dagfile)
	if err != nil {
		t.Fatalf("Cannot open new DAG file %s", dagfile)
	}

	// Put some data in the "heads" table.
	headMap := make(map[storage.ID]storage.Version)
	for i := 0; i < 10; i++ {
		// Generate a random object id in [0, 1000).
		oid, err := strToObjID(fmt.Sprintf("%d", rand.Intn(1000)))
		if err != nil {
			t.Fatal(err)
		}
		// Generate a random version number for this object.
		vers := storage.Version(rand.Intn(5000))

		// Cache this <oid, version> pair to verify with getHead().
		headMap[oid] = vers

		if err = dag.setHead(oid, vers); err != nil {
			t.Fatalf("Cannot set object head %d (%d) in DAG file %s", oid, vers, dagfile)
		}

		// Flush immediately to let the kvdb file grow.
		dag.flush()
	}

	// Put some data in the "nodes" table.
	type nodeKey struct {
		oid  storage.ID
		vers storage.Version
	}
	nodeMap := make(map[nodeKey]*dagNode)
	for oid, vers := range headMap {
		// Generate a random dag node for this <oid, vers>.
		l := uint64(rand.Intn(20))
		p1 := storage.Version(rand.Intn(5000))
		p2 := storage.Version(rand.Intn(5000))
		log := fmt.Sprintf("%d", rand.Intn(1000))
		node := &dagNode{Level: l, Parents: []storage.Version{p1, p2}, Logrec: log}

		// Cache this <oid, version, dagNode> to verify with getNode().
		key := nodeKey{oid: oid, vers: vers}
		nodeMap[key] = node

		if err = dag.setNode(oid, vers, node); err != nil {
			t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, vers, node, dagfile)
		}

		// Flush immediately to let the kvdb file grow.
		dag.flush()
	}

	// Get size before compaction.
	oldSize := fileSize(dagfile)
	if oldSize < 0 {
		t.Fatalf("DAG file %s not created", dagfile)
	}

	if err = dag.compact(); err != nil {
		t.Fatalf("Cannot compact DAG file %s", dagfile)
	}

	// Verify that the kvdb file did not grow after compaction.
	size := fileSize(dagfile)
	if size < 0 {
		t.Fatalf("DAG file %s not created", dagfile)
	}
	if size > oldSize {
		t.Fatalf("DAG file %s not compacted", dagfile)
	}

	// Check that the data still exists after compaction.
	for oid, vers := range headMap {
		vers2, err := dag.getHead(oid)
		if err != nil {
			t.Errorf("Cannot find stored object head %d in DAG file %s", oid, dagfile)
		}
		if vers != vers2 {
			t.Errorf("Object %d has wrong head data in DAG file %s: %d instead of %d",
				oid, dagfile, vers2, vers)
		}
	}

	for key, node := range nodeMap {
		node2, err := dag.getNode(key.oid, key.vers)
		if err != nil || node2 == nil {
			t.Errorf("Cannot find stored object %d:%d in DAG file %s", key.oid, key.vers, dagfile)
		}
		if !reflect.DeepEqual(node, node2) {
			t.Errorf("Object %d:%d has wrong data in DAG file %s: %v instead of %v",
				key.oid, key.vers, dagfile, node2, node)
		}
	}

	dag.close()
}