Merge "syncbase/vsync: Integration test for bi-directional syncing; made clients/servers independent principals in integration tests."
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 849f352..100a38d 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "strconv"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -21,29 +22,91 @@
"v.io/x/ref/test/v23tests"
)
-// TODO(rdaoud): change the credentials of servers s0 and s1 to independent
-// principals derived from the same root.
-
//go:generate v23 test generate
+// V23TestSyncbasedJoinSyncGroup tests the creation and joining of a
+// SyncGroup. Client0 creates a SyncGroup at Syncbase0. Client1 requests to join
+// the SyncGroup at Syncbase1. Syncbase1 in turn requests Syncbase0 to join the
+// SyncGroup.
func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
- client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
- `{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
defer cleanSync0()
- server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
- client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
- `{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
defer cleanSync1()
tu.RunClient(t, client0Creds, runCreateSyncGroup)
tu.RunClient(t, client1Creds, runJoinSyncGroup)
}
+// V23TestSyncbasedGetDeltas tests the sending of deltas between two Syncbase
+// instances and their clients. The 1st client creates a SyncGroup and puts
+// some database entries in it. The 2nd client joins that SyncGroup and reads
+// the database entries. This verifies the end-to-end synchronization of data
+// along the path: client0--Syncbase0--Syncbase1--client1.
+func V23TestSyncbasedGetDeltas(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ tu.RunClient(t, client0Creds, runCreateSyncGroup)
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+}
+
+// V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
+// Syncbase instances and their clients. The 1st client creates a SyncGroup and
+// puts some database entries in it. The 2nd client joins that SyncGroup and
+// reads the database entries. The 2nd client then updates a subset of existing
+// keys and adds more keys and the 1st client verifies that it can read these
+// keys. This verifies the end-to-end bi-directional synchronization of data
+// along the path: client0--Syncbase0--Syncbase1--client1.
+func V23TestSyncbasedExchangeDeltas(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ tu.RunClient(t, client0Creds, runCreateSyncGroup)
+ tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
+
+ tu.RunClient(t, client1Creds, runJoinSyncGroup)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+
+ tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
+ tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "10")
+
+ tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
+}
+
+////////////////////////////////////
+// Helpers.
+
var runCreateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -52,16 +115,17 @@
a.Create(ctx, nil)
d := a.NoSQLDatabase("d")
d.Create(ctx, nil)
+ d.CreateTable(ctx, "tb", nil)
mtName := env.Vars[ref.EnvNamespacePrefix]
-
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
- Perms: perms("root/s0", "root/s0/s1"),
- Prefixes: []string{"t1:foo"},
+ Perms: perms("root/s0", "root/s1"),
+ Prefixes: []string{"tb:foo"},
MountTables: []string{mtName},
}
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{8}
if err := sg.Create(ctx, spec, info); err != nil {
@@ -78,8 +142,9 @@
a.Create(ctx, nil)
d := a.NoSQLDatabase("d")
d.Create(ctx, nil)
+ d.CreateTable(ctx, "tb", nil)
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
@@ -88,57 +153,18 @@
return nil
}, "runJoinSyncGroup")
-// V23TestSyncbasedGetDeltas tests the exchange of deltas between two Syncbase
-// instances and their clients. The 1st client creates a SyncGroup and puts
-// some database entries in it. The 2nd client joins that SyncGroup and reads
-// the database entries. This verifies the end-to-end synchronization of data
-// along the path: client1--Syncbase1--Syncbase2--client2.
-func V23TestSyncbasedGetDeltas(t *v23tests.T) {
- v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
- server0Creds, _ := t.Shell().NewChildCredentials("s0")
- client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
- cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
- `{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
- defer cleanSync0()
-
- server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
- client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
- cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
- `{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
- defer cleanSync1()
-
- tu.RunClient(t, client0Creds, runCreateAndPopulateSyncGroup)
- tu.RunClient(t, client1Creds, runJoinSyncGroupAndFetchData)
-}
-
-var runCreateAndPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+var runPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
- a := syncbase.NewService("sync0").App("a")
- a.Create(ctx, nil)
+ a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d")
- d.Create(ctx, nil)
- d.CreateTable(ctx, "tb", nil)
-
- mtName := env.Vars[ref.EnvNamespacePrefix]
-
- spec := wire.SyncGroupSpec{
- Description: "test syncgroup sg",
- Perms: perms("root/s0", "root/s0/s1"),
- Prefixes: []string{"tb:foo"},
- MountTables: []string{mtName},
- }
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
- sg := d.SyncGroup(sgName)
- info := wire.SyncGroupMemberInfo{8}
- if err := sg.Create(ctx, spec, info); err != nil {
- return fmt.Errorf("Create SG %q failed: %v", sgName, err)
- }
// Do Puts.
tb := d.Table("tb")
- for i := 0; i < 10; i++ {
+ start, _ := strconv.ParseUint(args[1], 10, 64)
+
+ for i := start; i < start+10; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
if err := r.Put(ctx, "testkey"+key); err != nil {
@@ -147,28 +173,45 @@
}
return nil
-}, "runCreateAndPopulateSyncGroup")
+}, "runPopulateSyncGroup")
-var runJoinSyncGroupAndFetchData = modules.Register(func(env *modules.Env, args ...string) error {
+var runUpdateData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
- a := syncbase.NewService("sync1").App("a")
- a.Create(ctx, nil)
+ a := syncbase.NewService(args[0]).App("a")
d := a.NoSQLDatabase("d")
- d.Create(ctx, nil)
- d.CreateTable(ctx, "tb", nil)
- sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
- sg := d.SyncGroup(sgName)
- info := wire.SyncGroupMemberInfo{10}
- if _, err := sg.Join(ctx, info); err != nil {
- return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
+ // Do Puts.
+ tb := d.Table("tb")
+ start, _ := strconv.ParseUint(args[1], 10, 64)
+
+ for i := start; i < start+5; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ if err := r.Put(ctx, "testkey1"+key); err != nil {
+ return fmt.Errorf("r.Put() failed: %v", err)
+ }
}
+ return nil
+}, "runUpdateData")
+
+var runVerifySyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d")
+
// Wait for a bit (up to 4 sec) until the last key appears.
tb := d.Table("tb")
- r := tb.Row("foo9")
+
+ start, _ := strconv.ParseUint(args[1], 10, 64)
+ count, _ := strconv.ParseUint(args[2], 10, 64)
+ lastKey := fmt.Sprintf("foo%d", start+count-1)
+
+ r := tb.Row(lastKey)
for i := 0; i < 8; i++ {
time.Sleep(500 * time.Millisecond)
var value string
@@ -178,7 +221,7 @@
}
// Verify that all keys and values made it correctly.
- for i := 0; i < 10; i++ {
+ for i := start; i < start+count; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
var got string
@@ -207,9 +250,55 @@
return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
}
}
+
if err := stream.Err(); err != nil {
return fmt.Errorf("scan stream error: %v\n", err)
}
-
return nil
-}, "runJoinSyncGroupAndFetchData")
+}, "runVerifySyncGroupData")
+
+var runVerifyLocalAndRemoteData = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d")
+ tb := d.Table("tb")
+
+ // Wait for a bit (up to 4 sec) until the last key appears.
+ r := tb.Row("foo19")
+ for i := 0; i < 8; i++ {
+ time.Sleep(500 * time.Millisecond)
+ var value string
+ if err := r.Get(ctx, &value); err == nil {
+ break
+ }
+ }
+
+ wantData := []struct {
+ start uint64
+ count uint64
+ valPfx string
+ }{
+ {0, 5, "testkey"},
+ {5, 5, "testkey1"},
+ {10, 10, "testkey"},
+ }
+
+ // Verify that all keys and values made it correctly.
+ for _, d := range wantData {
+ for i := d.start; i < d.start+d.count; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ want := d.valPfx + key
+ if got != want {
+ return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
+ }
+ }
+ }
+ return nil
+}, "runVerifyLocalAndRemoteData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index a14dbe0..29df45a 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -32,3 +32,7 @@
func TestV23SyncbasedGetDeltas(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
}
+
+func TestV23SyncbasedExchangeDeltas(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
+}
diff --git a/v23/syncbase/testutil/v23util.go b/v23/syncbase/testutil/v23util.go
index 9a12377..b345bc0 100644
--- a/v23/syncbase/testutil/v23util.go
+++ b/v23/syncbase/testutil/v23util.go
@@ -56,11 +56,11 @@
}
// RunClient runs modules.Program and waits until it terminates.
-func RunClient(t *v23tests.T, creds *modules.CustomCredentials, program modules.Program) {
+func RunClient(t *v23tests.T, creds *modules.CustomCredentials, program modules.Program, args ...string) {
client, err := t.Shell().StartWithOpts(
t.Shell().DefaultStartOpts().WithCustomCredentials(creds),
nil,
- program)
+ program, args...)
if err != nil {
V23Fatalf(t, "unable to start the client: %v", err)
}