Merge "syncbase/vsync: * Integration test for bi-directional syncing. * Made client/servers independent principals in integration tests."
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 849f352..100a38d 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"strconv"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -21,29 +22,91 @@
 	"v.io/x/ref/test/v23tests"
 )
 
-// TODO(rdaoud): change the credentials of servers s0 and s1 to independent
-// principals derived from the same root.
-
 //go:generate v23 test generate
 
+// V23TestSyncbasedJoinSyncGroup tests the creation and joining of a
+// SyncGroup. Client0 creates a SyncGroup at Syncbase0. Client1 requests to join
+// the SyncGroup at Syncbase1. Syncbase1 in turn sends a join request to
+// Syncbase0, where the SyncGroup was created.
 func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
 	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
 	server0Creds, _ := t.Shell().NewChildCredentials("s0")
-	client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
 	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
-		`{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
 	defer cleanSync0()
 
-	server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
-	client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
 	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
-		`{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
 	defer cleanSync1()
 
 	tu.RunClient(t, client0Creds, runCreateSyncGroup)
 	tu.RunClient(t, client1Creds, runJoinSyncGroup)
 }
 
+// V23TestSyncbasedGetDeltas tests the sending of deltas between two Syncbase
+// instances and their clients.  The 1st client creates a SyncGroup and puts
+// some database entries in it.  The 2nd client joins that SyncGroup and reads
+// the database entries.  This verifies the end-to-end synchronization of data
+// along the path: client0--Syncbase0--Syncbase1--client1.
+func V23TestSyncbasedGetDeltas(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	tu.RunClient(t, client0Creds, runCreateSyncGroup)
+	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
+	tu.RunClient(t, client1Creds, runJoinSyncGroup)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+}
+
+// V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
+// Syncbase instances and their clients.  The 1st client creates a SyncGroup and
+// puts some database entries in it.  The 2nd client joins that SyncGroup and
+// reads the database entries.  The 2nd client then updates a subset of existing
+// keys and adds more keys, and the 1st client verifies that it can read these
+// keys.  This verifies the end-to-end bi-directional synchronization of data
+// along the path: client0--Syncbase0--Syncbase1--client1.
+func V23TestSyncbasedExchangeDeltas(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	tu.RunClient(t, client0Creds, runCreateSyncGroup)
+	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "0")
+
+	tu.RunClient(t, client1Creds, runJoinSyncGroup)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+
+	tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
+	tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "10")
+
+	tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
+}
+
+////////////////////////////////////
+// Helpers.
+
 var runCreateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -52,16 +115,17 @@
 	a.Create(ctx, nil)
 	d := a.NoSQLDatabase("d")
 	d.Create(ctx, nil)
+	d.CreateTable(ctx, "tb", nil)
 
 	mtName := env.Vars[ref.EnvNamespacePrefix]
-
 	spec := wire.SyncGroupSpec{
 		Description: "test syncgroup sg",
-		Perms:       perms("root/s0", "root/s0/s1"),
-		Prefixes:    []string{"t1:foo"},
+		Perms:       perms("root/s0", "root/s1"),
+		Prefixes:    []string{"tb:foo"},
 		MountTables: []string{mtName},
 	}
-	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
 	sg := d.SyncGroup(sgName)
 	info := wire.SyncGroupMemberInfo{8}
 	if err := sg.Create(ctx, spec, info); err != nil {
@@ -78,8 +142,9 @@
 	a.Create(ctx, nil)
 	d := a.NoSQLDatabase("d")
 	d.Create(ctx, nil)
+	d.CreateTable(ctx, "tb", nil)
 
-	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
 	sg := d.SyncGroup(sgName)
 	info := wire.SyncGroupMemberInfo{10}
 	if _, err := sg.Join(ctx, info); err != nil {
@@ -88,57 +153,18 @@
 	return nil
 }, "runJoinSyncGroup")
 
-// V23TestSyncbasedGetDeltas tests the exchange of deltas between two Syncbase
-// instances and their clients.  The 1st client creates a SyncGroup and puts
-// some database entries in it.  The 2nd client joins that SyncGroup and reads
-// the database entries.  This verifies the end-to-end synchronization of data
-// along the path: client1--Syncbase1--Syncbase2--client2.
-func V23TestSyncbasedGetDeltas(t *v23tests.T) {
-	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
-	server0Creds, _ := t.Shell().NewChildCredentials("s0")
-	client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
-	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
-		`{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
-	defer cleanSync0()
-
-	server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
-	client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
-	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
-		`{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
-	defer cleanSync1()
-
-	tu.RunClient(t, client0Creds, runCreateAndPopulateSyncGroup)
-	tu.RunClient(t, client1Creds, runJoinSyncGroupAndFetchData)
-}
-
-var runCreateAndPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+var runPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	a := syncbase.NewService("sync0").App("a")
-	a.Create(ctx, nil)
+	a := syncbase.NewService(args[0]).App("a")
 	d := a.NoSQLDatabase("d")
-	d.Create(ctx, nil)
-	d.CreateTable(ctx, "tb", nil)
-
-	mtName := env.Vars[ref.EnvNamespacePrefix]
-
-	spec := wire.SyncGroupSpec{
-		Description: "test syncgroup sg",
-		Perms:       perms("root/s0", "root/s0/s1"),
-		Prefixes:    []string{"tb:foo"},
-		MountTables: []string{mtName},
-	}
-	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
-	sg := d.SyncGroup(sgName)
-	info := wire.SyncGroupMemberInfo{8}
-	if err := sg.Create(ctx, spec, info); err != nil {
-		return fmt.Errorf("Create SG %q failed: %v", sgName, err)
-	}
 
 	// Do Puts.
 	tb := d.Table("tb")
-	for i := 0; i < 10; i++ {
+	start, _ := strconv.ParseUint(args[1], 10, 64)
+
+	for i := start; i < start+10; i++ {
 		key := fmt.Sprintf("foo%d", i)
 		r := tb.Row(key)
 		if err := r.Put(ctx, "testkey"+key); err != nil {
@@ -147,28 +173,45 @@
 	}
 
 	return nil
-}, "runCreateAndPopulateSyncGroup")
+}, "runPopulateSyncGroup")
 
