// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

import (
	"bytes"
	"reflect"
	"testing"

	wire "v.io/v23/services/syncbase/nosql"
	"v.io/x/ref/services/syncbase/server/interfaces"
	"v.io/x/ref/services/syncbase/server/watchable"
	"v.io/x/ref/services/syncbase/store"
)

/*
Test setup:

Group1:
Oid: x, isConflict = true, has Local update, has remote update, has ancestor, resolution = pickLocal

Group2:
Oid: b, isConflict = true, has Local update, remote deleted, has ancestor, resolution = pickRemote

Group3:
Oid: c, isConflict = true, Local deleted, has remote update, has ancestor, resolution = pickRemote

Group4:
Oid: p, isConflict = true, has Local update, has remote update, has ancestor, resolution = createNew

Group5:
Oid: y, isConflict = true, has Local update, has remote update, has no ancestor, resolution = pickRemote
Oid: z, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = pickRemote
Oid: e, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = createNew
Oid: f, isConflict = false, no local update, has remote update, has unknown ancestor, resolution = pickLocal
Oid: g, isConflict = false, no local update, has remote update, has unknown ancestor, local head is deleted, resolution = pickLocal
Oid: a, local value rubberbanded in due to localBatch, resolution = createNew

localBatch: {y, a}
remoteBatch: {y, z, e, f}
*/

var (
	updObjectsAppResolves = map[string]*objConflictState{
		// group1
		x: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
		// group2
		b: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
		// group3
		c: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),
		// group4
		p: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, true /*hasAncestor*/),

		// group5
		y: createObjConflictState(true /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
		z: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
		a: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, false /*hasRemote*/, false /*hasAncestor*/),

		e: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
		f: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
		g: createObjConflictState(false /*isConflict*/, true /*hasLocal*/, true /*hasRemote*/, false /*hasAncestor*/),
	}

	localBatchId  uint64 = 34
	remoteBatchId uint64 = 58
)

func createGroupedCrTestData() *groupedCrData {
	groupedCrTestData := &groupedCrData{oids: map[string]bool{}}
	var group *crGroup
	// group1
	group = newGroup()
	addToGroup(group, x, NoBatchId, -1)
	groupedCrTestData.oids[x] = true
	groupedCrTestData.groups = append(groupedCrTestData.groups, group)

	// group2
	group = newGroup()
	addToGroup(group, b, NoBatchId, -1)
	groupedCrTestData.oids[b] = true
	groupedCrTestData.groups = append(groupedCrTestData.groups, group)

	// group3
	group = newGroup()
	addToGroup(group, c, NoBatchId, -1)
	groupedCrTestData.oids[c] = true
	groupedCrTestData.groups = append(groupedCrTestData.groups, group)

	// group4
	group = newGroup()
	addToGroup(group, p, NoBatchId, -1)
	groupedCrTestData.oids[p] = true
	groupedCrTestData.groups = append(groupedCrTestData.groups, group)

	// group5
	group = newGroup()
	addToGroup(group, y, localBatchId, wire.BatchSourceLocal)
	addToGroup(group, y, remoteBatchId, wire.BatchSourceRemote)
	addToGroup(group, z, remoteBatchId, wire.BatchSourceRemote)
	addToGroup(group, e, remoteBatchId, wire.BatchSourceRemote)
	addToGroup(group, f, remoteBatchId, wire.BatchSourceRemote)
	addToGroup(group, g, remoteBatchId, wire.BatchSourceRemote)
	addToGroup(group, a, localBatchId, wire.BatchSourceLocal)
	groupedCrTestData.oids[y] = true
	groupedCrTestData.oids[z] = true
	groupedCrTestData.oids[a] = true
	groupedCrTestData.oids[e] = true
	groupedCrTestData.oids[f] = true
	groupedCrTestData.groups = append(groupedCrTestData.groups, group)

	return groupedCrTestData
}

