// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

// Used to ease the setup of sync test scenarios.
// Parses a sync command file and returns a vector of commands to execute.
// dagReplayCommands() executes the parsed commands at the DAG API level.

import (
	"bufio"
	"container/list"
	"fmt"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"v.io/v23/context"
	"v.io/v23/vom"
	"v.io/x/ref/services/syncbase/server/interfaces"
	"v.io/x/ref/services/syncbase/server/util"
	"v.io/x/ref/services/syncbase/server/watchable"
)

const (
	addLocal = iota
	addRemote
	linkLocal
	linkRemote
	genvec
)

var (
	constTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
)

type syncCommand struct {
	cmd        int
	oid        string
	version    string
	parents    []string
	logrec     string
	deleted    bool
	batchId    uint64
	batchCount uint64
	genVec     interfaces.GenVector
}

// parseSyncCommands parses a sync test file and returns its commands.
func parseSyncCommands(file string) ([]syncCommand, error) {
	cmds := []syncCommand{}

	sf, err := os.Open("testdata/" + file)
	if err != nil {
		return nil, err
	}
	defer sf.Close()

	scanner := bufio.NewScanner(sf)
	lineno := 0
	for scanner.Scan() {
		lineno++
		line := strings.TrimSpace(scanner.Text())
		if line == "" || line[0] == '#' {
			continue
		}

		args := strings.Split(line, "|")
		nargs := len(args)

		switch args[0] {
		case "addl", "addr":
			expNargs := 9
			if nargs != expNargs {
				return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
					file, lineno, expNargs, nargs)
			}
			var parents []string
			for i := 3; i <= 4; i++ {
				if args[i] != "" {
					parents = append(parents, args[i])
				}
			}

			batchId, err := strconv.ParseUint(args[6], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("%s:%d: invalid batchId: %s", file, lineno, args[6])
			}
			batchCount, err := strconv.ParseUint(args[7], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("%s:%d: invalid batch count: %s", file, lineno, args[7])
			}
			del, err := strconv.ParseBool(args[8])
			if err != nil {
				return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
			}
			cmd := syncCommand{
				oid:        args[1],
				version:    args[2],
				parents:    parents,
				logrec:     args[5],
				batchId:    batchId,
				batchCount: batchCount,
				deleted:    del,
			}
			if args[0] == "addl" {
				cmd.cmd = addLocal
			} else {
				cmd.cmd = addRemote
			}
			cmds = append(cmds, cmd)

		case "linkl", "linkr":
			expNargs := 6
			if nargs != expNargs {
				return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
					file, lineno, expNargs, nargs)
			}

			if args[3] == "" {
				return nil, fmt.Errorf("%s:%d: parent version not specified", file, lineno)
			}
			if args[4] != "" {
				return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent: %s",
					file, lineno, args[4])
			}

			cmd := syncCommand{
				oid:     args[1],
				version: args[2],
				parents: []string{args[3]},
				logrec:  args[5],
			}
			if args[0] == "linkl" {
				cmd.cmd = linkLocal
			} else {
				cmd.cmd = linkRemote
			}
			cmds = append(cmds, cmd)

		case "genvec":
			cmd := syncCommand{
				cmd:    genvec,
				genVec: make(interfaces.GenVector),
			}
			for i := 1; i < len(args); i = i + 2 {
				pfx := args[i]
				genVec := make(interfaces.PrefixGenVector)
				for _, elem := range strings.Split(args[i+1], ",") {
					kv := strings.Split(elem, ":")
					if len(kv) != 2 {
						return nil, fmt.Errorf("%s:%d: invalid gen vector key/val: %s", file, lineno, elem)
					}
					dev, err := strconv.ParseUint(kv[0], 10, 64)
					if err != nil {
						return nil, fmt.Errorf("%s:%d: invalid devid: %s", file, lineno, args[i+1])
					}
					gen, err := strconv.ParseUint(kv[1], 10, 64)
					if err != nil {
						return nil, fmt.Errorf("%s:%d: invalid gen: %s", file, lineno, args[i+1])
					}
					genVec[dev] = gen
				}
				cmd.genVec[pfx] = genVec
			}
			cmds = append(cmds, cmd)

		default:
			return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
		}
	}

	err = scanner.Err()
	return cmds, err
}

