blob: 640c96ae4c8d655a74ff5f568a22cbec8610c04b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"bytes"
"reflect"
"testing"
wire "v.io/v23/services/syncbase/nosql"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/services/syncbase/server/watchable"
"v.io/x/ref/services/syncbase/store"
)
/*
Test setup:
Group1:
Oid: x, isConflict = true, has Local update, has remote update, has ancestor, resolution = pickLocal
Group2:
Oid: b, isConflict = true, has Local update, remote deleted, has ancestor, resolution = pickRemote
Group3:
Oid: c, isConflict = true, Local deleted, has remote update, has ancestor, resolution = pickRemote
Group4:
Oid: p, isConflict = true, has Local update, has remote update, has ancestor, resolution = createNew
Group5:
Oid: y, isConflict = true, has Local update, has remote update, has no ancestor, resolution = pickRemote
Oid: z, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = pickRemote
Oid: e, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = createNew
Oid: f, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = pickLocal
Oid: g, isConflict = false, no local update, has remote update, has unknown ancestor, local head is deleted, resolution = pickLocal
Oid: a, local value rubberbanded in due to localBatch, resolution = createNew
localBatch: {y, a}
remoteBatch: {y, z, e, f}
*/
var (
updObjectsAppResolves = map[string]*objConflictState{
// group1
x: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
// group2
b: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
// group3
c: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
// group4
p: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
// group5
y: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
z: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
a: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, false /*hasRemote*/, false /*hasAncestor*/),
e: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
f: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
g: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
}
localBatchId uint64 = 34
remoteBatchId uint64 = 58
)
func createGroupedCrTestData() *groupedCrData {
groupedCrTestData := &groupedCrData{oids: map[string]bool{}}
var group *crGroup
// group1
group = newGroup()
addToGroup(group, x, NoBatchId, -1)
groupedCrTestData.oids[x] = true
groupedCrTestData.groups = append(groupedCrTestData.groups, group)
// group2
group = newGroup()
addToGroup(group, b, NoBatchId, -1)
groupedCrTestData.oids[b] = true
groupedCrTestData.groups = append(groupedCrTestData.groups, group)
// group3
group = newGroup()
addToGroup(group, c, NoBatchId, -1)
groupedCrTestData.oids[c] = true
groupedCrTestData.groups = append(groupedCrTestData.groups, group)
// group4
group = newGroup()
addToGroup(group, p, NoBatchId, -1)
groupedCrTestData.oids[p] = true
groupedCrTestData.groups = append(groupedCrTestData.groups, group)
// group5
group = newGroup()
addToGroup(group, y, localBatchId, wire.BatchSourceLocal)
addToGroup(group, y, remoteBatchId, wire.BatchSourceRemote)
addToGroup(group, z, remoteBatchId, wire.BatchSourceRemote)
addToGroup(group, e, remoteBatchId, wire.BatchSourceRemote)
addToGroup(group, f, remoteBatchId, wire.BatchSourceRemote)
addToGroup(group, g, remoteBatchId, wire.BatchSourceRemote)
addToGroup(group, a, localBatchId, wire.BatchSourceLocal)
groupedCrTestData.oids[y] = true
groupedCrTestData.oids[z] = true
groupedCrTestData.oids[a] = true
groupedCrTestData.oids[e] = true
groupedCrTestData.oids[f] = true
groupedCrTestData.groups = append(groupedCrTestData.groups, group)
return groupedCrTestData
}
func createAndSaveNodeAndLogRecDataAppResolves(iSt *initiationState) {
// group1
saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].oldHead, 24, false)
saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].newHead, 25, false)
saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].ancestor, 20, false)
// group2
saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].newHead, 23, true)
saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].ancestor, 15, false)
// group3
saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].oldHead, 56, true)
saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].ancestor, 15, false)
// group4
saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].ancestor, 15, false)
//group5
saveNodeAndLogRec(iSt.tx, y, updObjectsAppResolves[y].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, y, updObjectsAppResolves[y].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, z, updObjectsAppResolves[z].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, z, updObjectsAppResolves[z].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, e, updObjectsAppResolves[e].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, e, updObjectsAppResolves[e].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, f, updObjectsAppResolves[f].oldHead, 56, false)
saveNodeAndLogRec(iSt.tx, f, updObjectsAppResolves[f].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, g, updObjectsAppResolves[g].oldHead, 56, true)
saveNodeAndLogRec(iSt.tx, g, updObjectsAppResolves[g].newHead, 23, false)
saveNodeAndLogRec(iSt.tx, a, updObjectsAppResolves[a].oldHead, 56, false)
}
func writeVersionedValues(t *testing.T, iSt *initiationState) {
// group1
saveValue(t, iSt.tx, x, updObjectsAppResolves[x].oldHead)
saveValue(t, iSt.tx, x, updObjectsAppResolves[x].newHead)
saveValue(t, iSt.tx, x, updObjectsAppResolves[x].ancestor)
// group2
saveValue(t, iSt.tx, b, updObjectsAppResolves[b].oldHead)
saveValue(t, iSt.tx, b, updObjectsAppResolves[b].ancestor)
// group3
saveValue(t, iSt.tx, c, updObjectsAppResolves[c].newHead)
saveValue(t, iSt.tx, c, updObjectsAppResolves[c].ancestor)
// group4
saveValue(t, iSt.tx, p, updObjectsAppResolves[p].oldHead)
saveValue(t, iSt.tx, p, updObjectsAppResolves[p].newHead)
saveValue(t, iSt.tx, p, updObjectsAppResolves[p].ancestor)
// group5
saveValue(t, iSt.tx, y, updObjectsAppResolves[y].oldHead)
saveValue(t, iSt.tx, y, updObjectsAppResolves[y].newHead)
saveValue(t, iSt.tx, z, updObjectsAppResolves[z].oldHead)
saveValue(t, iSt.tx, z, updObjectsAppResolves[z].newHead)
saveValue(t, iSt.tx, e, updObjectsAppResolves[e].oldHead)
saveValue(t, iSt.tx, e, updObjectsAppResolves[e].newHead)
saveValue(t, iSt.tx, f, updObjectsAppResolves[f].oldHead)
saveValue(t, iSt.tx, f, updObjectsAppResolves[f].newHead)
// No value for oldHead for g as g was deleted on local.
saveValue(t, iSt.tx, g, updObjectsAppResolves[g].newHead)
saveValue(t, iSt.tx, a, updObjectsAppResolves[a].oldHead)
}
func setResInfoData(mockCrs *conflictResolverStream) {
// group1
addResInfo(mockCrs, x, wire.ValueSelectionLocal, nil, false)
// group2
addResInfo(mockCrs, b, wire.ValueSelectionRemote, nil, false)
// group3
addResInfo(mockCrs, c, wire.ValueSelectionRemote, nil, false)
// group4
addResInfo(mockCrs, p, wire.ValueSelectionOther, []byte("newValue"), false)
// group5
addResInfo(mockCrs, y, wire.ValueSelectionRemote, nil, true)
addResInfo(mockCrs, z, wire.ValueSelectionRemote, nil, true)
addResInfo(mockCrs, e, wire.ValueSelectionOther, []byte("newValue"), true)
addResInfo(mockCrs, f, wire.ValueSelectionLocal, nil, true)
addResInfo(mockCrs, g, wire.ValueSelectionLocal, nil, true)
addResInfo(mockCrs, a, wire.ValueSelectionOther, []byte("newValue"), false)
setMockCRStream(mockCrs)
}
func TestResolveViaApp(t *testing.T) {
service := createService(t)
defer destroyService(t, service)
iSt := &initiationState{
updObjects: updObjectsAppResolves,
tx: service.St().NewTransaction(),
config: &initiationConfig{
sync: service.sync,
db: newDb(t, service.sync),
},
}
createAndSaveNodeAndLogRecDataAppResolves(iSt)
writeVersionedValues(t, iSt)
mockCrs := &conflictResolverStream{}
setResInfoData(mockCrs)
groupedCrTestData := createGroupedCrTestData()
if err := iSt.resolveViaApp(nil, groupedCrTestData); err != nil {
t.Errorf("Error returned by resolveViaApp: %v", err)
}
// verify
verifyConflictInfos(t, mockCrs)
// group1
verifyResolution(t, iSt.updObjects, x, pickLocal)
// group2
verifyResolution(t, iSt.updObjects, b, pickRemote)
// group3
verifyResolution(t, iSt.updObjects, c, pickRemote)
// group4
verifyCreateNew(t, iSt.updObjects, p, false)
// group5
verifyResolution(t, iSt.updObjects, y, pickRemote)
verifyResolution(t, iSt.updObjects, z, pickRemote)
verifyCreateNew(t, iSt.updObjects, e, false)
verifyCreateNew(t, iSt.updObjects, f, false)
verifyCreateNew(t, iSt.updObjects, g, true)
verifyCreateNew(t, iSt.updObjects, a, false)
// verify batch ids
verifyBatchId(t, iSt.updObjects, NoBatchId, x, b, c, p)
bid := iSt.updObjects[y].res.batchId
if bid == NoBatchId {
t.Errorf("BatchId for group5 should not be NoBatchId")
}
verifyBatchId(t, iSt.updObjects, bid, y, z, e, f, a)
}
func verifyConflictInfos(t *testing.T, mockCrs *conflictResolverStream) {
var ci wire.ConflictInfo
if len(mockCrs.sendQ) != 12 {
t.Errorf("ConflictInfo count expected: %v, actual: %v", 9, len(mockCrs.sendQ))
}
// group1
ci = mockCrs.sendQ[0]
checkConflictRow(t, x, ci, []uint64{}, false /*localDeleted*/, false /*remoteDeleted*/)
checkContinued(t, mockCrs.sendQ[0:1])
// group2
ci = mockCrs.sendQ[1]
checkConflictRow(t, b, ci, []uint64{}, false /*localDeleted*/, true /*remoteDeleted*/)
checkContinued(t, mockCrs.sendQ[1:2])
// group3
ci = mockCrs.sendQ[2]
checkConflictRow(t, c, ci, []uint64{}, true /*localDeleted*/, false /*remoteDeleted*/)
checkContinued(t, mockCrs.sendQ[2:3])
// group4
ci = mockCrs.sendQ[3]
checkConflictRow(t, p, ci, []uint64{}, false /*localDeleted*/, false /*remoteDeleted*/)
checkContinued(t, mockCrs.sendQ[3:4])
// group5
batchMap := map[uint64]wire.ConflictInfo{}
batchMap[getBid(mockCrs.sendQ[4])] = mockCrs.sendQ[4]
batchMap[getBid(mockCrs.sendQ[5])] = mockCrs.sendQ[5]
ci = batchMap[localBatchId]
checkConflictBatch(t, localBatchId, ci, wire.BatchSourceLocal)
ci = batchMap[remoteBatchId]
checkConflictBatch(t, remoteBatchId, ci, wire.BatchSourceRemote)
rowMap := map[string]wire.ConflictInfo{}
rowMap[getOid(mockCrs.sendQ[6])] = mockCrs.sendQ[6]
rowMap[getOid(mockCrs.sendQ[7])] = mockCrs.sendQ[7]
rowMap[getOid(mockCrs.sendQ[8])] = mockCrs.sendQ[8]
rowMap[getOid(mockCrs.sendQ[9])] = mockCrs.sendQ[9]
rowMap[getOid(mockCrs.sendQ[10])] = mockCrs.sendQ[10]
rowMap[getOid(mockCrs.sendQ[11])] = mockCrs.sendQ[11]
ci = rowMap[y]
checkConflictRow(t, y, ci, []uint64{localBatchId, remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
ci = rowMap[z]
checkConflictRow(t, z, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
ci = rowMap[a]
checkConflictRow(t, a, ci, []uint64{localBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
ci = rowMap[e]
checkConflictRow(t, e, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
ci = rowMap[f]
checkConflictRow(t, f, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
ci = rowMap[g]
checkConflictRow(t, g, ci, []uint64{remoteBatchId}, true /*localDeleted*/, false /*remoteDeleted*/)
checkContinued(t, mockCrs.sendQ[4:])
}
func checkContinued(t *testing.T, infoGroup []wire.ConflictInfo) {
lastIndex := len(infoGroup) - 1
for i := range infoGroup {
if infoGroup[i].Continued != (i != lastIndex) {
t.Errorf("Wrong value for continued field in %#v", infoGroup[i])
}
}
}
func getOid(ci wire.ConflictInfo) string {
ciData := ci.Data.(wire.ConflictDataRow).Value
writeOp := ciData.Op.(wire.OperationWrite).Value
return toRowKey(writeOp.Key)
}
func getBid(ci wire.ConflictInfo) uint64 {
ciData := ci.Data.(wire.ConflictDataBatch).Value
return ciData.Id
}
func checkConflictBatch(t *testing.T, bid uint64, ci wire.ConflictInfo, source wire.BatchSource) {
ciData := ci.Data.(wire.ConflictDataBatch).Value
if ciData.Source != source {
t.Errorf("Source for batchid %v expected: %#v, actual: %#v", bid, source, ciData.Source)
}
if ci.Continued != true {
t.Errorf("Bid: %v, Unexpected value for continued: %v", bid, ci.Continued)
}
}
func checkConflictRow(t *testing.T, oid string, ci wire.ConflictInfo, batchIds []uint64, localDeleted, remoteDeleted bool) {
ciData := ci.Data.(wire.ConflictDataRow).Value
writeOp := ciData.Op.(wire.OperationWrite).Value
st, _ := updObjectsAppResolves[oid]
if (st.ancestor != NoVersion) && !bytes.Equal(makeValue(oid, st.ancestor), writeOp.AncestorValue.Bytes) {
t.Errorf("Oid: %v, Ancestor value expected: %v, actual: %v", oid, string(makeValue(oid, st.ancestor)), string(writeOp.AncestorValue.Bytes))
}
if remoteDeleted && writeOp.RemoteValue == nil {
t.Errorf("Oid: %v, for remote deleted remote value is expected to have an instance with no bytes")
}
if !remoteDeleted && (st.newHead != NoVersion) && !bytes.Equal(makeValue(oid, st.newHead), writeOp.RemoteValue.Bytes) {
t.Errorf("Oid: %v, Remote value expected: %v, actual: %v", oid, string(makeValue(oid, st.newHead)), string(writeOp.RemoteValue.Bytes))
}
if localDeleted && writeOp.LocalValue == nil {
t.Errorf("Oid: %v, for local deleted local value is expected to have an instance with no bytes")
}
if !localDeleted && (st.oldHead != NoVersion) && !bytes.Equal(makeValue(oid, st.oldHead), writeOp.LocalValue.Bytes) {
t.Errorf("Oid: %v, Local value expected: %v, actual: %v", oid, string(makeValue(oid, st.oldHead)), string(writeOp.LocalValue.Bytes))
}
if !reflect.DeepEqual(ciData.BatchIds, batchIds) {
t.Errorf("Oid: %v, BatchIds expected: %v, actual: %v", oid, batchIds, ciData.BatchIds)
}
}
func addResInfo(crs *conflictResolverStream, oid string, sel wire.ValueSelection, val []byte, cntd bool) {
var valRes *wire.Value
if val != nil {
valRes = &wire.Value{
Bytes: val,
}
}
rInfo := wire.ResolutionInfo{
Key: util.StripFirstKeyPartOrDie(oid),
Selection: sel,
Result: valRes,
Continued: cntd,
}
crs.recvQ = append(crs.recvQ, rInfo)
}
func saveValue(t *testing.T, tx store.Transaction, oid, version string) {
if err := watchable.PutAtVersion(nil, tx, []byte(oid), makeValue(oid, version), []byte(version)); err != nil {
t.Errorf("Failed to write versioned value for oid,ver: %s,%s", oid, version)
t.FailNow()
}
}
func makeValue(oid, ver string) []byte {
return []byte(oid + ver)
}
func addToGroup(group *crGroup, oid string, bid uint64, source wire.BatchSource) {
if bid != NoBatchId {
group.batchSource[bid] = source
group.batchesByOid[oid] = append(group.batchesByOid[oid], bid)
} else {
group.batchesByOid[oid] = []uint64{}
}
}
func newGroup() *crGroup {
return &crGroup{
batchSource: map[uint64]wire.BatchSource{},
batchesByOid: map[string][]uint64{},
}
}
func newDb(t *testing.T, s *syncService) interfaces.Database {
app, err := s.sv.App(nil, nil, "mockApp")
if err != nil {
t.Errorf("Error while creating App: %v", err)
}
db, err := app.NoSQLDatabase(nil, nil, "mockDB")
if err != nil {
t.Errorf("Error while creating Database: %v", err)
}
return db
}