blob: b8eb1ea69bd727972976e8b6e81be281a1e7ddf1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The initiator tests below are driven by replaying the state from the log
// files (in the testdata directory). These log files may mimic watching the
// Database locally (addl commands in the log file) or obtaining log records and
// a generation vector from a remote peer (addr, genvec commands). The log files
// contain the metadata of log records. The log files are only used to set up
// the state. The tests verify that, given a particular local state and a stream
// of remote deltas, the initiator behaves as expected.
package vsync
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
wire "v.io/v23/services/syncbase"
"v.io/v23/vom"
"v.io/x/lib/set"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store/watchable"
)
// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
// in file testdata/remote-init-00.log.sync.
//
// For generations 1 through 3 of device 11 it checks the stored log record,
// the DAG node and the versioned value in the Database. It then checks the
// conflict state, the updated generation vectors, and the final head and value
// of the object.
func TestLogStreamRemoteOnly(t *testing.T) {
	svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync", false)
	defer cleanup()

	// Check all log records.
	objid := common.JoinKeyParts(common.RowPrefix, "c", "foo1")
	var parents []string
	for gen := uint64(1); gen < 4; gen++ {
		gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, 11, gen)
		if err != nil || gotRec == nil {
			t.Fatalf("getLogRec can not find object %s 11 %d, err %v", logDataPrefix, gen, err)
		}
		vers := fmt.Sprintf("%d", gen)
		wantRec := &LocalLogRec{
			Metadata: interfaces.LogRecMetadata{
				Id:         11,
				Gen:        gen,
				RecType:    interfaces.NodeRec,
				ObjId:      objid,
				CurVers:    vers,
				Parents:    parents,
				UpdTime:    constTime,
				BatchCount: 1,
			},
			Pos: gen - 1,
		}
		if !reflect.DeepEqual(gotRec, wantRec) {
			t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
		}

		// Verify DAG state.
		if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
			t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
		}

		// Verify Database state. The transaction is aborted before the
		// error is inspected so that it is released on the failure path too.
		tx := createDatabase(t, svc).St().NewWatchableTransaction()
		_, err = watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers))
		tx.Abort()
		if err != nil {
			t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
		}

		// Each version is the sole parent of the next one.
		parents = []string{vers}
	}

	// Verify conflict state.
	if len(iSt.updObjects) != 1 {
		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
	}
	st, ok := iSt.updObjects[objid]
	if !ok {
		t.Fatalf("No update state recorded for object %s", objid)
	}
	if st.isConflict {
		t.Fatalf("Detected a conflict %v", st)
	}
	if st.newHead != "3" || st.oldHead != NoVersion {
		t.Fatalf("Conflict detection didn't succeed %v", st)
	}

	// Verify genvec state.
	wantVecs := interfaces.Knowledge{
		"c\xfefoo1": interfaces.GenVector{11: 3},
		"c\xfebar":  interfaces.GenVector{11: 0},
	}
	if !reflect.DeepEqual(iSt.updLocal, wantVecs) {
		t.Fatalf("Final local gen vec mismatch got %v, want %v", iSt.updLocal, wantVecs)
	}

	// Verify DAG state.
	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "3" {
		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
	}

	// Verify Database state. The read error is checked before decoding so
	// that a failed Get is not misreported as a decode failure.
	db := createDatabase(t, svc)
	valbuf, err := db.St().Get([]byte(objid), nil)
	if err != nil {
		t.Fatalf("Get failed for object %s in Database, err %v", objid, err)
	}
	var val string
	if err := vom.Decode(valbuf, &val); err != nil {
		t.Fatalf("Value decode failed, err %v: %+v %+v", err, valbuf, objid)
	}
	if val != "abc" {
		t.Fatalf("Invalid object %s in Database %v", objid, val)
	}

	tx := db.St().NewWatchableTransaction()
	// Deferred so the transaction is also released when the check below fails;
	// it runs before the deferred cleanup.
	defer tx.Abort()
	version, err := watchable.GetVersion(nil, tx, []byte(objid))
	if err != nil || string(version) != "3" {
		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(version), err)
	}
}
// TestLogStreamNoConflict tests that a local and a remote log stream can be
// correctly applied (when there are no conflicts). Commands are in files
// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
//
// Device 10 and device 11 each contribute generations 1 through 3; the object
// versions run from "1" to "6" across both devices, each version having the
// previous one as its only parent.
func TestLogStreamNoConflict(t *testing.T) {
	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync", false)
	defer cleanup()

	objid := common.JoinKeyParts(common.RowPrefix, "c\xfefoo1")

	// Check all log records.
	var version uint64 = 1
	var parents []string
	for _, devid := range []uint64{10, 11} {
		for gen := uint64(1); gen < 4; gen++ {
			gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, devid, gen)
			if err != nil || gotRec == nil {
				t.Fatalf("getLogRec can not find object %s:%d:%d, err %v",
					logDataPrefix, devid, gen, err)
			}
			vers := fmt.Sprintf("%d", version)
			wantRec := &LocalLogRec{
				Metadata: interfaces.LogRecMetadata{
					Id:         devid,
					Gen:        gen,
					RecType:    interfaces.NodeRec,
					ObjId:      objid,
					CurVers:    vers,
					Parents:    parents,
					UpdTime:    constTime,
					BatchCount: 1,
				},
				Pos: gen - 1,
			}
			if !reflect.DeepEqual(gotRec, wantRec) {
				t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
			}

			// Verify DAG state.
			if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
				t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
			}

			// Verify Database state. The transaction is aborted before the
			// error is inspected so that it is released on the failure path too.
			tx := createDatabase(t, svc).St().NewWatchableTransaction()
			_, err = watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers))
			tx.Abort()
			if err != nil {
				t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
			}

			// The version counter keeps increasing across devices.
			parents = []string{vers}
			version++
		}
	}

	// Verify conflict state.
	if len(iSt.updObjects) != 1 {
		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
	}
	st, ok := iSt.updObjects[objid]
	if !ok {
		t.Fatalf("No update state recorded for object %s", objid)
	}
	if st.isConflict {
		t.Fatalf("Detected a conflict %v", st)
	}
	if st.newHead != "6" || st.oldHead != "3" {
		t.Fatalf("Conflict detection didn't succeed %v", st)
	}

	// Verify genvec state.
	wantVecs := interfaces.Knowledge{
		"c\xfefoo1": interfaces.GenVector{11: 3},
		"c\xfebar":  interfaces.GenVector{11: 0},
	}
	if !reflect.DeepEqual(iSt.updLocal, wantVecs) {
		t.Fatalf("Final local gen vec failed got %v, want %v", iSt.updLocal, wantVecs)
	}

	// Verify DAG state.
	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
	}

	// Verify Database state. The read error is checked before decoding so
	// that a failed Get is not misreported as a decode failure.
	db := createDatabase(t, svc)
	valbuf, err := db.St().Get([]byte(objid), nil)
	if err != nil {
		t.Fatalf("Get failed for object %s in Database, err %v", objid, err)
	}
	var val string
	if err := vom.Decode(valbuf, &val); err != nil {
		t.Fatalf("Value decode failed, err %v", err)
	}
	if val != "abc" {
		t.Fatalf("Invalid object %s in Database %v", objid, val)
	}

	tx := db.St().NewTransaction()
	// Deferred so the transaction is also released when the check below fails;
	// it runs before the deferred cleanup.
	defer tx.Abort()
	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
	if err != nil || string(versbuf) != "6" {
		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
	}
}
// TestLogStreamConflict tests that a local and a remote log stream can be
// correctly applied when there are conflicts. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
//
// The test expects a conflict between local head "3" and remote head "6" with
// common ancestor "2", resolved by picking the remote version.
func TestLogStreamConflict(t *testing.T) {
	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync", false)
	defer cleanup()

	objid := common.JoinKeyParts(common.RowPrefix, "c\xfefoo1")

	// Verify conflict state.
	if len(iSt.updObjects) != 1 {
		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
	}
	st, ok := iSt.updObjects[objid]
	if !ok {
		t.Fatalf("No update state recorded for object %s", objid)
	}
	if !st.isConflict {
		t.Fatalf("Didn't detect a conflict %v", st)
	}
	if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "2" {
		t.Fatalf("Conflict detection didn't succeed %v", st)
	}
	if st.res.ty != pickRemote {
		t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
	}

	// Verify DAG state.
	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
	}

	// Verify Database state. The read error is checked before decoding so
	// that a failed Get is not misreported as a decode failure.
	db := createDatabase(t, svc)
	valbuf, err := db.St().Get([]byte(objid), nil)
	if err != nil {
		t.Fatalf("Get failed for object %s in Database, err %v", objid, err)
	}
	var val string
	if err := vom.Decode(valbuf, &val); err != nil {
		t.Fatalf("Value decode failed, err %v", err)
	}
	if val != "abc" {
		// Report the decoded value rather than the raw encoded bytes.
		t.Fatalf("Invalid object %s in Database %v", objid, val)
	}

	tx := db.St().NewTransaction()
	// Deferred so the transaction is also released when the check below fails;
	// it runs before the deferred cleanup.
	defer tx.Abort()
	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
	if err != nil || string(versbuf) != "6" {
		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
	}
}
// TestLogStreamConflictNoAncestor tests that a local and a remote log stream
// can be correctly applied when there are conflicts from the start where the
// two versions of an object have no common ancestor. Commands are in files
// testdata/<local-init-00.log.sync,remote-conf-03.log.sync>.
//
// The test expects a conflict between local head "3" and remote head "6" with
// an empty ancestor, resolved by picking the remote version.
func TestLogStreamConflictNoAncestor(t *testing.T) {
	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync", false)
	defer cleanup()

	objid := common.JoinKeyParts(common.RowPrefix, "c\xfefoo1")

	// Verify conflict state.
	if len(iSt.updObjects) != 1 {
		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
	}
	st, ok := iSt.updObjects[objid]
	if !ok {
		t.Fatalf("No update state recorded for object %s", objid)
	}
	if !st.isConflict {
		t.Fatalf("Didn't detect a conflict %v", st)
	}
	if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "" {
		t.Fatalf("Conflict detection didn't succeed %v", st)
	}
	if st.res.ty != pickRemote {
		t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
	}

	// Verify DAG state.
	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
	}

	// Verify Database state. The read error is checked before decoding so
	// that a failed Get is not misreported as a decode failure.
	db := createDatabase(t, svc)
	valbuf, err := db.St().Get([]byte(objid), nil)
	if err != nil {
		t.Fatalf("Get failed for object %s in Database, err %v", objid, err)
	}
	var val string
	if err := vom.Decode(valbuf, &val); err != nil {
		t.Fatalf("Value decode failed, err %v", err)
	}
	if val != "abc" {
		// Report the decoded value rather than the raw encoded bytes.
		t.Fatalf("Invalid object %s in Database %v", objid, val)
	}

	tx := db.St().NewTransaction()
	// Deferred so the transaction is also released when the check below fails;
	// it runs before the deferred cleanup.
	defer tx.Abort()
	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
	if err != nil || string(versbuf) != "6" {
		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
	}
}
//////////////////////////////
// Helpers.