// dagReplayCommands parses a sync test file and replays its commands, updating
// the DAG structures associated with the sync service.
func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (*graftMap, error) {
	cmds, err := parseSyncCommands(syncfile)
	if err != nil {
		return nil, err
	}

	st := s.sv.St()
	graft := newGraft(st)

	for _, cmd := range cmds {
		tx := st.NewTransaction()

		switch cmd.cmd {
		case addLocal:
			err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
				cmd.deleted, cmd.parents, NoBatchId, nil)
			if err != nil {
				return nil, fmt.Errorf("cannot add local node %s:%s: %v",
					cmd.oid, cmd.version, err)
			}

			if err = moveHead(ctx, tx, cmd.oid, cmd.version); err != nil {
				return nil, fmt.Errorf("cannot move head to %s:%s: %v",
					cmd.oid, cmd.version, err)
			}

		case addRemote:
			err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
				cmd.deleted, cmd.parents, NoBatchId, graft)
			if err != nil {
				return nil, fmt.Errorf("cannot add remote node %s:%s: %v",
					cmd.oid, cmd.version, err)
			}

		case linkLocal:
			if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], nil); err != nil {
				return nil, fmt.Errorf("cannot add local parent %s to node %s:%s: %v",
					cmd.parents[0], cmd.oid, cmd.version, err)
			}

		case linkRemote:
			if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], graft); err != nil {
				return nil, fmt.Errorf("cannot add remote parent %s to node %s:%s: %v",
					cmd.parents[0], cmd.oid, cmd.version, err)
			}
		}

		tx.Commit()
	}

	return graft, nil
}

// dummyStream emulates stream of log records received from RPC.
type dummyStream struct {
	l     *list.List
	entry interfaces.DeltaResp
}

func newStream() *dummyStream {
	ds := &dummyStream{
		l: list.New(),
	}
	return ds
}

func (ds *dummyStream) add(entry interfaces.DeltaResp) {
	ds.l.PushBack(entry)
}

func (ds *dummyStream) Advance() bool {
	if ds.l.Len() > 0 {
		ds.entry = ds.l.Remove(ds.l.Front()).(interfaces.DeltaResp)
		return true
	}
	return false
}

func (ds *dummyStream) Value() interfaces.DeltaResp {
	return ds.entry
}

func (ds *dummyStream) RecvStream() interface {
	Advance() bool
	Value() interfaces.DeltaResp
	Err() error
} {
	return ds
}

func (*dummyStream) Err() error { return nil }

func (ds *dummyStream) Finish() error {
	return nil
}

func (ds *dummyStream) Cancel() {
}

// replayLocalCommands replays local log records parsed from the input file.
func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
	cmds, err := parseSyncCommands(syncfile)
	if err != nil {
		t.Fatalf("parseSyncCommands failed with err %v", err)
	}

	tx := s.St().NewTransaction()
	var pos uint64
	for _, cmd := range cmds {
		switch cmd.cmd {
		case addLocal:
			rec := &localLogRec{
				Metadata: createMetadata(t, interfaces.NodeRec, cmd),
				Pos:      pos,
			}
			err = s.sync.processLocalLogRec(nil, tx, rec)
			if err != nil {
				t.Fatalf("processLocalLogRec failed with err %v", err)
			}

			// Add to Store.
			err = watchable.PutVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte(rec.Metadata.CurVers))
			if err != nil {
				t.Fatalf("PutVersion failed with err %v", err)
			}
			err = watchable.PutAtVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte("abc"), []byte(rec.Metadata.CurVers))
			if err != nil {
				t.Fatalf("PutAtVersion failed with err %v", err)
			}

		default:
			t.Fatalf("replayLocalCommands failed with unknown command %v", cmd)
		}
		pos++
	}
	if err := tx.Commit(); err != nil {
		t.Fatalf("cannot commit local log records %s, err %v", syncfile, err)
	}
}

// createReplayStream creates a dummy stream of log records parsed from the input file.
func createReplayStream(t *testing.T, syncfile string) *dummyStream {
	cmds, err := parseSyncCommands(syncfile)
	if err != nil {
		t.Fatalf("parseSyncCommands failed with err %v", err)
	}

	stream := newStream()
	for _, cmd := range cmds {
		var ty byte
		switch cmd.cmd {
		case genvec:
			gv := interfaces.DeltaRespRespVec{cmd.genVec}
			stream.add(gv)
			continue
		case addRemote:
			ty = interfaces.NodeRec
		case linkRemote:
			ty = interfaces.LinkRec
		default:
			t.Fatalf("createReplayStream unknown command %v", cmd)
		}

		var val string = "abc"
		valbuf, err := vom.Encode(val)
		if err != nil {
			t.Fatalf("createReplayStream encode failed, err %v", err)
		}

		rec := interfaces.DeltaRespRec{interfaces.LogRec{
			Metadata: createMetadata(t, ty, cmd),
			Value:    valbuf,
		}}

		stream.add(rec)
	}
	return stream
}

func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
	_, id, gen, err := splitLogRecKey(nil, cmd.logrec)
	if err != nil {
		t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
	}
	m := interfaces.LogRecMetadata{
		Id:         id,
		Gen:        gen,
		RecType:    ty,
		ObjId:      util.JoinKeyParts(util.RowPrefix, cmd.oid),
		CurVers:    cmd.version,
		Parents:    cmd.parents,
		UpdTime:    constTime,
		Delete:     cmd.deleted,
		BatchId:    cmd.batchId,
		BatchCount: cmd.batchCount,
	}
	return m
}
