syncbase/vsync: More integrations tests covering nested and peer syncgroups.
Change-Id: I68724f8b89fc1fffaac8819da9eead679d102c64
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 100a38d..505805f 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -42,8 +42,13 @@
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
- tu.RunClient(t, client0Creds, runCreateSyncGroup)
- tu.RunClient(t, client1Creds, runJoinSyncGroup)
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
}
// V23TestSyncbasedGetDeltas tests the sending of deltas between two Syncbase
@@ -65,9 +70,14 @@
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
- tu.RunClient(t, client0Creds, runCreateSyncGroup)
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
- tu.RunClient(t, client1Creds, runJoinSyncGroup)
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
}
@@ -92,44 +102,146 @@
`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
- tu.RunClient(t, client0Creds, runCreateSyncGroup)
- tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
- tu.RunClient(t, client1Creds, runJoinSyncGroup)
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
- tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "10")
+ tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "foo", "10")
tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
}
+// V23TestNestedSyncGroups tests the exchange of deltas between two Syncbase
+// instances and their clients with nested SyncGroups. The 1st client creates
+// two SyncGroups at prefixes "f" and "foo" and puts some database entries in
+// both of them. The 2nd client first joins the SyncGroup with prefix "foo" and
+// verifies that it reads the corresponding database entries. The 2nd client
+// then joins the SyncGroup with prefix "f" and verifies that it can read the
+// "f" keys.
+func V23TestNestedSyncGroups(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ sg1Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+ sg2Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG2")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg2Name)
+ tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
+}
+
+// V23TestNestedAndPeerSyncGroups tests the exchange of deltas between three
+// Syncbase instances and their clients consisting of nested/peer
+// SyncGroups. The 1st client creates two SyncGroups: SG1 at prefix "foo" and
+// SG2 at "f" and puts some database entries in both of them. The 2nd client
+// joins the SyncGroup SG1 and verifies that it reads the corresponding database
+// entries. Client 2 then creates SG3 at prefix "f". The 3rd client joins the
+// SyncGroups SG2 and SG3 and verifies that it can read all the "f" and "foo"
+// keys created by client 1. Client 2 also verifies that it can read all the "f"
+// and "foo" keys created by client 1.
+func V23TestNestedAndPeerSyncGroups(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ server2Creds, _ := t.Shell().NewChildCredentials("s2")
+ client2Creds, _ := t.Shell().NewChildCredentials("c2")
+ cleanSync2 := tu.StartSyncbased(t, server2Creds, "sync2", "",
+ `{"Read": {"In":["root/c2"]}, "Write": {"In":["root/c2"]}}`)
+ defer cleanSync2()
+
+ sg1Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+ sg2Name := naming.Join("sync0", constants.SyncbaseSuffix, "SG2")
+ sg3Name := naming.Join("sync1", constants.SyncbaseSuffix, "SG3")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1", "root/s2")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s2")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+ tu.RunClient(t, client1Creds, runCreateSyncGroup, "sync1", sg3Name, "tb:f", "root/s1", "root/s2")
+
+ tu.RunClient(t, client2Creds, runSetupAppA, "sync2")
+ tu.RunClient(t, client2Creds, runJoinSyncGroup, "sync2", sg2Name)
+ tu.RunClient(t, client2Creds, runJoinSyncGroup, "sync2", sg3Name)
+ tu.RunClient(t, client2Creds, runVerifyNestedSyncGroupData, "sync2")
+
+ tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
+}
+
////////////////////////////////////
// Helpers.
-var runCreateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+var runSetupAppA = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
- a := syncbase.NewService("sync0").App("a")
+ a := syncbase.NewService(args[0]).App("a")
a.Create(ctx, nil)
d := a.NoSQLDatabase("d")
d.Create(ctx, nil)
d.CreateTable(ctx, "tb", nil)
+ return nil
+}, "runSetupAppA")
+
+var runCreateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d")
+
mtName := env.Vars[ref.EnvNamespacePrefix]
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
- Perms: perms("root/s0", "root/s1"),
- Prefixes: []string{"tb:foo"},
+ Perms: perms(args[3:]...),
+ Prefixes: []string{args[2]},
MountTables: []string{mtName},
}
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
- sg := d.SyncGroup(sgName)
+ sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{8}
if err := sg.Create(ctx, spec, info); err != nil {
- return fmt.Errorf("Create SG %q failed: %v", sgName, err)
+ return fmt.Errorf("Create SG %q failed: %v", args[1], err)
}
return nil
}, "runCreateSyncGroup")
@@ -138,17 +250,13 @@
ctx, shutdown := v23.Init()
defer shutdown()
- a := syncbase.NewService("sync1").App("a")
- a.Create(ctx, nil)
+ a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d")
- d.Create(ctx, nil)
- d.CreateTable(ctx, "tb", nil)
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
- sg := d.SyncGroup(sgName)
+ sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
- return fmt.Errorf("Join SG %q failed: %v", sgName, err)
+ return fmt.Errorf("Join SG %q failed: %v", args[1], err)
}
return nil
}, "runJoinSyncGroup")
@@ -162,16 +270,15 @@
// Do Puts.
tb := d.Table("tb")
- start, _ := strconv.ParseUint(args[1], 10, 64)
+ start, _ := strconv.ParseUint(args[2], 10, 64)
for i := start; i < start+10; i++ {
- key := fmt.Sprintf("foo%d", i)
+ key := fmt.Sprintf("%s%d", args[1], i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
return fmt.Errorf("r.Put() failed: %v", err)
}
}
-
return nil
}, "runPopulateSyncGroup")
@@ -302,3 +409,41 @@
}
return nil
}, "runVerifyLocalAndRemoteData")
+
+var runVerifyNestedSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d")
+
+ // Wait for a bit (up to 8 sec) until the last key appears.
+ tb := d.Table("tb")
+
+ r := tb.Row("f9")
+ for i := 0; i < 8; i++ {
+ time.Sleep(1 * time.Second)
+ var value string
+ if err := r.Get(ctx, &value); err == nil {
+ break
+ }
+ }
+
+ // Verify that all keys and values made it correctly.
+ pfxs := []string{"foo", "f"}
+ for _, p := range pfxs {
+ for i := 0; i < 10; i++ {
+ key := fmt.Sprintf("%s%d", p, i)
+ r := tb.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ want := "testkey" + key
+ if got != want {
+ return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
+ }
+ }
+ }
+ return nil
+}, "runVerifyNestedSyncGroupData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index 29df45a..c90e196 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -36,3 +36,11 @@
func TestV23SyncbasedExchangeDeltas(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
}
+
+func TestV23NestedSyncGroups(t *testing.T) {
+ v23tests.RunTest(t, V23TestNestedSyncGroups)
+}
+
+func TestV23NestedAndPeerSyncGroups(t *testing.T) {
+ v23tests.RunTest(t, V23TestNestedAndPeerSyncGroups)
+}