// testInit creates a mock service with one syncgroup ("sg1", covering the
// collections "foo" and "bar"), optionally replays the local log file lfile,
// and, when rfile is non-empty, builds an initiation state that is fed the
// remote log file rfile and run through delta processing.
//
// The sg flag selects syncgroup-delta mode (prepareSGDeltaReq) instead of
// data-delta mode (prepareDataDeltaReq).
//
// It returns the service, the initiation state (nil when rfile is empty) and
// a cleanup function the caller must invoke. If testInit fails part-way, it
// destroys the service itself before the test exits.
func testInit(t *testing.T, lfile, rfile string, sg bool) (*mockService, *initiationState, func()) {
	// Set a large value to prevent the initiator from running.
	// NOTE(review): this package-level value is not restored here.
	peerSyncInterval = 1 * time.Hour

	svc := createService(t)
	cleanup := func() {
		destroyService(t, svc)
	}

	// If testInit does not run to completion, clean up after ourselves.
	// t.Fatal exits via runtime.Goexit, which still runs deferred calls, so
	// this also covers failures raised inside the helpers called below.
	done := false
	defer func() {
		if !done {
			cleanup()
		}
	}()

	now, err := svc.vclock.Now()
	if err != nil {
		t.Fatalf("unable to get time: %v\n", err)
	}

	s := svc.sync
	s.id = 10 // initiator

	sgId1 := interfaces.GroupId("1234")
	nullInfo := wire.SyncgroupMemberInfo{}
	sgInfo := sgMember{
		sgId1: interfaces.SyncgroupMemberState{WhenUpdated: now.Unix(), MemberInfo: nullInfo},
	}
	info := &memberInfo{
		db2sg: map[wire.Id]sgMember{
			mockDbId: sgInfo,
		},
		mtTables: map[string]struct{}{
			"1/2/3/4": {},
			"5/6/7/8": {},
		},
	}

	sg1 := &interfaces.Syncgroup{
		Id:          wire.Id{Name: "sg1", Blessing: "b1"},
		DbId:        mockDbId,
		Creator:     "mockCreator",
		SpecVersion: "etag-0",
		Spec: wire.SyncgroupSpec{
			Collections: []wire.Id{makeCxId("foo"), makeCxId("bar")},
			Perms:       mockSgPerms,
			MountTables: []string{"1/2/3/4", "5/6/7/8"},
		},
		Joiners: map[string]interfaces.SyncgroupMemberState{
			"a": {WhenUpdated: now.Unix(), MemberInfo: nullInfo},
			"b": {WhenUpdated: now.Unix(), MemberInfo: nullInfo},
		},
	}

	tx := createDatabase(t, svc).St().NewWatchableTransaction()
	if err = s.addSyncgroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
		// Release the transaction before bailing out.
		tx.Abort()
		t.Fatalf("cannot add syncgroup %v, err %v", sg1.Id, err)
	}
	if err = tx.Commit(); err != nil {
		t.Fatalf("cannot commit adding syncgroup %v, err %v", sg1.Id, err)
	}

	if lfile != "" {
		replayLocalCommands(t, svc, lfile)
	}

	// Without a remote log file there is no initiation state to build.
	if rfile == "" {
		done = true
		return svc, nil, cleanup
	}

	peer := connInfo{relName: "b", mtTbls: set.String.ToSlice(info.mtTables)}
	c, err := newInitiationConfig(nil, s, mockDbId, info.db2sg[mockDbId], peer)
	if err != nil {
		t.Fatalf("newInitiationConfig failed with err %v", err)
	}

	iSt := newInitiationState(nil, c, sg)
	sgs := make(sgSet)
	sgs[sgId1] = struct{}{}
	iSt.config.sharedSgPfxs = map[string]sgSet{
		toCollectionPrefixStr(sg1.Spec.Collections[0]): sgs,
		toCollectionPrefixStr(sg1.Spec.Collections[1]): sgs,
	}

	// Sort both lists so the comparison does not depend on ordering.
	sort.Strings(iSt.config.peer.mtTbls)
	sort.Strings(sg1.Spec.MountTables)
	if !reflect.DeepEqual(iSt.config.peer.mtTbls, sg1.Spec.MountTables) {
		t.Fatalf("Mount tables are not equal: config %v, spec %v", iSt.config.peer.mtTbls, sg1.Spec.MountTables)
	}

	s.initSyncStateInMem(nil, mockDbId, sgOID(sgId1))

	iSt.stream = createReplayStream(t, rfile)

	// Build the delta request and the local generation vectors expected for
	// the selected mode.
	var wantVecs interfaces.Knowledge
	if sg {
		if err = iSt.prepareSGDeltaReq(nil); err != nil {
			t.Fatalf("prepareSGDeltaReq failed with err %v", err)
		}
		sgKey := string(sgId1)
		wantVecs = interfaces.Knowledge{
			sgKey: interfaces.GenVector{10: 0},
		}
	} else {
		if err = iSt.prepareDataDeltaReq(nil); err != nil {
			t.Fatalf("prepareDataDeltaReq failed with err %v", err)
		}
		wantVecs = interfaces.Knowledge{
			"mockuser,foo\xfe": interfaces.GenVector{10: 0},
			"mockuser,bar\xfe": interfaces.GenVector{10: 0},
		}
	}

	if !reflect.DeepEqual(iSt.local, wantVecs) {
		t.Fatalf("createLocalGenVec failed: got %v, want %v", iSt.local, wantVecs)
	}

	if err = iSt.recvAndProcessDeltas(nil); err != nil {
		t.Fatalf("recvAndProcessDeltas failed with err %v", err)
	}

	if err = iSt.processUpdatedObjects(nil); err != nil {
		t.Fatalf("processUpdatedObjects failed with err %v", err)
	}

	done = true
	return svc, iSt, cleanup
}