blob: e88ea9cf734a8512a9e6b780427b19221a8a1da7 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Used to ease the setup of sync test scenarios.
// Parses a sync command file and returns a vector of commands to execute.
// dagReplayCommands() executes the parsed commands at the DAG API level.
import (
"bufio"
"container/list"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"
"v.io/v23/context"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store/watchable"
)
const (
addLocal = iota
addRemote
linkLocal
linkRemote
genvec
)
var (
constTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
)
type syncCommand struct {
cmd int
oid string
version string
parents []string
logrec string
deleted bool
batchId uint64
batchCount uint64
genVecs interfaces.Knowledge
}
// parseSyncCommands parses a sync test file and returns its commands.
func parseSyncCommands(file string) ([]syncCommand, error) {
cmds := []syncCommand{}
sf, err := os.Open("testdata/" + file)
if err != nil {
return nil, err
}
defer sf.Close()
scanner := bufio.NewScanner(sf)
lineno := 0
for scanner.Scan() {
lineno++
line := strings.TrimSpace(scanner.Text())
if line == "" || line[0] == '#' {
continue
}
// The current line encodes a command, i.e. it is not a comment line.
// Use strconv.Unquote to convert \xfe to the desired byte (for example).
// Note, we must wrap the original line in quotes before passing it to
// strconv.Unquote since strconv.Unquote expects the input string to look
// like a Go string literal (quoted).
qline := "\"" + line + "\""
if line, err = strconv.Unquote(qline); err != nil {
return nil, fmt.Errorf("%s: %s", err, qline)
}
args := strings.Split(line, "|")
nargs := len(args)
switch args[0] {
case "addl", "addr":
expNargs := 9
if nargs != expNargs {
return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
file, lineno, expNargs, nargs)
}
var parents []string
for i := 3; i <= 4; i++ {
if args[i] != "" {
parents = append(parents, args[i])
}
}
batchId, err := strconv.ParseUint(args[6], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid batchId: %s", file, lineno, args[6])
}
batchCount, err := strconv.ParseUint(args[7], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid batch count: %s", file, lineno, args[7])
}
del, err := strconv.ParseBool(args[8])
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
}
cmd := syncCommand{
oid: args[1],
version: args[2],
parents: parents,
logrec: args[5],
batchId: batchId,
batchCount: batchCount,
deleted: del,
}
if args[0] == "addl" {
cmd.cmd = addLocal
} else {
cmd.cmd = addRemote
}
cmds = append(cmds, cmd)
case "linkl", "linkr":
expNargs := 6
if nargs != expNargs {
return nil, fmt.Errorf("%s:%d: need %d args instead of %d",
file, lineno, expNargs, nargs)
}
if args[3] == "" {
return nil, fmt.Errorf("%s:%d: parent version not specified", file, lineno)
}
if args[4] != "" {
return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent: %s",
file, lineno, args[4])
}
cmd := syncCommand{
oid: args[1],
version: args[2],
parents: []string{args[3]},
logrec: args[5],
}
if args[0] == "linkl" {
cmd.cmd = linkLocal
} else {
cmd.cmd = linkRemote
}
cmds = append(cmds, cmd)
case "genvec":
cmd := syncCommand{
cmd: genvec,
genVecs: make(interfaces.Knowledge),
}
for i := 1; i < len(args); i = i + 2 {
pfx := args[i]
genVec := make(interfaces.GenVector)
for _, elem := range strings.Split(args[i+1], ",") {
kv := strings.Split(elem, ":")
if len(kv) != 2 {
return nil, fmt.Errorf("%s:%d: invalid gen vector key/val: %s", file, lineno, elem)
}
dev, err := strconv.ParseUint(kv[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid devid: %s", file, lineno, args[i+1])
}
gen, err := strconv.ParseUint(kv[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("%s:%d: invalid gen: %s", file, lineno, args[i+1])
}
genVec[dev] = gen
}
cmd.genVecs[pfx] = genVec
}
cmds = append(cmds, cmd)
default:
return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
}
}
err = scanner.Err()
return cmds, err
}
// dagReplayCommands parses a sync test file and replays its commands, updating
// the DAG structures associated with the sync service.
func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (*graftMap, error) {
cmds, err := parseSyncCommands(syncfile)
if err != nil {
return nil, err
}
st := s.sv.St()
graft := newGraft(st)
for _, cmd := range cmds {
tx := st.NewTransaction()
switch cmd.cmd {
case addLocal:
err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
cmd.deleted, cmd.parents, NoBatchId, nil)
if err != nil {
return nil, fmt.Errorf("cannot add local node %s:%s: %v",
cmd.oid, cmd.version, err)
}
if err = moveHead(ctx, tx, cmd.oid, cmd.version); err != nil {
return nil, fmt.Errorf("cannot move head to %s:%s: %v",
cmd.oid, cmd.version, err)
}
case addRemote:
err = s.addNode(ctx, tx, cmd.oid, cmd.version, cmd.logrec,
cmd.deleted, cmd.parents, NoBatchId, graft)
if err != nil {
return nil, fmt.Errorf("cannot add remote node %s:%s: %v",
cmd.oid, cmd.version, err)
}
case linkLocal:
if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], NoBatchId, nil); err != nil {
return nil, fmt.Errorf("cannot add local parent %s to node %s:%s: %v",
cmd.parents[0], cmd.oid, cmd.version, err)
}
case linkRemote:
if err = s.addParent(ctx, tx, cmd.oid, cmd.version, cmd.parents[0], NoBatchId, graft); err != nil {
return nil, fmt.Errorf("cannot add remote parent %s to node %s:%s: %v",
cmd.parents[0], cmd.oid, cmd.version, err)
}
}
tx.Commit()
}
return graft, nil
}
// dummyStream emulates stream of log records received from RPC.
type dummyStream struct {
l *list.List
entry interfaces.DeltaResp
}
func newStream() *dummyStream {
ds := &dummyStream{
l: list.New(),
}
return ds
}
func (ds *dummyStream) add(entry interfaces.DeltaResp) {
ds.l.PushBack(entry)
}
func (ds *dummyStream) Advance() bool {
if ds.l.Len() > 0 {
ds.entry = ds.l.Remove(ds.l.Front()).(interfaces.DeltaResp)
return true
}
return false
}
func (ds *dummyStream) Value() interfaces.DeltaResp {
return ds.entry
}
func (ds *dummyStream) RecvStream() interface {
Advance() bool
Value() interfaces.DeltaResp
Err() error
} {
return ds
}
func (*dummyStream) Err() error { return nil }
func (ds *dummyStream) Finish() (interfaces.DeltaFinalResp, error) {
return interfaces.DeltaFinalResp{}, nil
}
func (ds *dummyStream) Cancel() {
}
// replayLocalCommands replays local log records parsed from the input file.
func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
cmds, err := parseSyncCommands(syncfile)
if err != nil {
t.Fatalf("parseSyncCommands failed with err %v", err)
}
tx := createDatabase(t, s).St().NewWatchableTransaction()
var pos uint64
for _, cmd := range cmds {
switch cmd.cmd {
case addLocal:
rec := &LocalLogRec{
Metadata: createMetadata(t, interfaces.NodeRec, cmd),
Pos: pos,
}
err = s.sync.processLocalLogRec(nil, tx, rec)
if err != nil {
t.Fatalf("processLocalLogRec failed with err %v", err)
}
// Add to Store.
err = watchable.PutVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte(rec.Metadata.CurVers))
if err != nil {
t.Fatalf("PutVersion failed with err %v", err)
}
err = watchable.PutAtVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte("abc"), []byte(rec.Metadata.CurVers))
if err != nil {
t.Fatalf("PutAtVersion failed with err %v", err)
}
default:
t.Fatalf("replayLocalCommands failed with unknown command %v", cmd)
}
pos++
}
if err := tx.Commit(); err != nil {
t.Fatalf("cannot commit local log records %s, err %v", syncfile, err)
}
}
// createReplayStream creates a dummy stream of log records parsed from the input file.
func createReplayStream(t *testing.T, syncfile string) *dummyStream {
cmds, err := parseSyncCommands(syncfile)
if err != nil {
t.Fatalf("parseSyncCommands failed with err %v", err)
}
stream := newStream()
for _, cmd := range cmds {
var ty byte
switch cmd.cmd {
case genvec:
gvs := interfaces.DeltaRespGvs{cmd.genVecs}
stream.add(gvs)
continue
case addRemote:
ty = interfaces.NodeRec
case linkRemote:
ty = interfaces.LinkRec
default:
t.Fatalf("createReplayStream unknown command %v", cmd)
}
var val string = "abc"
rawValue, err := vom.RawBytesFromValue(val)
if err != nil {
t.Fatalf("createReplayStream encode failed, err %v", err)
}
rec := interfaces.DeltaRespRec{interfaces.LogRec{
Metadata: createMetadata(t, ty, cmd),
Value: rawValue,
}}
stream.add(rec)
}
return stream
}
func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
_, id, gen, err := splitLogRecKey(nil, cmd.logrec)
if err != nil {
t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
}
m := interfaces.LogRecMetadata{
Id: id,
Gen: gen,
RecType: ty,
ObjId: common.JoinKeyParts(common.RowPrefix, cmd.oid),
CurVers: cmd.version,
Parents: cmd.parents,
UpdTime: constTime,
Delete: cmd.deleted,
BatchId: cmd.batchId,
BatchCount: cmd.batchCount,
}
return m
}
// splitLogRecKey is the inverse of logRecKey and returns the prefix, device id
// and generation number.
func splitLogRecKey(ctx *context.T, key string) (string, uint64, uint64, error) {
parts := common.SplitKeyParts(key)
verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
if len(parts) != 5 && len(parts) != 7 {
return "", 0, 0, verr
}
if common.JoinKeyParts(parts[:2]...) != logPrefix {
return "", 0, 0, verr
}
var idStr, genStr, prefix string
if len(parts) == 5 {
if parts[2] != logDataPrefix {
return "", 0, 0, verr
}
prefix, idStr, genStr = parts[2], parts[3], parts[4]
} else { // len(parts) == 7
if _, err := strconv.ParseUint(parts[4], 10, 64); err != nil { // GroupId
return "", 0, 0, verr
}
prefix, idStr, genStr = common.JoinKeyParts(parts[2:5]...), parts[5], parts[6]
}
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
return "", 0, 0, verr
}
gen, err := strconv.ParseUint(genStr, 16, 64)
if err != nil {
return "", 0, 0, verr
}
return prefix, id, gen, nil
}
func TestSplitLogRecKey(t *testing.T) {
invalid := []string{
"y\xfe100\xfebb",
"l\xfe100\xfebb",
"y\xfel\xfed\xfe100\xfexx",
"y\xfel\xfed\xfeaa\xfe100",
"y\xfel\xfex\xfe100\xfe100",
"y\xfel\xfed\xfe100",
"y\xfel\xfed\xfe100\xfe100\xfebb",
"y\xfel\xfey\xfes\xfexx\xfe100\xfe100",
}
for _, k := range invalid {
if _, _, _, err := splitLogRecKey(nil, k); err == nil {
t.Fatalf("splitting log rec key didn't fail %q", k)
}
}
valid := []struct {
pfx string
id uint64
gen uint64
}{
{logDataPrefix, 10, 20},
{"y\xfes\xfe2500", 190, 540},
{"y\xfes\xfe4200", 9999, 999999},
}
for _, v := range valid {
gotPfx, gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.pfx, v.id, v.gen))
if gotPfx != v.pfx || gotId != v.id || gotGen != v.gen || err != nil {
t.Fatalf("failed key conversion: pfx got %v want %v, id got %v want %v, gen got %v want %v, err %v", gotPfx, v.pfx, gotId, v.id, gotGen, v.gen, err)
}
}
}