func createAndSaveNodeAndLogRecDataAppResolves(iSt *initiationState) {
	// group1
	saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].oldHead, 24, false)
	saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].newHead, 25, false)
	saveNodeAndLogRec(iSt.tx, x, updObjectsAppResolves[x].ancestor, 20, false)

	// group2
	saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].newHead, 23, true)
	saveNodeAndLogRec(iSt.tx, b, updObjectsAppResolves[b].ancestor, 15, false)

	// group3
	saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].oldHead, 56, true)
	saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].newHead, 23, false)
	saveNodeAndLogRec(iSt.tx, c, updObjectsAppResolves[c].ancestor, 15, false)

	// group4
	saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].newHead, 23, false)
	saveNodeAndLogRec(iSt.tx, p, updObjectsAppResolves[p].ancestor, 15, false)

	//group5
	saveNodeAndLogRec(iSt.tx, y, updObjectsAppResolves[y].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, y, updObjectsAppResolves[y].newHead, 23, false)

	saveNodeAndLogRec(iSt.tx, z, updObjectsAppResolves[z].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, z, updObjectsAppResolves[z].newHead, 23, false)

	saveNodeAndLogRec(iSt.tx, e, updObjectsAppResolves[e].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, e, updObjectsAppResolves[e].newHead, 23, false)

	saveNodeAndLogRec(iSt.tx, f, updObjectsAppResolves[f].oldHead, 56, false)
	saveNodeAndLogRec(iSt.tx, f, updObjectsAppResolves[f].newHead, 23, false)

	saveNodeAndLogRec(iSt.tx, g, updObjectsAppResolves[g].oldHead, 56, true)
	saveNodeAndLogRec(iSt.tx, g, updObjectsAppResolves[g].newHead, 23, false)

	saveNodeAndLogRec(iSt.tx, a, updObjectsAppResolves[a].oldHead, 56, false)
}

func writeVersionedValues(t *testing.T, iSt *initiationState) {
	// group1
	saveValue(t, iSt.tx, x, updObjectsAppResolves[x].oldHead)
	saveValue(t, iSt.tx, x, updObjectsAppResolves[x].newHead)
	saveValue(t, iSt.tx, x, updObjectsAppResolves[x].ancestor)

	// group2
	saveValue(t, iSt.tx, b, updObjectsAppResolves[b].oldHead)
	saveValue(t, iSt.tx, b, updObjectsAppResolves[b].ancestor)

	// group3
	saveValue(t, iSt.tx, c, updObjectsAppResolves[c].newHead)
	saveValue(t, iSt.tx, c, updObjectsAppResolves[c].ancestor)

	// group4
	saveValue(t, iSt.tx, p, updObjectsAppResolves[p].oldHead)
	saveValue(t, iSt.tx, p, updObjectsAppResolves[p].newHead)
	saveValue(t, iSt.tx, p, updObjectsAppResolves[p].ancestor)

	// group5
	saveValue(t, iSt.tx, y, updObjectsAppResolves[y].oldHead)
	saveValue(t, iSt.tx, y, updObjectsAppResolves[y].newHead)

	saveValue(t, iSt.tx, z, updObjectsAppResolves[z].oldHead)
	saveValue(t, iSt.tx, z, updObjectsAppResolves[z].newHead)

	saveValue(t, iSt.tx, e, updObjectsAppResolves[e].oldHead)
	saveValue(t, iSt.tx, e, updObjectsAppResolves[e].newHead)

	saveValue(t, iSt.tx, f, updObjectsAppResolves[f].oldHead)
	saveValue(t, iSt.tx, f, updObjectsAppResolves[f].newHead)

	// No value for oldHead for g as g was deleted on local.
	saveValue(t, iSt.tx, g, updObjectsAppResolves[g].newHead)

	saveValue(t, iSt.tx, a, updObjectsAppResolves[a].oldHead)
}

func setResInfoData(mockCrs *conflictResolverStream) {
	// group1
	addResInfo(mockCrs, x, wire.ValueSelectionLocal, nil, false)
	// group2
	addResInfo(mockCrs, b, wire.ValueSelectionRemote, nil, false)
	// group3
	addResInfo(mockCrs, c, wire.ValueSelectionRemote, nil, false)
	// group4
	addResInfo(mockCrs, p, wire.ValueSelectionOther, []byte("newValue"), false)
	// group5
	addResInfo(mockCrs, y, wire.ValueSelectionRemote, nil, true)
	addResInfo(mockCrs, z, wire.ValueSelectionRemote, nil, true)
	addResInfo(mockCrs, e, wire.ValueSelectionOther, []byte("newValue"), true)
	addResInfo(mockCrs, f, wire.ValueSelectionLocal, nil, true)
	addResInfo(mockCrs, g, wire.ValueSelectionLocal, nil, true)
	addResInfo(mockCrs, a, wire.ValueSelectionOther, []byte("newValue"), false)
	setMockCRStream(mockCrs)
}