-var runJoinSyncGroupAndFetchData = modules.Register(func(env *modules.Env, args ...string) error {
+var runUpdateData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	a := syncbase.NewService("sync1").App("a")
-	a.Create(ctx, nil)
+	a := syncbase.NewService(args[0]).App("a")
 	d := a.NoSQLDatabase("d")
-	d.Create(ctx, nil)
-	d.CreateTable(ctx, "tb", nil)
 
-	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
-	sg := d.SyncGroup(sgName)
-	info := wire.SyncGroupMemberInfo{10}
-	if _, err := sg.Join(ctx, info); err != nil {
-		return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
+	// Do Puts that write "testkey1"-prefixed values to 5 keys starting at start.
+	tb := d.Table("tb")
+	start, _ := strconv.ParseUint(args[1], 10, 64)
+
+	for i := start; i < start+5; i++ {
+		key := fmt.Sprintf("foo%d", i)
+		r := tb.Row(key)
+		if err := r.Put(ctx, "testkey1"+key); err != nil {
+			return fmt.Errorf("r.Put() failed: %v", err)
+		}
 	}
 
+	return nil
+}, "runUpdateData")
+
+var runVerifySyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d")
+
 	// Wait for a bit (up to 4 sec) until the last key appears.
 	tb := d.Table("tb")
-	r := tb.Row("foo9")
+
+	start, _ := strconv.ParseUint(args[1], 10, 64)
+	count, _ := strconv.ParseUint(args[2], 10, 64)
+	lastKey := fmt.Sprintf("foo%d", start+count-1)
+
+	r := tb.Row(lastKey)
 	for i := 0; i < 8; i++ {
 		time.Sleep(500 * time.Millisecond)
 		var value string
@@ -178,7 +221,7 @@
 	}
 
 	// Verify that all keys and values made it correctly.
-	for i := 0; i < 10; i++ {
+	for i := start; i < start+count; i++ {
 		key := fmt.Sprintf("foo%d", i)
 		r := tb.Row(key)
 		var got string
@@ -207,9 +250,55 @@
 			return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
 		}
 	}
+
 	if err := stream.Err(); err != nil {
 		return fmt.Errorf("scan stream error: %v\n", err)
 	}
-
 	return nil
-}, "runJoinSyncGroupAndFetchData")
+}, "runVerifySyncGroupData")
+
+var runVerifyLocalAndRemoteData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d")
+	tb := d.Table("tb")
+
+	// Wait for a bit (up to 4 sec) until the last key appears.
+	r := tb.Row("foo19")
+	for i := 0; i < 8; i++ {
+		time.Sleep(500 * time.Millisecond)
+		var value string
+		if err := r.Get(ctx, &value); err == nil {
+			break
+		}
+	}
+
+	wantData := []struct {
+		start  uint64
+		count  uint64
+		valPfx string
+	}{
+		{0, 5, "testkey"},
+		{5, 5, "testkey1"},
+		{10, 10, "testkey"},
+	}
+
+	// Verify that all keys and values made it correctly.
+	for _, d := range wantData {
+		for i := d.start; i < d.start+d.count; i++ {
+			key := fmt.Sprintf("foo%d", i)
+			r := tb.Row(key)
+			var got string
+			if err := r.Get(ctx, &got); err != nil {
+				return fmt.Errorf("r.Get() failed: %v\n", err)
+			}
+			want := d.valPfx + key
+			if got != want {
+				return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
+			}
+		}
+	}
+	return nil
+}, "runVerifyLocalAndRemoteData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index a14dbe0..29df45a 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -32,3 +32,7 @@
 func TestV23SyncbasedGetDeltas(t *testing.T) {
 	v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
 }
+
+func TestV23SyncbasedExchangeDeltas(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
+}
diff --git a/v23/syncbase/testutil/v23util.go b/v23/syncbase/testutil/v23util.go
index 9a12377..b345bc0 100644
--- a/v23/syncbase/testutil/v23util.go
+++ b/v23/syncbase/testutil/v23util.go
@@ -56,11 +56,11 @@
 }
 
 // RunClient runs modules.Program and waits until it terminates.
-func RunClient(t *v23tests.T, creds *modules.CustomCredentials, program modules.Program) {
+func RunClient(t *v23tests.T, creds *modules.CustomCredentials, program modules.Program, args ...string) {
 	client, err := t.Shell().StartWithOpts(
 		t.Shell().DefaultStartOpts().WithCustomCredentials(creds),
 		nil,
-		program)
+		program, args...)
 	if err != nil {
 		V23Fatalf(t, "unable to start the client: %v", err)
 	}