// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncbase_test
import (
"fmt"
"math/rand"
"os"
"reflect"
"runtime/debug"
"sort"
"strings"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/services/watch"
"v.io/v23/syncbase"
"v.io/v23/verror"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/syncbase/syncbaselib"
"v.io/x/ref/test/v23test"
)
var testCollection = wire.Id{"u", "c"}
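// dbHandle returns a handle to the test database {"a", "d"} on the named
// Syncbase service.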
func dbHandle(serviceName string) syncbase.Database {
return syncbase.NewService(serviceName).DatabaseForId(wire.Id{"a", "d"}, nil)
}
// NOTE(sadovsky): These tests take a very long time to run - nearly 4 minutes
// on my MacBook Pro! Various instances of time.Sleep() below likely contribute
// to the problem.
// TODO(ivanpi): Move to featuretests and deduplicate helpers.
// TestV23SyncbasedJoinSyncgroup tests the creation and joining of a syncgroup.
// Client0 creates a syncgroup at Syncbase0. Client1 requests to join the
// syncgroup at Syncbase1. Syncbase1 in turn requests Syncbase0 to join the
// syncgroup.
func TestV23SyncbasedJoinSyncgroup(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
}
// TestV23SyncbasedGetDeltas tests the sending of deltas between two Syncbase
// instances and their clients. The 1st client creates a syncgroup and puts
// some database entries in it. The 2nd client joins that syncgroup and reads
// the database entries. This verifies the end-to-end synchronization of data
// along the path: client0--Syncbase0--Syncbase1--client1.
func TestV23SyncbasedGetDeltas(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
// TODO(aghassemi): Resolve permission is currently needed for Watch.
// See https://github.com/vanadium/issues/issues/1110
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo,c:bar", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
ok(t, err)
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
}
// TestV23SyncbasedGetDeltasWithDel tests the sending of deltas between two
// Syncbase instances and their clients. The 1st client creates a syncgroup and
// puts some database entries in it. The 2nd client joins that syncgroup and
// reads the database entries. The 1st client then deletes a portion of this
// data, and adds new entries. The 2nd client verifies that these changes are
// correctly synced. This verifies the end-to-end synchronization of data along
// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
// deletes.
func TestV23SyncbasedGetDeltasWithDel(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
// TODO(aghassemi): Resolve permission is currently needed for Watch.
// See https://github.com/vanadium/issues/issues/1110
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo,c:bar", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
ok(t, err)
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
beforeDeleteMarker, err := getResumeMarker(client1Ctx, "sync1")
ok(t, err)
ok(t, runDeleteData(client0Ctx, "sync0", 0))
ok(t, runVerifyDeletedData(client0Ctx, "sync0", "foo"))
ok(t, runVerifyDeletedData(client1Ctx, "sync1", "foo"))
ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 5, true, beforeDeleteMarker))
ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "bar", 0, 10, false))
}
// TestV23SyncbasedCompEval is a comprehensive sniff test for core sync
// functionality. It tests the exchange of deltas between two Syncbase instances
// and their clients. The 1st client creates a syncgroup and puts some database
// entries in it. The 2nd client joins that syncgroup and reads the database
// entries. The 2nd client then updates a subset of existing keys and adds more
// keys and the 1st client verifies that it can read these keys. This verifies
// the end-to-end bi-directional synchronization of data along the path:
// client0--Syncbase0--Syncbase1--client1. In addition, this test also verifies
// the bi-directional exchange of syncgroup deltas. After the first phase is
// done, both Syncbase instances are shutdown and restarted, and new data is
// synced once again.
func TestV23SyncbasedCompEval(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
server0RDir := sh.MakeTempDir()
cleanSync0 := sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
server1RDir := sh.MakeTempDir()
cleanSync1 := sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
// This is a decoy syncgroup that no other Syncbase joins, but is on the
// same database as the first syncgroup. Populating it after the first
// syncgroup causes the database generations to go up, but the joiners
// on the first syncgroup do not get any data belonging to this
// syncgroup. This triggers the handling of filtered log records in the
// restartability code.
sbName1 := "sync0"
sgName1 := "SG2"
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName1, sgName1, "c:bar", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
// Shutdown and restart Syncbase instances.
cleanSync0(os.Interrupt)
cleanSync1(os.Interrupt)
cleanSync0 = sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
cleanSync1 = sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
ok(t, runSetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v2", "c:foo", "root:s0", "root:s1", "root:s3"))
ok(t, runGetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v2", "c:foo", "root:s0", "root:s1", "root:s3"))
ok(t, runUpdateData(client1Ctx, "sync1", 5))
ok(t, runPopulateData(client1Ctx, "sync1", "foo", 10))
ok(t, runSetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
ok(t, runVerifyLocalAndRemoteData(client0Ctx, "sync0"))
ok(t, runGetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
// Shutdown and restart Syncbase instances.
cleanSync0(os.Interrupt)
cleanSync1(os.Interrupt)
_ = sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
_ = sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
ok(t, runGetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
ok(t, runGetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 20))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 20, 10, true))
}
// TestV23SyncbasedExchangeDeltasWithAcls tests the exchange of deltas including
// acls between two Syncbase instances and their clients. The 1st client creates
// a syncgroup on a collection and puts some database entries in it. The 2nd
// client joins that syncgroup and reads the database entries. The 2nd client
// then changes the collection acl to allow access to only itself. The 1st
// client should be unable to access the keys. The 2nd client then modifies the
// collection acl to restore access to both clients. The 1st client should
// regain access.
func TestV23SyncbasedExchangeDeltasWithAcls(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}, "Admin": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}, "Admin": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
// Note, since collection ACLs are resolved last-one-wins, both collections
// must be set up before calling runSetCollectionPermissions to ensure the
// newly set value is the latest.
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetCollectionPermissions(client0Ctx, "sync0", "root:c0", "root:c1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, true))
ok(t, runSetCollectionPermissions(client1Ctx, "sync1", "root:c1"))
ok(t, runVerifyLostAccess(client0Ctx, "sync0", "foo", 0, 10))
ok(t, runSetCollectionPermissions(client1Ctx, "sync1", "root:c0", "root:c1"))
ok(t, runVerifySyncgroupData(client0Ctx, "sync0", "foo", 0, 10, false))
}
// TestV23SyncbasedExchangeDeltasWithConflicts tests the exchange of deltas
// between two Syncbase instances and their clients in the presence of
// conflicting updates. The 1st client creates a syncgroup and puts some
// database entries in it. The 2nd client joins that syncgroup and reads the
// database entries. Both clients then update a subset of existing keys
// concurrently, and sync with each other. During sync, the following
// possibilities arise: (1) Both clients make their local updates first, sync
// with each other to detect conflicts. Resolution will cause one of the clients
// to see a new value based on the timestamp. (2) One client's update is synced
// before the other client has a chance to commit its update. The other client's
// update will then not be a conflict but a valid update building on the first
// one's change.
//
// Note that the verification done from the client side can yield false
// positives regarding the test's success. Since we cannot accurately predict
// which client's updates win, the test passes if we find either outcome.
// However, this could also imply that the sync failed, and each client is
// merely reading its own local value. The verification step mainly confirms
// that syncbased is still responsive and that the data is not corrupt.
//
// TODO(hpucha): We could diff the states of the two clients and ensure they are
// identical. Optionally we could expose inner state of syncbased via some
// debug methods.
func TestV23SyncbasedExchangeDeltasWithConflicts(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
// Run it multiple times to exercise different interactions between sync
// and local updates that change every run due to timing.
for i := 0; i < 10; i++ {
testSyncbasedExchangeDeltasWithConflicts(t)
}
}
func testSyncbasedExchangeDeltasWithConflicts(t *testing.T) {
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
go func() { ok(t, runUpdateData(client0Ctx, "sync0", 5)) }()
d := time.Duration(rand.Int63n(50)) * time.Millisecond
time.Sleep(d)
ok(t, runUpdateData(client1Ctx, "sync1", 5))
time.Sleep(10 * time.Second)
ok(t, runVerifyConflictResolution(client0Ctx, "sync0"))
ok(t, runVerifyConflictResolution(client1Ctx, "sync1"))
}
// TestV23NestedSyncgroups tests the exchange of deltas between two Syncbase
// instances and their clients with nested syncgroups. The 1st client creates
// two syncgroups at prefixes "f" and "foo" and puts some database entries in
// both of them. The 2nd client first joins the syncgroup with prefix "foo" and
// verifies that it reads the corresponding database entries. The 2nd client
// then joins the syncgroup with prefix "f" and verifies that it can read the
// "f" keys.
func TestV23NestedSyncgroups(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sb1Name := "sync0"
sg1Name := "SG1"
sb2Name := "sync0"
sg2Name := "SG2"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb1Name, sg1Name, "c:foo", "", "root:s0", "root:s1"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb2Name, sg2Name, "c:f", "", "root:s0", "root:s1"))
ok(t, runPopulateData(client0Ctx, "sync0", "f", 0))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb1Name, sg1Name))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb2Name, sg2Name))
ok(t, runVerifyNestedSyncgroupData(client1Ctx, "sync1"))
}
// TestV23NestedAndPeerSyncgroups tests the exchange of deltas between three
// Syncbase instances and their clients consisting of nested/peer
// syncgroups. The 1st client creates two syncgroups: SG1 at prefix "foo" and
// SG2 at "f" and puts some database entries in both of them. The 2nd client
// joins the syncgroup SG1 and verifies that it reads the corresponding database
// entries. Client 2 then creates SG3 at prefix "f". The 3rd client joins the
// syncgroups SG2 and SG3 and verifies that it can read all the "f" and "foo"
// keys created by client 1. Client 2 also verifies that it can read all the "f"
// and "foo" keys created by client 1.
func TestV23NestedAndPeerSyncgroups(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
server2Creds := sh.ForkCredentials("s2")
client2Ctx := sh.ForkContext("c2")
sh.StartSyncbase(server2Creds, syncbaselib.Opts{Name: "sync2"}, `{"Read": {"In":["root:c2"]}, "Write": {"In":["root:c2"]}}`)
sb1Name := "sync0"
sg1Name := "SG1"
sb2Name := "sync0"
sg2Name := "SG2"
sb3Name := "sync1"
sg3Name := "SG3"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb1Name, sg1Name, "c:foo", "", "root:s0", "root:s1", "root:s2"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb2Name, sg2Name, "c:f", "", "root:s0", "root:s2"))
ok(t, runPopulateData(client0Ctx, "sync0", "f", 0))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb1Name, sg1Name))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runCreateSyncgroup(client1Ctx, "sync1", sb3Name, sg3Name, "c:f", "", "root:s1", "root:s2"))
ok(t, runSetupAppA(client2Ctx, "sync2"))
ok(t, runJoinSyncgroup(client2Ctx, "sync2", sb2Name, sg2Name))
ok(t, runJoinSyncgroup(client2Ctx, "sync2", sb3Name, sg3Name))
ok(t, runVerifyNestedSyncgroupData(client2Ctx, "sync2"))
ok(t, runVerifyNestedSyncgroupData(client1Ctx, "sync1"))
}
// TestV23SyncbasedGetDeltasPrePopulate tests the sending of deltas between two
// Syncbase instances and their clients with data existing before the creation
// of a syncgroup. The 1st client puts entries in a database then creates a
// syncgroup over that data. The 2nd client joins that syncgroup and reads the
// database entries.
func TestV23SyncbasedGetDeltasPrePopulate(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
sbName := "sync0"
sgName := "SG1"
// Populate collection data before creating the syncgroup. Also populate
// with data that is not part of the syncgroup to verify filtering.
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runVerifyNonSyncgroupData(client1Ctx, "sync1", "bar"))
}
// TestV23SyncbasedGetDeltasMultiApp tests the sending of deltas between two
// Syncbase instances and their clients across multiple databases and
// collections. The 1st client puts entries in multiple collections across
// multiple databases then creates multiple syncgroups (one per database) over
// that data. The 2nd client joins these syncgroups and reads all the data.
func TestV23SyncbasedGetDeltasMultiApp(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
na, nd, nt := 2, 2, 2 // number of apps, dbs, collections
ok(t, runSetupAppMulti(client0Ctx, "sync0", na, nd, nt))
ok(t, runPopulateSyncgroupMulti(client0Ctx, "sync0", na, nd, nt, "foo", "bar"))
ok(t, runSetupAppMulti(client1Ctx, "sync1", na, nd, nt))
ok(t, runJoinSyncgroupMulti(client1Ctx, "sync1", "sync0", na, nd))
ok(t, runVerifySyncgroupDataMulti(client1Ctx, "sync1", na, nd, nt, "foo", "bar"))
}
// TestV23SyncgroupSync tests the syncing of syncgroup metadata. The 1st client
// creates the syncgroup SG1, and clients 2 and 3 join this syncgroup. All three
// clients must learn of the remaining two. Note that client 2 relies on
// syncgroup metadata syncing to learn of client 3.
func TestV23SyncgroupSync(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
server0Creds := sh.ForkCredentials("s0")
client0Ctx := sh.ForkContext("c0")
sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
server1Creds := sh.ForkCredentials("s1")
client1Ctx := sh.ForkContext("c1")
sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
server2Creds := sh.ForkCredentials("s2")
client2Ctx := sh.ForkContext("c2")
sh.StartSyncbase(server2Creds, syncbaselib.Opts{Name: "sync2"}, `{"Read": {"In":["root:c2"]}, "Write": {"In":["root:c2"]}}`)
sbName := "sync0"
sgName := "SG1"
ok(t, runSetupAppA(client0Ctx, "sync0"))
ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1", "root:s2"))
ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 1))
ok(t, runSetupAppA(client1Ctx, "sync1"))
ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 2))
ok(t, runGetSyncgroupMembers(client1Ctx, "sync1", sbName, sgName, 2))
ok(t, runSetupAppA(client2Ctx, "sync2"))
ok(t, runJoinSyncgroup(client2Ctx, "sync2", sbName, sgName))
ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 3))
ok(t, runGetSyncgroupMembers(client1Ctx, "sync1", sbName, sgName, 3))
ok(t, runGetSyncgroupMembers(client2Ctx, "sync2", sbName, sgName, 3))
ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
ok(t, runVerifySyncgroupData(client2Ctx, "sync2", "foo", 0, 10, false))
}
////////////////////////////////////
// Helpers.
// toSgPrefixes converts, for example, "a:b,c:" to
// [{CollectionId: {"u", "a"}, Row: "b"}, {CollectionId: {"u", "c"}, Row: ""}].
func toSgPrefixes(csv string) []wire.CollectionRow {
strs := strings.Split(csv, ",")
res := make([]wire.CollectionRow, len(strs))
for i, v := range strs {
parts := strings.SplitN(v, ":", 2)
if len(parts) != 2 {
panic(fmt.Sprintf("invalid prefix string: %q", v))
}
res[i] = wire.CollectionRow{CollectionId: wire.Id{"u", parts[0]}, Row: parts[1]}
}
return res
}
// TODO(hpucha): Look into refactoring scan logic out of the helpers, and
// avoiding gets when we can scan.
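// runSetupAppA creates the database and the test collection on the named
// Syncbase instance.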
func runSetupAppA(ctx *context.T, serviceName string) error {
d := dbHandle(serviceName)
if err := d.Create(ctx, nil); err != nil {
return err
}
c := d.CollectionForId(testCollection)
if err := c.Create(ctx, nil); err != nil {
return err
}
return nil
}
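// runCreateSyncgroup creates a syncgroup over the given prefixes with the
// given permissions, advertised via mtName (or the namespace roots if mtName
// is empty).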
func runCreateSyncgroup(ctx *context.T, serviceName, sbName, sgName, sgPrefixes, mtName string, sgBlessings ...string) error {
d := dbHandle(serviceName)
mtNames := v23.GetNamespace(ctx).Roots()
if mtName != "" {
mtNames = []string{mtName}
}
spec := wire.SyncgroupSpec{
Description: "test syncgroup sg",
Perms: perms(sgBlessings...),
Prefixes: toSgPrefixes(sgPrefixes),
MountTables: mtNames,
}
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
info := wire.SyncgroupMemberInfo{SyncPriority: 8}
if err := sg.Create(ctx, spec, info); err != nil {
return fmt.Errorf("Create SG %v failed: %v\n", sgName, err)
}
return nil
}
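// runJoinSyncgroup makes the local Syncbase instance join the named
// syncgroup administered by the remote instance.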
func runJoinSyncgroup(ctx *context.T, sbNameLocal, sbNameRemote, sgName string) error {
d := dbHandle(sbNameLocal)
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
info := wire.SyncgroupMemberInfo{SyncPriority: 10}
if _, err := sg.Join(ctx, sbNameRemote, "", info); err != nil {
return fmt.Errorf("Join SG %q, %q failed: %v\n", sbNameRemote, sgName, err)
}
return nil
}
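// runSetSyncgroupSpec updates the syncgroup's description, prefixes, and
// permissions.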
func runSetSyncgroupSpec(ctx *context.T, serviceName, sbName, sgName, sgDesc, sgPrefixes string, sgBlessings ...string) error {
d := dbHandle(serviceName)
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
mtNames := v23.GetNamespace(ctx).Roots()
spec := wire.SyncgroupSpec{
Description: sgDesc,
Prefixes: toSgPrefixes(sgPrefixes),
Perms: perms(sgBlessings...),
MountTables: mtNames,
}
if err := sg.SetSpec(ctx, spec, ""); err != nil {
return fmt.Errorf("SetSpec SG %q failed: %v\n", sgName, err)
}
return nil
}
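// runGetSyncgroupSpec polls the syncgroup spec (for up to 4 seconds) until
// the description matches wantDesc, then verifies the prefixes and perms.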
func runGetSyncgroupSpec(ctx *context.T, serviceName, sbName, sgName, wantDesc, wantPrefixes string, wantBlessings ...string) error {
d := dbHandle(serviceName)
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
wantPfxs := toSgPrefixes(wantPrefixes)
wantPerms := perms(wantBlessings...)
var spec wire.SyncgroupSpec
var err error
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
spec, _, err = sg.GetSpec(ctx)
if err != nil {
return fmt.Errorf("GetSpec SG %q failed: %v\n", sgName, err)
}
if spec.Description == wantDesc {
break
}
}
if spec.Description != wantDesc || !reflect.DeepEqual(spec.Prefixes, wantPfxs) || !reflect.DeepEqual(spec.Perms, wantPerms) {
return fmt.Errorf("GetSpec SG %q failed: description got %v, want %v, prefixes got %v, want %v, perms got %v, want %v\n", sgName, spec.Description, wantDesc, spec.Prefixes, wantPfxs, spec.Perms, wantPerms)
}
return nil
}
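// runGetSyncgroupMembers polls the syncgroup membership (for up to 2
// seconds) until the expected number of members is reported.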
func runGetSyncgroupMembers(ctx *context.T, serviceName, sbName, sgName string, wantMembers uint64) error {
d := dbHandle(serviceName)
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
var gotMembers uint64
var members map[string]wire.SyncgroupMemberInfo
var err error
for i := 0; i < 4; i++ {
time.Sleep(500 * time.Millisecond)
// Assign with "=" rather than ":=" so that the members map remains
// visible in the error message after the loop; ":=" would shadow it.
members, err = sg.GetMembers(ctx)
if err != nil {
return fmt.Errorf("GetMembers SG %q failed: %v\n", sgName, err)
}
gotMembers = uint64(len(members))
if wantMembers == gotMembers {
return nil
}
}
return fmt.Errorf("GetMembers SG %q failed: members got %+v, want %v\n", sgName, members, wantMembers)
}
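// runPopulateData writes 10 keys with the given prefix, starting at the
// given index, into the test collection.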
func runPopulateData(ctx *context.T, serviceName, keyPrefix string, start uint64) error {
d := dbHandle(serviceName)
// Do Puts.
c := d.CollectionForId(testCollection)
for i := start; i < start+10; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
r := c.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
return nil
}
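// runUpdateData overwrites 5 "foo" keys, starting at the given index, with
// values that identify the writing service.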
func runUpdateData(ctx *context.T, serviceName string, start uint64) error {
d := dbHandle(serviceName)
// Do Puts.
c := d.CollectionForId(testCollection)
for i := start; i < start+5; i++ {
key := fmt.Sprintf("foo%d", i)
r := c.Row(key)
if err := r.Put(ctx, "testkey"+serviceName+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
return nil
}
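// runDeleteData deletes 5 "foo" keys starting at the given index.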
func runDeleteData(ctx *context.T, serviceName string, start uint64) error {
d := dbHandle(serviceName)
// Do Deletes.
c := d.CollectionForId(testCollection)
for i := start; i < start+5; i++ {
key := fmt.Sprintf("foo%d", i)
r := c.Row(key)
if err := r.Delete(ctx); err != nil {
return fmt.Errorf("r.Delete() failed: %v\n", err)
}
}
return nil
}
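// runSetCollectionPermissions replaces the ACL on the test collection with
// one granting access to the given blessings.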
func runSetCollectionPermissions(ctx *context.T, serviceName string, aclBlessings ...string) error {
d := dbHandle(serviceName)
// Set acl.
c := d.CollectionForId(testCollection)
if err := c.SetPermissions(ctx, perms(aclBlessings...)); err != nil {
return fmt.Errorf("c.SetPermissions() failed: %v\n", err)
}
return nil
}
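// runVerifySyncgroupData waits for the data to sync over, then verifies the
// expected keys and values via Get and, unless skipScan is set, via Scan.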
func runVerifySyncgroupData(ctx *context.T, serviceName, keyPrefix string, start, count uint64, skipScan bool) error {
d := dbHandle(serviceName)
// Wait for a bit (up to 4 sec) until the last key appears.
c := d.CollectionForId(testCollection)
lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
r := c.Row(lastKey)
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
// Verify that all keys and values made it correctly.
for i := start; i < start+count; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
if !skipScan {
// Re-verify using a scan operation.
stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
for i := 0; stream.Advance(); i++ {
want := fmt.Sprintf("%s%d", keyPrefix, i)
got := stream.Key()
if got != want {
return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
}
want = "testkey" + want
if err := stream.Value(&got); err != nil {
return fmt.Errorf("cannot fetch value in scan: %v\n", err)
}
if got != want {
return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
}
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
}
return nil
}
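// runVerifySyncgroupDataWithWatch verifies the synced changes through the
// database Watch API, resuming from the given marker.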
func runVerifySyncgroupDataWithWatch(ctx *context.T, serviceName, keyPrefix string, count int, expectDelete bool, beforeSyncMarker watch.ResumeMarker) error {
if count == 0 {
return fmt.Errorf("count cannot be 0: got %d", count)
}
d := dbHandle(serviceName)
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
stream, err := d.Watch(ctxWithTimeout, testCollection, keyPrefix, beforeSyncMarker)
if err != nil {
return fmt.Errorf("watch error: %v\n", err)
}
var changes []syncbase.WatchChange
// Check i before advancing so that no extra, potentially blocking Advance()
// call is made once the expected number of changes has been received.
for i := 0; i < count && stream.Advance(); i++ {
changes = append(changes, stream.Change())
}
// Err() reports the error (if any) that caused Advance() to return false,
// so check it after the loop rather than inside it.
if err := stream.Err(); err != nil {
return fmt.Errorf("watch stream error: %v\n", err)
}
sort.Sort(ByRow(changes))
if got, want := len(changes), count; got != want {
return fmt.Errorf("unexpected number of changes: got %d, want %d", got, want)
}
for i, change := range changes {
if got, want := change.Collection, testCollection; got != want {
return fmt.Errorf("unexpected watch collection: got %v, want %v", got, want)
}
if got, want := change.Row, fmt.Sprintf("%s%d", keyPrefix, i); got != want {
return fmt.Errorf("unexpected watch row: got %q, want %q", got, want)
}
if got, want := change.FromSync, true; got != want {
return fmt.Errorf("unexpected FromSync value: got %t, want %t", got, want)
}
if expectDelete {
if got, want := change.ChangeType, syncbase.DeleteChange; got != want {
return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
}
// Skip the value check, since delete changes carry no value; continue so
// that every change is verified, not just the first.
continue
}
var result string
if got, want := change.ChangeType, syncbase.PutChange; got != want {
return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
}
if err := change.Value(&result); err != nil {
return fmt.Errorf("couldn't decode watch value: %v", err)
}
if got, want := result, fmt.Sprintf("testkey%s%d", keyPrefix, i); got != want {
return fmt.Errorf("unexpected watch value: got %q, want %q", got, want)
}
}
return nil
}
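// runVerifyDeletedData waits for the deletions to propagate, then verifies
// via a scan that only the 5 undeleted entries remain.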
func runVerifyDeletedData(ctx *context.T, serviceName, keyPrefix string) error {
d := dbHandle(serviceName)
// Wait for a bit for deletions to propagate.
c := d.CollectionForId(testCollection)
r := c.Row("foo4")
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
break
}
}
// Verify using a scan operation.
stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
count := 0
for i := 5; stream.Advance(); i++ {
want := fmt.Sprintf("%s%d", keyPrefix, i)
got := stream.Key()
if got != want {
return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
}
want = "testkey" + want
if err := stream.Value(&got); err != nil {
return fmt.Errorf("cannot fetch value in scan: %v\n", err)
}
if got != want {
return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
}
count++
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
if count != 5 {
return fmt.Errorf("unexpected number of entries in scan: got %v, want 5\n", count)
}
return nil
}
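// runVerifyConflictResolution verifies that, after conflicting updates, each
// key holds exactly one of the values written by the two clients.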
func runVerifyConflictResolution(ctx *context.T, serviceName string) error {
d := dbHandle(serviceName)
c := d.CollectionForId(testCollection)
wantData := []struct {
start uint64
count uint64
valPfx []string
}{
{0, 5, []string{"testkey"}},
{5, 5, []string{"testkeysync0", "testkeysync1"}},
}
// Verify that all keys and values made it correctly.
for _, d := range wantData {
for i := d.start; i < d.start+d.count; i++ {
key := fmt.Sprintf("foo%d", i)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
match := 0
for _, p := range d.valPfx {
want := p + key
if got == want {
match++
}
}
if match != 1 {
return fmt.Errorf("unexpected value: got %q, match %v, want %v\n", got, match, d.valPfx)
}
}
}
return nil
}
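// runVerifyNonSyncgroupData verifies that no data outside the syncgroup was
// synced over.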
func runVerifyNonSyncgroupData(ctx *context.T, serviceName, keyPrefix string) error {
d := dbHandle(serviceName)
c := d.CollectionForId(testCollection)
// Verify through a scan that none of that data exists.
count := 0
stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
for stream.Advance() {
count++
}
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
if count > 0 {
return fmt.Errorf("found %d entries in %q prefix that should not be there\n", count, keyPrefix)
}
return nil
}
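// runVerifyLocalAndRemoteData verifies the mix of locally written and synced
// values after the bi-directional exchange.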
func runVerifyLocalAndRemoteData(ctx *context.T, serviceName string) error {
d := dbHandle(serviceName)
c := d.CollectionForId(testCollection)
// Wait for a bit (up to 4 sec) until the last key appears.
r := c.Row("foo19")
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
wantData := []struct {
start uint64
count uint64
valPfx string
}{
{0, 5, "testkey"},
{5, 5, "testkeysync1"},
{10, 10, "testkey"},
}
// Verify that all keys and values made it correctly.
for _, d := range wantData {
for i := d.start; i < d.start+d.count; i++ {
key := fmt.Sprintf("foo%d", i)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := d.valPfx + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
}
return nil
}
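// runVerifyLostAccess waits until access to the last key is denied, then
// verifies that reading every key fails with ErrNoAccess.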
func runVerifyLostAccess(ctx *context.T, serviceName, keyPrefix string, start, count uint64) error {
d := dbHandle(serviceName)
// Wait for a bit (up to 4 sec) until the last key disappears.
c := d.CollectionForId(testCollection)
lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
r := c.Row(lastKey)
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoAccess.ID {
break
}
}
// Verify that all keys and values have lost access.
for i := start; i < start+count; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); verror.ErrorID(err) != verror.ErrNoAccess.ID {
return fmt.Errorf("r.Get() didn't fail: %v\n", err)
}
}
return nil
}
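// runVerifyNestedSyncgroupData verifies that both the "foo" and "f" keys are
// readable after the nested syncgroups have been joined.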
func runVerifyNestedSyncgroupData(ctx *context.T, serviceName string) error {
d := dbHandle(serviceName)
// Wait for a bit (up to 8 sec) until the last key appears. This chosen
// time interval is dependent on how fast the membership view is
// refreshed (currently 2 seconds) and how frequently we sync with peers
// (every 50 ms), and then adding a substantial safeguard to it to
// ensure that the test is not flaky even in somewhat abnormal
// conditions. Note that we wait longer than the 2 node tests since more
// nodes implies more pair-wise communication before achieving steady
// state.
c := d.CollectionForId(testCollection)
r := c.Row("f9")
for i := 0; i < 8; i++ {
time.Sleep(1 * time.Second)
var value string
if err := r.Get(ctx, &value); err == nil {
break
}
}
// Verify that all keys and values made it correctly.
pfxs := []string{"foo", "f"}
for _, p := range pfxs {
for i := 0; i < 10; i++ {
key := fmt.Sprintf("%s%d", p, i)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
}
}
}
return nil
}
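// runSetupAppMulti creates numApps x numDbs databases, each with numCxs
// collections.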
func runSetupAppMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int) error {
svc := syncbase.NewService(serviceName)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
if err := d.Create(ctx, nil); err != nil {
return err
}
for k := 0; k < numCxs; k++ {
cName := fmt.Sprintf("c%d", k)
if err := d.CollectionForId(wire.Id{"u", cName}).Create(ctx, nil); err != nil {
return err
}
}
}
}
return nil
}
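// runPopulateSyncgroupMulti populates entries under each prefix in every
// collection, and creates one syncgroup per database spanning all of them.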
func runPopulateSyncgroupMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int, prefixes ...string) error {
mtNames := v23.GetNamespace(ctx).Roots()
svc := syncbase.NewService(serviceName)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
// For each collection, pre-populate entries on each prefix.
// Also determine the syncgroup prefixes.
var sgPrefixes []string
for k := 0; k < numCxs; k++ {
cName := fmt.Sprintf("c%d", k)
c := d.CollectionForId(wire.Id{"u", cName})
for _, pfx := range prefixes {
p := fmt.Sprintf("%s:%s", cName, pfx)
sgPrefixes = append(sgPrefixes, p)
for n := 0; n < 10; n++ {
key := fmt.Sprintf("%s%d", pfx, n)
r := c.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
}
}
// Create one syncgroup per database across all collections
// and prefixes.
sgName := fmt.Sprintf("%s_%s", appName, dbName)
spec := wire.SyncgroupSpec{
Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
Perms: perms("root:s0", "root:s1"),
Prefixes: toSgPrefixes(strings.Join(sgPrefixes, ",")),
MountTables: mtNames,
}
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
info := wire.SyncgroupMemberInfo{SyncPriority: 8}
if err := sg.Create(ctx, spec, info); err != nil {
return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
}
}
}
return nil
}
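// runJoinSyncgroupMulti joins the per-database syncgroups created by
// runPopulateSyncgroupMulti.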
func runJoinSyncgroupMulti(ctx *context.T, sbNameLocal, sbNameRemote string, numApps, numDbs int) error {
svc := syncbase.NewService(sbNameLocal)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
sgName := fmt.Sprintf("%s_%s", appName, dbName)
sg := d.Syncgroup(wire.Id{Name: sgName, Blessing: "blessing"})
info := wire.SyncgroupMemberInfo{SyncPriority: 10}
if _, err := sg.Join(ctx, sbNameRemote, "", info); err != nil {
return fmt.Errorf("Join SG Multi %q failed: %v\n", sgName, err)
}
}
}
return nil
}
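// runVerifySyncgroupDataMulti waits for sync to settle, then verifies all
// keys and values across every database and collection.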
func runVerifySyncgroupDataMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int, prefixes ...string) error {
svc := syncbase.NewService(serviceName)
time.Sleep(20 * time.Second)
for i := 0; i < numApps; i++ {
appName := fmt.Sprintf("a%d", i)
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
for k := 0; k < numCxs; k++ {
cName := fmt.Sprintf("c%d", k)
c := d.CollectionForId(wire.Id{"u", cName})
for _, pfx := range prefixes {
for n := 0; n < 10; n++ {
key := fmt.Sprintf("%s%d", pfx, n)
r := c.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v\n", err)
}
want := "testkey" + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q\n",
got, want)
}
}
}
}
}
}
return nil
}
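// getResumeMarker returns the database's current watch resume marker.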
func getResumeMarker(ctx *context.T, serviceName string) (watch.ResumeMarker, error) {
d := dbHandle(serviceName)
return d.GetResumeMarker(ctx)
}
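// ok fails the test, printing a stack trace, if err is non-nil.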
func ok(t *testing.T, err error) {
if err != nil {
debug.PrintStack()
t.Fatal(err)
}
}
func TestMain(m *testing.M) {
v23test.TestMain(m)
}
// ByRow implements sort.Interface for []syncbase.WatchChange based on the Row field.
type ByRow []syncbase.WatchChange
func (c ByRow) Len() int { return len(c) }
func (c ByRow) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ByRow) Less(i, j int) bool { return c[i].Row < c[j].Row }