func TestResolveViaApp(t *testing.T) {
	service := createService(t)
	defer destroyService(t, service)

	iSt := &initiationState{
		updObjects: updObjectsAppResolves,
		tx:         service.St().NewTransaction(),
		config: &initiationConfig{
			sync: service.sync,
			db:   newDb(t, service.sync),
		},
	}
	createAndSaveNodeAndLogRecDataAppResolves(iSt)
	writeVersionedValues(t, iSt)
	mockCrs := &conflictResolverStream{}
	setResInfoData(mockCrs)
	groupedCrTestData := createGroupedCrTestData()

	if err := iSt.resolveViaApp(nil, groupedCrTestData); err != nil {
		t.Errorf("Error returned by resolveViaApp: %v", err)
	}

	// verify
	verifyConflictInfos(t, mockCrs)
	// group1
	verifyResolution(t, iSt.updObjects, x, pickLocal)
	// group2
	verifyResolution(t, iSt.updObjects, b, pickRemote)
	// group3
	verifyResolution(t, iSt.updObjects, c, pickRemote)
	// group4
	verifyCreateNew(t, iSt.updObjects, p, false)
	// group5
	verifyResolution(t, iSt.updObjects, y, pickRemote)
	verifyResolution(t, iSt.updObjects, z, pickRemote)
	verifyCreateNew(t, iSt.updObjects, e, false)
	verifyCreateNew(t, iSt.updObjects, f, false)
	verifyCreateNew(t, iSt.updObjects, g, true)
	verifyCreateNew(t, iSt.updObjects, a, false)

	// verify batch ids
	verifyBatchId(t, iSt.updObjects, NoBatchId, x, b, c, p)
	bid := iSt.updObjects[y].res.batchId
	if bid == NoBatchId {
		t.Errorf("BatchId for group5 should not be NoBatchId")
	}
	verifyBatchId(t, iSt.updObjects, bid, y, z, e, f, a)
}

