blob: 77e5d277c5b8add356d4b7b384e7a249a10605ef [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package nosql_test
import (
"fmt"
"math/rand"
"strconv"
"strings"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase"
"v.io/syncbase/v23/syncbase/nosql"
tu "v.io/syncbase/v23/syncbase/testutil"
constants "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23"
"v.io/v23/naming"
"v.io/v23/verror"
"v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
)
//go:generate v23 test generate
// V23TestSyncbasedJoinSyncGroup tests the creation and joining of a
// SyncGroup. Client0 creates a SyncGroup at Syncbase0. Client1 requests to join
// the SyncGroup at Syncbase1. Syncbase1 in turn requests Syncbase0 to join the
// SyncGroup.
func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
}
// V23TestSyncbasedGetDeltas tests the sending of deltas between two Syncbase
// instances and their clients. The 1st client creates a SyncGroup and puts
// some database entries in it. The 2nd client joins that SyncGroup and reads
// the database entries. This verifies the end-to-end synchronization of data
// along the path: client0--Syncbase0--Syncbase1--client1.
func V23TestSyncbasedGetDeltas(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
}
// V23TestSyncbasedGetDeltasWithDel tests the sending of deltas between two
// Syncbase instances and their clients. The 1st client creates a SyncGroup and
// puts some database entries in it. The 2nd client joins that SyncGroup and
// reads the database entries. The 1st client then deletes a portion of this
// data, and adds new entries. The 2nd client verifies that these changes are
// correctly synced. This verifies the end-to-end synchronization of data along
// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
// deletes.
func V23TestSyncbasedGetDeltasWithDel(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo,tb:bar", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client0Creds, runDeleteData, "sync0", "foo", "0")
tu.RunClient(t, client0Creds, runVerifyDeletedData, "sync0", "foo")
tu.RunClient(t, client1Creds, runVerifyDeletedData, "sync1", "foo")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "bar", "0", "10")
}
// V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
// Syncbase instances and their clients. The 1st client creates a SyncGroup and
// puts some database entries in it. The 2nd client joins that SyncGroup and
// reads the database entries. The 2nd client then updates a subset of existing
// keys and adds more keys and the 1st client verifies that it can read these
// keys. This verifies the end-to-end bi-directional synchronization of data
// along the path: client0--Syncbase0--Syncbase1--client1.
func V23TestSyncbasedExchangeDeltas(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
tu.RunClient(t, client1Creds, runPopulateData, "sync1", "foo", "10")
tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
}
// V23 TestSyncbasedExchangeDeltasWithConflicts tests the exchange of deltas
// between two Syncbase instances and their clients in the presence of
// conflicting updates. The 1st client creates a SyncGroup and puts some
// database entries in it. The 2nd client joins that SyncGroup and reads the
// database entries. Both clients then update a subset of existing keys
// concurrently, and sync with each other. During sync, the following
// possibilities arise: (1) Both clients make their local updates first, sync
// with each other to detect conflicts. Resolution will cause one of the clients
// to see a new value based on the timestamp. (2) One client's update is synced
// before the other client has a chance to commit its update. The other client's
// update will then not be a conflict but a valid update building on the first
// one's change.
//
// Note that the verification done from the client side can have false positives
// re. the test's success. Since we cannot accurately predict which client's
// updates win, the test passes if we find either outcome. However, this could
// also imply that the sync failed, and each client is merely reading its own
// local value. The verification step mainly verifies that syncbased is still
// responsive and that the data is not corrupt.
//
// TODO(hpucha): We could diff the states of the two clients and ensure they are
// identical. Optionally we could expose inner state of syncbased via some
// debug methods.
func V23TestSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
// Run it multiple times to exercise different interactions between sync
// and local updates that change every run due to timing.
for i := 0; i < 10; i++ {
testSyncbasedExchangeDeltasWithConflicts(t)
}
}
func testSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
go tu.RunClient(t, client0Creds, runUpdateData, "sync0", "5")
d := time.Duration(rand.Int63n(50)) * time.Millisecond
time.Sleep(d)
tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
time.Sleep(10 * time.Second)
tu.RunClient(t, client0Creds, runVerifyConflictResolution, "sync0")
tu.RunClient(t, client1Creds, runVerifyConflictResolution, "sync1")
}
// V23TestNestedSyncGroups tests the exchange of deltas between two Syncbase
// instances and their clients with nested SyncGroups. The 1st client creates
// two SyncGroups at prefixes "f" and "foo" and puts some database entries in
// both of them. The 2nd client first joins the SyncGroup with prefix "foo" and
// verifies that it reads the corresponding database entries. The 2nd client
// then joins the SyncGroup with prefix "f" and verifies that it can read the
// "f" keys.
func V23TestNestedSyncGroups(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sg1Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
sg2Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG2")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg2Name)
tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
}
// V23TestNestedAndPeerSyncGroups tests the exchange of deltas between three
// Syncbase instances and their clients consisting of nested/peer
// SyncGroups. The 1st client creates two SyncGroups: SG1 at prefix "foo" and
// SG2 at "f" and puts some database entries in both of them. The 2nd client
// joins the SyncGroup SG1 and verifies that it reads the corresponding database
// entries. Client 2 then creates SG3 at prefix "f". The 3rd client joins the
// SyncGroups SG2 and SG3 and verifies that it can read all the "f" and "foo"
// keys created by client 1. Client 2 also verifies that it can read all the "f"
// and "foo" keys created by client 1.
func V23TestNestedAndPeerSyncGroups(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
server2Creds, _ := t.Shell().NewChildCredentials("s2")
client2Creds, _ := t.Shell().NewChildCredentials("c2")
cleanSync2 := tu.StartSyncbased(t, server2Creds, "sync2", "",
`{"Read": {"In":["root/c2"]}, "Write": {"In":["root/c2"]}}`)
defer cleanSync2()
sg1Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
sg2Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG2")
sg3Name := naming.Join("sync1", constants.SyncbaseSuffix, "SG3")
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1", "root/s2")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s2")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runCreateSyncGroup, "sync1", sg3Name, "tb:f", "root/s1", "root/s2")
tu.RunClient(t, client2Creds, runSetupAppA, "sync2")
tu.RunClient(t, client2Creds, runJoinSyncGroup, "sync2", sg2Name)
tu.RunClient(t, client2Creds, runJoinSyncGroup, "sync2", sg3Name)
tu.RunClient(t, client2Creds, runVerifyNestedSyncGroupData, "sync2")
tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
}
// V23TestSyncbasedGetDeltasPrePopulate tests the sending of deltas between two
// Syncbase instances and their clients with data existing before the creation
// of a SyncGroup. The 1st client puts entries in a database then creates a
// SyncGroup over that data. The 2nd client joins that SyncGroup and reads the
// database entries.
func V23TestSyncbasedGetDeltasPrePopulate(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
// Populate table data before creating the SyncGroup. Also populate
// with data that is not part of the SyncGroup to verify filtering.
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runVerifyNonSyncGroupData, "sync1", "bar")
}
// V23TestSyncbasedGetDeltasMultiApp tests the sending of deltas between two
// Syncbase instances and their clients across multiple apps, databases, and
// tables. The 1st client puts entries in multiple tables across multiple
// app databases then creates multiple SyncGroups (one per database) over that
// data. The 2nd client joins these SyncGroups and reads all the data.
func V23TestSyncbasedGetDeltasMultiApp(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s1")
client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
sgNamePrefix := naming.Join("sync0", constants.SyncbaseSuffix)
na, nd, nt := "2", "2", "2" // number of apps, dbs, tables
tu.RunClient(t, client0Creds, runSetupAppMulti, "sync0", na, nd, nt)
tu.RunClient(t, client0Creds, runPopulateSyncGroupMulti, "sync0", sgNamePrefix, na, nd, nt, "foo", "bar")
tu.RunClient(t, client1Creds, runSetupAppMulti, "sync1", na, nd, nt)
tu.RunClient(t, client1Creds, runJoinSyncGroupMulti, "sync1", sgNamePrefix, na, nd)
tu.RunClient(t, client1Creds, runVerifySyncGroupDataMulti, "sync1", na, nd, nt, "foo", "bar")
}
////////////////////////////////////
// Helpers.
// TODO(hpucha): Look into refactoring scan logic out of the helpers, and
// avoiding gets when we can scan.
var runSetupAppA = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
a.Create(ctx, nil)
d := a.NoSQLDatabase("d", nil)
d.Create(ctx, nil)
d.CreateTable(ctx, "tb", nil)
return nil
}, "runSetupAppA")
var runCreateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
mtName := env.Vars[ref.EnvNamespacePrefix]
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
Perms: perms(args[3:]...),
Prefixes: strings.Split(args[2], ","),
MountTables: []string{mtName},
}
sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{8}
if err := sg.Create(ctx, spec, info); err != nil {
return fmt.Errorf("Create SG %q failed: %v\n", args[1], err)
}
return nil
}, "runCreateSyncGroup")
var runJoinSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
return fmt.Errorf("Join SG %q failed: %v\n", args[1], err)
}
return nil
}, "runJoinSyncGroup")
var runPopulateData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Do Puts.
tb := d.Table("tb")
start, _ := strconv.ParseUint(args[2], 10, 64)
for i := start; i < start+10; i++ {
key := fmt.Sprintf("%s%d", args[1], i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
return nil
}, "runPopulateData")
var runUpdateData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Do Puts.
tb := d.Table("tb")
start, _ := strconv.ParseUint(args[1], 10, 64)
for i := start; i < start+5; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+args[0]+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
return nil
}, "runUpdateData")
var runDeleteData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Do Puts.
tb := d.Table("tb")
start, _ := strconv.ParseUint(args[1], 10, 64)
for i := start; i < start+5; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
if err := r.Delete(ctx); err != nil {
return fmt.Errorf("r.Delete() failed: %v\n", err)
}
}
return nil
}, "runDeleteData")
var runVerifySyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Wait for a bit (up to 4 sec) until the last key appears.
tb := d.Table("tb")
start, _ := strconv.ParseUint(args[2], 10, 64)
count, _ := strconv.ParseUint(args[3], 10, 64)
lastKey := fmt.Sprintf("%s%d", args[1], start+count-1)
r := tb.Row(lastKey)
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
// Verify that all keys and values made it correctly.
for i := start; i < start+count; i++ {
key := fmt.Sprintf("%s%d", args[1], i)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
// Re-verify using a scan operation.
stream := tb.Scan(ctx, nosql.Prefix(args[1]))
for i := 0; stream.Advance(); i++ {
want := fmt.Sprintf("%s%d", args[1], i)
got := stream.Key()
if got != want {
return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
}
want = "testkey" + want
if err := stream.Value(&got); err != nil {
return fmt.Errorf("cannot fetch value in scan: %v\n", err)
}
if got != want {
return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
}
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
return nil
}, "runVerifySyncGroupData")
var runVerifyDeletedData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Wait for a bit for deletions to propagate.
tb := d.Table("tb")
r := tb.Row("foo4")
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
break
}
}
// Verify using a scan operation.
stream := tb.Scan(ctx, nosql.Prefix(args[1]))
count := 0
for i := 5; stream.Advance(); i++ {
want := fmt.Sprintf("%s%d", args[1], i)
got := stream.Key()
if got != want {
return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
}
want = "testkey" + want
if err := stream.Value(&got); err != nil {
return fmt.Errorf("cannot fetch value in scan: %v\n", err)
}
if got != want {
return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
}
count++
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
if count != 5 {
return fmt.Errorf("scan stream count error: %v\n", count)
}
return nil
}, "runVerifyDeletedData")
var runVerifyConflictResolution = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
wantData := []struct {
start uint64
count uint64
valPfx []string
}{
{0, 5, []string{"testkey"}},
{5, 5, []string{"testkeysync0", "testkeysync1"}},
}
// Verify that all keys and values made it correctly.
for _, d := range wantData {
for i := d.start; i < d.start+d.count; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
match := 0
for _, p := range d.valPfx {
want := p + key
if got == want {
match++
}
}
if match != 1 {
return fmt.Errorf("unexpected value: got %q, match %v, want %v\n", got, match, d.valPfx)
}
}
}
return nil
}, "runVerifyConflictResolution")
var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
// Verify through a scan that none of that data exists.
count := 0
stream := tb.Scan(ctx, nosql.Prefix(args[1]))
for stream.Advance() {
count++
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
if count > 0 {
return fmt.Errorf("found %d entries in %s prefix that should not be there\n", count, args[1])
}
return nil
}, "runVerifyNonSyncGroupData")
var runVerifyLocalAndRemoteData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
// Wait for a bit (up to 4 sec) until the last key appears.
r := tb.Row("foo19")
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
wantData := []struct {
start uint64
count uint64
valPfx string
}{
{0, 5, "testkey"},
{5, 5, "testkeysync1"},
{10, 10, "testkey"},
}
// Verify that all keys and values made it correctly.
for _, d := range wantData {
for i := d.start; i < d.start+d.count; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := d.valPfx + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
}
return nil
}, "runVerifyLocalAndRemoteData")
var runVerifyNestedSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d", nil)
// Wait for a bit (up to 8 sec) until the last key appears. This chosen
// time interval is dependent on how fast the membership view is
// refreshed (currently 2 seconds) and how frequently we sync with peers
// (every 50 ms), and then adding a substantial safeguard to it to
// ensure that the test is not flaky even in somewhat abnormal
// conditions. Note that we wait longer than the 2 node tests since more
// nodes implies more pair-wise communication before achieving steady
// state.
tb := d.Table("tb")
r := tb.Row("f9")
for i := 0; i < 8; i++ {
time.Sleep(1 * time.Second)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
// Verify that all keys and values made it correctly.
pfxs := []string{"foo", "f"}
for _, p := range pfxs {
for i := 0; i < 10; i++ {
key := fmt.Sprintf("%s%d", p, i)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
}
return nil
}, "runVerifyNestedSyncGroupData")
var runSetupAppMulti = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
svc := syncbase.NewService(args[0])
numApps, _ := strconv.Atoi(args[1])
numDbs, _ := strconv.Atoi(args[2])
numTbs, _ := strconv.Atoi(args[3])
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
a := svc.App(appName)
a.Create(ctx, nil)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := a.NoSQLDatabase(dbName, nil)
d.Create(ctx, nil)
for k := 0; k < numTbs; k++ {
tbName := fmt.Sprintf("tb%d", k)
d.CreateTable(ctx, tbName, nil)
}
}
}
return nil
}, "runSetupAppMulti")
var runPopulateSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
mtName := env.Vars[ref.EnvNamespacePrefix]
svc := syncbase.NewService(args[0])
sgNamePrefix := args[1]
numApps, _ := strconv.Atoi(args[2])
numDbs, _ := strconv.Atoi(args[3])
numTbs, _ := strconv.Atoi(args[4])
prefixes := args[5:]
// For each app...
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
a := svc.App(appName)
// For each database...
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := a.NoSQLDatabase(dbName, nil)
// For each table, pre-populate entries on each prefix.
// Also determine the SyncGroup prefixes.
var sgPrefixes []string
for k := 0; k < numTbs; k++ {
tbName := fmt.Sprintf("tb%d", k)
tb := d.Table(tbName)
for _, pfx := range prefixes {
p := fmt.Sprintf("%s:%s", tbName, pfx)
sgPrefixes = append(sgPrefixes, p)
for n := 0; n < 10; n++ {
key := fmt.Sprintf("%s%d", pfx, n)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
}
}
// Create one SyncGroup per database across all tables
// and prefixes.
sgName := naming.Join(sgNamePrefix, appName, dbName)
spec := wire.SyncGroupSpec{
Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
Perms: perms("root/s0", "root/s1"),
Prefixes: sgPrefixes,
MountTables: []string{mtName},
}
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{8}
if err := sg.Create(ctx, spec, info); err != nil {
return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
}
}
}
return nil
}, "runPopulateSyncGroupMulti")
var runJoinSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
svc := syncbase.NewService(args[0])
sgNamePrefix := args[1]
numApps, _ := strconv.Atoi(args[2])
numDbs, _ := strconv.Atoi(args[3])
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
a := svc.App(appName)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := a.NoSQLDatabase(dbName, nil)
sgName := naming.Join(sgNamePrefix, appName, dbName)
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
}
}
}
return nil
}, "runJoinSyncGroupMulti")
var runVerifySyncGroupDataMulti = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
svc := syncbase.NewService(args[0])
numApps, _ := strconv.Atoi(args[1])
numDbs, _ := strconv.Atoi(args[2])
numTbs, _ := strconv.Atoi(args[3])
prefixes := args[4:]
time.Sleep(20 * time.Second)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
a := svc.App(appName)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := a.NoSQLDatabase(dbName, nil)
for k := 0; k < numTbs; k++ {
tbName := fmt.Sprintf("tb%d", k)
tb := d.Table(tbName)
for _, pfx := range prefixes {
for n := 0; n < 10; n++ {
key := fmt.Sprintf("%s%d", pfx, n)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n",
got, want)
}
}
}
}
}
}
return nil
}, "runVerifySyncGroupDataMulti")