blob: af09ce110e9106eacb49368f4a7702dc1ceb24db [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The initiator tests below are driven by replaying the state from the log
// files (in testdata directory). These log files may mimic watching the
// Database locally (addl commands in the log file) or obtaining log records and
// generation vector from a remote peer (addr, genvec commands). The log files
// contain the metadata of log records. The log files are only used to set up
// the state. The tests verify that given a particular local state and a stream
// of remote deltas, the initiator behaves as expected.
package vsync
import (
"fmt"
"reflect"
"testing"
"time"
"v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/v23/vdl"
"v.io/v23/vom"
_ "v.io/x/ref/runtime/factories/generic"
)
func TestExtractBlobRefs(t *testing.T) {
var tests [][]byte
br := nosql.BlobRef("123")
// BlobRef is the value.
buf0, err := vom.Encode(br)
if err != nil {
t.Fatalf("Encode(BlobRef) failed, err %v", err)
}
tests = append(tests, buf0)
// Struct contains BlobRef.
type test1Struct struct {
A int64
B string
C nosql.BlobRef
}
v1 := test1Struct{A: 10, B: "foo", C: br}
buf1, err := vom.Encode(v1)
if err != nil {
t.Fatalf("Encode(test1Struct) failed, err %v", err)
}
tests = append(tests, buf1)
// Nested struct contains BlobRef.
type test2Struct struct {
A int64
B string
C test1Struct
}
v2 := test2Struct{A: 10, B: "foo", C: v1}
buf2, err := vom.Encode(v2)
if err != nil {
t.Fatalf("Encode(test2Struct) failed, err %v", err)
}
tests = append(tests, buf2)
for i, buf := range tests {
var val *vdl.Value
if err := vom.Decode(buf, &val); err != nil {
t.Fatalf("Decode failed (test %d), err %v", i, err)
}
gotbrs := make(map[nosql.BlobRef]struct{})
if err := extractBlobRefs(val, gotbrs); err != nil {
t.Fatalf("extractBlobRefs failed (test %d), err %v", i, err)
}
wantbrs := map[nosql.BlobRef]struct{}{br: struct{}{}}
if !reflect.DeepEqual(gotbrs, wantbrs) {
t.Fatalf("Data mismatch in blobrefs (test %d), got %v, want %v", i, gotbrs, wantbrs)
}
}
}
// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
// in file testdata/remote-init-00.log.sync.
func TestLogStreamRemoteOnly(t *testing.T) {
svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync")
defer cleanup(t, svc)
// Check all log records.
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
var gen uint64
var parents []string
for gen = 1; gen < 4; gen++ {
gotRec, err := getLogRec(nil, svc.St(), 11, gen)
if err != nil || gotRec == nil {
t.Fatalf("getLogRec can not find object 11 %d, err %v", gen, err)
}
vers := fmt.Sprintf("%d", gen)
wantRec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
Id: 11,
Gen: gen,
RecType: interfaces.NodeRec,
ObjId: objid,
CurVers: vers,
Parents: parents,
UpdTime: constTime,
BatchCount: 1,
},
Pos: gen - 1,
}
if !reflect.DeepEqual(gotRec, wantRec) {
t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
}
// Verify DAG state.
if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
}
// Verify Database state.
tx := svc.St().NewTransaction()
if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
}
tx.Abort()
parents = []string{vers}
}
// Verify conflict state.
if len(iSt.updObjects) != 1 {
t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
}
st := iSt.updObjects[objid]
if st.isConflict {
t.Fatalf("Detected a conflict %v", st)
}
if st.newHead != "3" || st.oldHead != NoVersion {
t.Fatalf("Conflict detection didn't succeed %v", st)
}
// Verify genvec state.
wantVec := interfaces.GenVector{
"foo1": interfaces.PrefixGenVector{11: 3},
"bar": interfaces.PrefixGenVector{11: 0},
}
if !reflect.DeepEqual(iSt.updLocal, wantVec) {
t.Fatalf("Final local gen vec mismatch got %v, want %v", iSt.updLocal, wantVec)
}
// Verify DAG state.
if head, err := getHead(nil, svc.St(), objid); err != nil || head != "3" {
t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
}
// Verify Database state.
valbuf, err := svc.St().Get([]byte(objid), nil)
var val string
if err := vom.Decode(valbuf, &val); err != nil {
t.Fatalf("Value decode failed, err %v", err)
}
if err != nil || val != "abc" {
t.Fatalf("Invalid object %s in Database %v, err %v", objid, val, err)
}
tx := svc.St().NewTransaction()
version, err := watchable.GetVersion(nil, tx, []byte(objid))
if err != nil || string(version) != "3" {
t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(version), err)
}
tx.Abort()
}
// TestLogStreamNoConflict tests that a local and a remote log stream can be
// correctly applied (when there are no conflicts). Commands are in files
// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
func TestLogStreamNoConflict(t *testing.T) {
svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync")
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
// Check all log records.
var version uint64 = 1
var parents []string
for _, devid := range []uint64{10, 11} {
var gen uint64
for gen = 1; gen < 4; gen++ {
gotRec, err := getLogRec(nil, svc.St(), devid, gen)
if err != nil || gotRec == nil {
t.Fatalf("getLogRec can not find object %d:%d, err %v",
devid, gen, err)
}
vers := fmt.Sprintf("%d", version)
wantRec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
Id: devid,
Gen: gen,
RecType: interfaces.NodeRec,
ObjId: objid,
CurVers: vers,
Parents: parents,
UpdTime: constTime,
BatchCount: 1,
},
Pos: gen - 1,
}
if !reflect.DeepEqual(gotRec, wantRec) {
t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
}
// Verify DAG state.
if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
}
// Verify Database state.
tx := svc.St().NewTransaction()
if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
}
tx.Abort()
parents = []string{vers}
version++
}
}
// Verify conflict state.
if len(iSt.updObjects) != 1 {
t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
}
st := iSt.updObjects[objid]
if st.isConflict {
t.Fatalf("Detected a conflict %v", st)
}
if st.newHead != "6" || st.oldHead != "3" {
t.Fatalf("Conflict detection didn't succeed %v", st)
}
// Verify genvec state.
wantVec := interfaces.GenVector{
"foo1": interfaces.PrefixGenVector{11: 3},
"bar": interfaces.PrefixGenVector{11: 0},
}
if !reflect.DeepEqual(iSt.updLocal, wantVec) {
t.Fatalf("Final local gen vec failed got %v, want %v", iSt.updLocal, wantVec)
}
// Verify DAG state.
if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
}
// Verify Database state.
valbuf, err := svc.St().Get([]byte(objid), nil)
var val string
if err := vom.Decode(valbuf, &val); err != nil {
t.Fatalf("Value decode failed, err %v", err)
}
if err != nil || val != "abc" {
t.Fatalf("Invalid object %s in Database %v, err %v", objid, val, err)
}
tx := svc.St().NewTransaction()
versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
if err != nil || string(versbuf) != "6" {
t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
}
tx.Abort()
}
// TestLogStreamConflict tests that a local and a remote log stream can be
// correctly applied when there are conflicts. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
func TestLogStreamConflict(t *testing.T) {
svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync")
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
// Verify conflict state.
if len(iSt.updObjects) != 1 {
t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
}
st := iSt.updObjects[objid]
if !st.isConflict {
t.Fatalf("Didn't detect a conflict %v", st)
}
if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "2" {
t.Fatalf("Conflict detection didn't succeed %v", st)
}
if st.res.ty != pickRemote {
t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
}
// Verify DAG state.
if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
}
// Verify Database state.
valbuf, err := svc.St().Get([]byte(objid), nil)
var val string
if err := vom.Decode(valbuf, &val); err != nil {
t.Fatalf("Value decode failed, err %v", err)
}
if err != nil || val != "abc" {
t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
}
tx := svc.St().NewTransaction()
versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
if err != nil || string(versbuf) != "6" {
t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
}
tx.Abort()
}
// TestLogStreamConflictNoAncestor tests that a local and a remote log stream
// can be correctly applied when there are conflicts from the start where the
// two versions of an object have no common ancestor. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-03.log.sync>.
func TestLogStreamConflictNoAncestor(t *testing.T) {
svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync")
defer cleanup(t, svc)
objid := util.JoinKeyParts(util.RowPrefix, "foo1")
// Verify conflict state.
if len(iSt.updObjects) != 1 {
t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
}
st := iSt.updObjects[objid]
if !st.isConflict {
t.Fatalf("Didn't detect a conflict %v", st)
}
if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "" {
t.Fatalf("Conflict detection didn't succeed %v", st)
}
if st.res.ty != pickRemote {
t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
}
// Verify DAG state.
if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
}
// Verify Database state.
valbuf, err := svc.St().Get([]byte(objid), nil)
var val string
if err := vom.Decode(valbuf, &val); err != nil {
t.Fatalf("Value decode failed, err %v", err)
}
if err != nil || val != "abc" {
t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
}
tx := svc.St().NewTransaction()
versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
if err != nil || string(versbuf) != "6" {
t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
}
tx.Abort()
}
//////////////////////////////
// Helpers.
func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
// Set a large value to prevent the initiator from running.
peerSyncInterval = 1 * time.Hour
conflictResolutionPolicy = useTime
svc := createService(t)
cleanup := destroyService
s := svc.sync
s.id = 10 // initiator
sgId1 := interfaces.GroupId(1234)
nullInfo := nosql.SyncGroupMemberInfo{}
sgInfo := sgMemberInfo{
sgId1: nullInfo,
}
sg1 := &interfaces.SyncGroup{
Name: "sg1",
Id: sgId1,
AppName: "mockapp",
DbName: "mockdb",
Creator: "mockCreator",
SpecVersion: "etag-0",
Spec: nosql.SyncGroupSpec{
Prefixes: []string{"foo", "bar"},
MountTables: []string{"1/2/3/4", "5/6/7/8"},
},
Joiners: map[string]nosql.SyncGroupMemberInfo{
"a": nullInfo,
"b": nullInfo,
},
}
tx := svc.St().NewTransaction()
if err := addSyncGroup(nil, tx, sg1); err != nil {
t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("cannot commit adding SyncGroup ID %d, err %v", sg1.Id, err)
}
if lfile != "" {
replayLocalCommands(t, svc, lfile)
}
if rfile == "" {
return svc, nil, cleanup
}
gdb := appDbName("mockapp", "mockdb")
iSt, err := newInitiationState(nil, s, "b", gdb, sgInfo)
if err != nil {
t.Fatalf("newInitiationState failed with err %v", err)
}
testIfSgPfxsEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
// Create local genvec so that it contains knowledge only about common prefixes.
if err := iSt.createLocalGenVec(nil); err != nil {
t.Fatalf("createLocalGenVec failed with err %v", err)
}
wantVec := interfaces.GenVector{
"foo": interfaces.PrefixGenVector{10: 0},
"bar": interfaces.PrefixGenVector{10: 0},
}
if !reflect.DeepEqual(iSt.local, wantVec) {
t.Fatalf("createLocalGenVec failed got %v, want %v", iSt.local, wantVec)
}
iSt.stream = createReplayStream(t, rfile)
if err := iSt.recvAndProcessDeltas(nil); err != nil {
t.Fatalf("recvAndProcessDeltas failed with err %v", err)
}
if err := iSt.processUpdatedObjects(nil); err != nil {
t.Fatalf("processUpdatedObjects failed with err %v", err)
}
return svc, iSt, cleanup
}
func testIfSgPfxsEqual(t *testing.T, m map[string]sgSet, a []string) {
aMap := arrToMap(a)
if len(aMap) != len(m) {
t.Fatalf("testIfSgPfxsEqual diff lengths, got %v want %v", aMap, m)
}
for p := range aMap {
if _, ok := m[p]; !ok {
t.Fatalf("testIfSgPfxsEqual want %v", p)
}
}
}
func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
aMap := arrToMap(a)
if !reflect.DeepEqual(m, aMap) {
t.Fatalf("testIfMapArrEqual failed map %v, arr %v", m, aMap)
}
}
func arrToMap(a []string) map[string]struct{} {
m := make(map[string]struct{})
for _, s := range a {
m[s] = struct{}{}
}
return m
}