func verifyConflictInfos(t *testing.T, mockCrs *conflictResolverStream) {
	var ci wire.ConflictInfo
	if len(mockCrs.sendQ) != 12 {
		t.Errorf("ConflictInfo count expected: %v, actual: %v", 9, len(mockCrs.sendQ))
	}

	// group1
	ci = mockCrs.sendQ[0]
	checkConflictRow(t, x, ci, []uint64{}, false /*localDeleted*/, false /*remoteDeleted*/)
	checkContinued(t, mockCrs.sendQ[0:1])

	// group2
	ci = mockCrs.sendQ[1]
	checkConflictRow(t, b, ci, []uint64{}, false /*localDeleted*/, true /*remoteDeleted*/)
	checkContinued(t, mockCrs.sendQ[1:2])

	// group3
	ci = mockCrs.sendQ[2]
	checkConflictRow(t, c, ci, []uint64{}, true /*localDeleted*/, false /*remoteDeleted*/)
	checkContinued(t, mockCrs.sendQ[2:3])

	// group4
	ci = mockCrs.sendQ[3]
	checkConflictRow(t, p, ci, []uint64{}, false /*localDeleted*/, false /*remoteDeleted*/)
	checkContinued(t, mockCrs.sendQ[3:4])

	// group5
	batchMap := map[uint64]wire.ConflictInfo{}
	batchMap[getBid(mockCrs.sendQ[4])] = mockCrs.sendQ[4]
	batchMap[getBid(mockCrs.sendQ[5])] = mockCrs.sendQ[5]
	ci = batchMap[localBatchId]
	checkConflictBatch(t, localBatchId, ci, wire.BatchSourceLocal)
	ci = batchMap[remoteBatchId]
	checkConflictBatch(t, remoteBatchId, ci, wire.BatchSourceRemote)

	rowMap := map[string]wire.ConflictInfo{}
	rowMap[getOid(mockCrs.sendQ[6])] = mockCrs.sendQ[6]
	rowMap[getOid(mockCrs.sendQ[7])] = mockCrs.sendQ[7]
	rowMap[getOid(mockCrs.sendQ[8])] = mockCrs.sendQ[8]
	rowMap[getOid(mockCrs.sendQ[9])] = mockCrs.sendQ[9]
	rowMap[getOid(mockCrs.sendQ[10])] = mockCrs.sendQ[10]
	rowMap[getOid(mockCrs.sendQ[11])] = mockCrs.sendQ[11]
	ci = rowMap[y]
	checkConflictRow(t, y, ci, []uint64{localBatchId, remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
	ci = rowMap[z]
	checkConflictRow(t, z, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
	ci = rowMap[a]
	checkConflictRow(t, a, ci, []uint64{localBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
	ci = rowMap[e]
	checkConflictRow(t, e, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
	ci = rowMap[f]
	checkConflictRow(t, f, ci, []uint64{remoteBatchId}, false /*localDeleted*/, false /*remoteDeleted*/)
	ci = rowMap[g]
	checkConflictRow(t, g, ci, []uint64{remoteBatchId}, true /*localDeleted*/, false /*remoteDeleted*/)

	checkContinued(t, mockCrs.sendQ[4:])
}

func checkContinued(t *testing.T, infoGroup []wire.ConflictInfo) {
	lastIndex := len(infoGroup) - 1
	for i := range infoGroup {
		if infoGroup[i].Continued != (i != lastIndex) {
			t.Errorf("Wrong value for continued field in %#v", infoGroup[i])
		}
	}
}

func getOid(ci wire.ConflictInfo) string {
	ciData := ci.Data.(wire.ConflictDataRow).Value
	writeOp := ciData.Op.(wire.OperationWrite).Value
	return toRowKey(writeOp.Key)
}

func getBid(ci wire.ConflictInfo) uint64 {
	ciData := ci.Data.(wire.ConflictDataBatch).Value
	return ciData.Id
}

func checkConflictBatch(t *testing.T, bid uint64, ci wire.ConflictInfo, source wire.BatchSource) {
	ciData := ci.Data.(wire.ConflictDataBatch).Value
	if ciData.Source != source {
		t.Errorf("Source for batchid %v expected: %#v, actual: %#v", bid, source, ciData.Source)
	}
	if ci.Continued != true {
		t.Errorf("Bid: %v, Unexpected value for continued: %v", bid, ci.Continued)
	}
}

func checkConflictRow(t *testing.T, oid string, ci wire.ConflictInfo, batchIds []uint64, localDeleted, remoteDeleted bool) {
	ciData := ci.Data.(wire.ConflictDataRow).Value
	writeOp := ciData.Op.(wire.OperationWrite).Value
	st, _ := updObjectsAppResolves[oid]
	if (st.ancestor != NoVersion) && !bytes.Equal(makeValue(oid, st.ancestor), writeOp.AncestorValue.Bytes) {
		t.Errorf("Oid: %v, Ancestor value expected: %v, actual: %v", oid, string(makeValue(oid, st.ancestor)), string(writeOp.AncestorValue.Bytes))
	}
	if remoteDeleted && writeOp.RemoteValue == nil {
		t.Errorf("Oid: %v, for remote deleted remote value is expected to have an instance with no bytes")
	}
	if !remoteDeleted && (st.newHead != NoVersion) && !bytes.Equal(makeValue(oid, st.newHead), writeOp.RemoteValue.Bytes) {
		t.Errorf("Oid: %v, Remote value expected: %v, actual: %v", oid, string(makeValue(oid, st.newHead)), string(writeOp.RemoteValue.Bytes))
	}
	if localDeleted && writeOp.LocalValue == nil {
		t.Errorf("Oid: %v, for local deleted local value is expected to have an instance with no bytes")
	}
	if !localDeleted && (st.oldHead != NoVersion) && !bytes.Equal(makeValue(oid, st.oldHead), writeOp.LocalValue.Bytes) {
		t.Errorf("Oid: %v, Local value expected: %v, actual: %v", oid, string(makeValue(oid, st.oldHead)), string(writeOp.LocalValue.Bytes))
	}
	if !reflect.DeepEqual(ciData.BatchIds, batchIds) {
		t.Errorf("Oid: %v, BatchIds expected: %v, actual: %v", oid, batchIds, ciData.BatchIds)
	}
}

func addResInfo(crs *conflictResolverStream, oid string, sel wire.ValueSelection, val []byte, cntd bool) {
	var valRes *wire.Value
	if val != nil {
		valRes = &wire.Value{
			Bytes: val,
		}
	}
	rInfo := wire.ResolutionInfo{
		Key:       extractAppKey(oid),
		Selection: sel,
		Result:    valRes,
		Continued: cntd,
	}
	crs.recvQ = append(crs.recvQ, rInfo)
}

func saveValue(t *testing.T, tx store.Transaction, oid, version string) {
	if err := watchable.PutAtVersion(nil, tx, []byte(oid), makeValue(oid, version), []byte(version)); err != nil {
		t.Errorf("Failed to write versioned value for oid,ver: %s,%s", oid, version)
		t.FailNow()
	}
}

func makeValue(oid, ver string) []byte {
	return []byte(oid + ver)
}

func addToGroup(group *crGroup, oid string, bid uint64, source wire.BatchSource) {
	if bid != NoBatchId {
		group.batchSource[bid] = source
		group.batchesByOid[oid] = append(group.batchesByOid[oid], bid)
	} else {
		group.batchesByOid[oid] = []uint64{}
	}
}

func newGroup() *crGroup {
	return &crGroup{
		batchSource:  map[uint64]wire.BatchSource{},
		batchesByOid: map[string][]uint64{},
	}
}

func newDb(t *testing.T, s *syncService) interfaces.Database {
	app, err := s.sv.App(nil, nil, "mockApp")
	if err != nil {
		t.Errorf("Error while creating App: %v", err)
	}
	db, err := app.NoSQLDatabase(nil, nil, "mockDB")
	if err != nil {
		t.Errorf("Error while creating Database: %v", err)
	}
	return db
}
