syncbase/vsync: more sync integration tests
* Pre-populate a DB before creating a SyncGroup.
* Sync across multiple apps/dbs/tables/prefixes.
Change-Id: I3122487966296b0feee7f336d68ea001b5465651
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 505805f..f127e0d 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -74,11 +74,11 @@
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
- tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
}
// V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
@@ -106,14 +106,14 @@
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
- tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
- tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "foo", "10")
+ tu.RunClient(t, client1Creds, runPopulateData, "sync1", "foo", "10")
tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
}
@@ -145,12 +145,12 @@
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s1")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
- tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg2Name)
tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
}
@@ -191,12 +191,12 @@
tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1", "root/s2")
tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s2")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
- tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
tu.RunClient(t, client1Creds, runCreateSyncGroup, "sync1", sg3Name, "tb:f", "root/s1", "root/s2")
tu.RunClient(t, client2Creds, runSetupAppA, "sync2")
@@ -207,6 +207,70 @@
tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
}
+// V23TestSyncbasedGetDeltasPrePopulate tests the sending of deltas between two
+// Syncbase instances and their clients with data existing before the creation
+// of a SyncGroup. The 1st client puts entries in a database then creates a
+// SyncGroup over that data. The 2nd client joins that SyncGroup and reads the
+// database entries.
+func V23TestSyncbasedGetDeltasPrePopulate(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+ // Populate table data before creating the SyncGroup. Also populate
+ // with data that is not part of the SyncGroup to verify filtering.
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+ tu.RunClient(t, client1Creds, runVerifyNonSyncGroupData, "sync1", "bar")
+}
+
+// V23TestSyncbasedGetDeltasMultiApp tests the sending of deltas between two
+// Syncbase instances and their clients across multiple apps, databases, and
+// tables. The 1st client puts entries in multiple tables across multiple
+// app databases then creates multiple SyncGroups (one per database) over that
+// data. The 2nd client joins these SyncGroups and reads all the data.
+func V23TestSyncbasedGetDeltasMultiApp(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ sgNamePrefix := naming.Join("sync0", constants.SyncbaseSuffix)
+ na, nd, nt := "2", "2", "2" // number of apps, dbs, tables
+
+ tu.RunClient(t, client0Creds, runSetupAppMulti, "sync0", na, nd, nt)
+ tu.RunClient(t, client0Creds, runPopulateSyncGroupMulti, "sync0", sgNamePrefix, na, nd, nt, "foo", "bar")
+
+ tu.RunClient(t, client1Creds, runSetupAppMulti, "sync1", na, nd, nt)
+ tu.RunClient(t, client1Creds, runJoinSyncGroupMulti, "sync1", sgNamePrefix, na, nd)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupDataMulti, "sync1", na, nd, nt, "foo", "bar")
+}
+
////////////////////////////////////
// Helpers.
@@ -241,7 +305,7 @@
sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{8}
if err := sg.Create(ctx, spec, info); err != nil {
- return fmt.Errorf("Create SG %q failed: %v", args[1], err)
+ return fmt.Errorf("Create SG %q failed: %v\n", args[1], err)
}
return nil
}, "runCreateSyncGroup")
@@ -256,12 +320,12 @@
sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
- return fmt.Errorf("Join SG %q failed: %v", args[1], err)
+ return fmt.Errorf("Join SG %q failed: %v\n", args[1], err)
}
return nil
}, "runJoinSyncGroup")
-var runPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+var runPopulateData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -276,11 +340,11 @@
key := fmt.Sprintf("%s%d", args[1], i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
- return fmt.Errorf("r.Put() failed: %v", err)
+ return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
return nil
-}, "runPopulateSyncGroup")
+}, "runPopulateData")
var runUpdateData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
@@ -297,7 +361,7 @@
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey1"+key); err != nil {
- return fmt.Errorf("r.Put() failed: %v", err)
+ return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
@@ -314,9 +378,9 @@
// Wait for a bit (up to 4 sec) until the last key appears.
tb := d.Table("tb")
- start, _ := strconv.ParseUint(args[1], 10, 64)
- count, _ := strconv.ParseUint(args[2], 10, 64)
- lastKey := fmt.Sprintf("foo%d", start+count-1)
+ start, _ := strconv.ParseUint(args[2], 10, 64)
+ count, _ := strconv.ParseUint(args[3], 10, 64)
+ lastKey := fmt.Sprintf("%s%d", args[1], start+count-1)
r := tb.Row(lastKey)
for i := 0; i < 8; i++ {
@@ -329,7 +393,7 @@
// Verify that all keys and values made it correctly.
for i := start; i < start+count; i++ {
- key := fmt.Sprintf("foo%d", i)
+ key := fmt.Sprintf("%s%d", args[1], i)
r := tb.Row(key)
var got string
if err := r.Get(ctx, &got); err != nil {
@@ -342,9 +406,9 @@
}
// Re-verify using a scan operation.
- stream := tb.Scan(ctx, nosql.Prefix("foo"))
+ stream := tb.Scan(ctx, nosql.Prefix(args[1]))
for i := 0; stream.Advance(); i++ {
- want := fmt.Sprintf("foo%d", i)
+ want := fmt.Sprintf("%s%d", args[1], i)
got := stream.Key()
if got != want {
return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
@@ -364,6 +428,31 @@
return nil
}, "runVerifySyncGroupData")
+var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d")
+ tb := d.Table("tb")
+
+ // Verify through a scan that none of that data exists.
+ count := 0
+ stream := tb.Scan(ctx, nosql.Prefix(args[1]))
+ for stream.Advance() {
+ count++
+ }
+
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("scan stream error: %v\n", err)
+ }
+ if count > 0 {
+ return fmt.Errorf("found %d entries in %s prefix that should not be there\n", count, args[1])
+ }
+
+ return nil
+}, "runVerifyNonSyncGroupData")
+
var runVerifyLocalAndRemoteData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -447,3 +536,172 @@
}
return nil
}, "runVerifyNestedSyncGroupData")
+
+var runSetupAppMulti = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ svc := syncbase.NewService(args[0])
+ numApps, _ := strconv.Atoi(args[1])
+ numDbs, _ := strconv.Atoi(args[2])
+ numTbs, _ := strconv.Atoi(args[3])
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+ a := svc.App(appName)
+ a.Create(ctx, nil)
+
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := a.NoSQLDatabase(dbName)
+ d.Create(ctx, nil)
+
+ for k := 0; k < numTbs; k++ {
+ tbName := fmt.Sprintf("tb%d", k)
+ d.CreateTable(ctx, tbName, nil)
+ }
+ }
+ }
+
+ return nil
+}, "runSetupAppMulti")
+
+var runPopulateSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ mtName := env.Vars[ref.EnvNamespacePrefix]
+
+ svc := syncbase.NewService(args[0])
+ sgNamePrefix := args[1]
+ numApps, _ := strconv.Atoi(args[2])
+ numDbs, _ := strconv.Atoi(args[3])
+ numTbs, _ := strconv.Atoi(args[4])
+ prefixes := args[5:]
+
+ // For each app...
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+ a := svc.App(appName)
+
+ // For each database...
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := a.NoSQLDatabase(dbName)
+
+ // For each table, pre-populate entries on each prefix.
+ // Also determine the SyncGroup prefixes.
+ var sgPrefixes []string
+ for k := 0; k < numTbs; k++ {
+ tbName := fmt.Sprintf("tb%d", k)
+ tb := d.Table(tbName)
+
+ for _, pfx := range prefixes {
+ p := fmt.Sprintf("%s:%s", tbName, pfx)
+ sgPrefixes = append(sgPrefixes, p)
+
+ for n := 0; n < 10; n++ {
+ key := fmt.Sprintf("%s%d", pfx, n)
+ r := tb.Row(key)
+ if err := r.Put(ctx, "testkey"+key); err != nil {
+ return fmt.Errorf("r.Put() failed: %v\n", err)
+ }
+ }
+ }
+ }
+
+ // Create one SyncGroup per database across all tables
+ // and prefixes.
+ sgName := naming.Join(sgNamePrefix, appName, dbName)
+ spec := wire.SyncGroupSpec{
+ Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
+ Perms: perms("root/s0", "root/s1"),
+ Prefixes: sgPrefixes,
+ MountTables: []string{mtName},
+ }
+
+ sg := d.SyncGroup(sgName)
+ info := wire.SyncGroupMemberInfo{8}
+ if err := sg.Create(ctx, spec, info); err != nil {
+ return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
+ }
+ }
+ }
+
+ return nil
+}, "runPopulateSyncGroupMulti")
+
+var runJoinSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ svc := syncbase.NewService(args[0])
+ sgNamePrefix := args[1]
+ numApps, _ := strconv.Atoi(args[2])
+ numDbs, _ := strconv.Atoi(args[3])
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+ a := svc.App(appName)
+
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := a.NoSQLDatabase(dbName)
+
+ sgName := naming.Join(sgNamePrefix, appName, dbName)
+ sg := d.SyncGroup(sgName)
+ info := wire.SyncGroupMemberInfo{10}
+ if _, err := sg.Join(ctx, info); err != nil {
+ return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
+ }
+ }
+ }
+
+ return nil
+}, "runJoinSyncGroupMulti")
+
+var runVerifySyncGroupDataMulti = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ svc := syncbase.NewService(args[0])
+ numApps, _ := strconv.Atoi(args[1])
+ numDbs, _ := strconv.Atoi(args[2])
+ numTbs, _ := strconv.Atoi(args[3])
+ prefixes := args[4:]
+
+ time.Sleep(10 * time.Second)
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+ a := svc.App(appName)
+
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := a.NoSQLDatabase(dbName)
+
+ for k := 0; k < numTbs; k++ {
+ tbName := fmt.Sprintf("tb%d", k)
+ tb := d.Table(tbName)
+
+ for _, pfx := range prefixes {
+ for n := 0; n < 10; n++ {
+ key := fmt.Sprintf("%s%d", pfx, n)
+ r := tb.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ want := "testkey" + key
+ if got != want {
+ return fmt.Errorf("unexpected value: got %q, want %q\n",
+ got, want)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+}, "runVerifySyncGroupDataMulti")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index c90e196..3c8bc00 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -44,3 +44,11 @@
func TestV23NestedAndPeerSyncGroups(t *testing.T) {
v23tests.RunTest(t, V23TestNestedAndPeerSyncGroups)
}
+
+func TestV23SyncbasedGetDeltasPrePopulate(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedGetDeltasPrePopulate)
+}
+
+func TestV23SyncbasedGetDeltasMultiApp(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedGetDeltasMultiApp)
+}