syncbase/vsync: more sync integration tests

* Pre-populate a DB before creating a SyncGroup.
* Sync across multiple apps/dbs/tables/prefixes.

Change-Id: I3122487966296b0feee7f336d68ea001b5465651
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 505805f..f127e0d 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -74,11 +74,11 @@
 
 	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
 
 	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
 	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
-	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
 }
 
 // V23TestSyncbasedExchangeDeltas tests the exchange of deltas between two
@@ -106,14 +106,14 @@
 
 	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
 
 	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
 	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
-	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
 
 	tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
-	tu.RunClient(t, client1Creds, runPopulateSyncGroup, "sync1", "foo", "10")
+	tu.RunClient(t, client1Creds, runPopulateData, "sync1", "foo", "10")
 
 	tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
 }
@@ -145,12 +145,12 @@
 	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s1")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
 
 	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
 	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
-	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
 	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg2Name)
 	tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
 }
@@ -191,12 +191,12 @@
 	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg1Name, "tb:foo", "root/s0", "root/s1", "root/s2")
 	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sg2Name, "tb:f", "root/s0", "root/s2")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "f", "0")
-	tu.RunClient(t, client0Creds, runPopulateSyncGroup, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "f", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
 
 	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
 	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sg1Name)
-	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "0", "10")
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
 	tu.RunClient(t, client1Creds, runCreateSyncGroup, "sync1", sg3Name, "tb:f", "root/s1", "root/s2")
 
 	tu.RunClient(t, client2Creds, runSetupAppA, "sync2")
@@ -207,6 +207,70 @@
 	tu.RunClient(t, client1Creds, runVerifyNestedSyncGroupData, "sync1")
 }
 
+// V23TestSyncbasedGetDeltasPrePopulate tests the sending of deltas between two
+// Syncbase instances and their clients with data existing before the creation
+// of a SyncGroup.  The 1st client puts entries in a database then creates a
+// SyncGroup over that data.  The 2nd client joins that SyncGroup and reads the
+// database entries.
+func V23TestSyncbasedGetDeltasPrePopulate(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+	// Populate table data before creating the SyncGroup.  Also populate
+	// with data that is not part of the SyncGroup to verify filtering.
+	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "bar", "0")
+	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+
+	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+	tu.RunClient(t, client1Creds, runVerifyNonSyncGroupData, "sync1", "bar")
+}
+
+// V23TestSyncbasedGetDeltasMultiApp tests the sending of deltas between two
+// Syncbase instances and their clients across multiple apps, databases, and
+// tables.  The 1st client puts entries in multiple tables across multiple
+// app databases then creates multiple SyncGroups (one per database) over that
+// data.  The 2nd client joins these SyncGroups and reads all the data.
+func V23TestSyncbasedGetDeltasMultiApp(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	sgNamePrefix := naming.Join("sync0", constants.SyncbaseSuffix)
+	na, nd, nt := "2", "2", "2" // number of apps, dbs, tables
+
+	tu.RunClient(t, client0Creds, runSetupAppMulti, "sync0", na, nd, nt)
+	tu.RunClient(t, client0Creds, runPopulateSyncGroupMulti, "sync0", sgNamePrefix, na, nd, nt, "foo", "bar")
+
+	tu.RunClient(t, client1Creds, runSetupAppMulti, "sync1", na, nd, nt)
+	tu.RunClient(t, client1Creds, runJoinSyncGroupMulti, "sync1", sgNamePrefix, na, nd)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupDataMulti, "sync1", na, nd, nt, "foo", "bar")
+}
+
 ////////////////////////////////////
 // Helpers.
 
@@ -241,7 +305,7 @@
 	sg := d.SyncGroup(args[1])
 	info := wire.SyncGroupMemberInfo{8}
 	if err := sg.Create(ctx, spec, info); err != nil {
-		return fmt.Errorf("Create SG %q failed: %v", args[1], err)
+		return fmt.Errorf("Create SG %q failed: %v\n", args[1], err)
 	}
 	return nil
 }, "runCreateSyncGroup")
@@ -256,12 +320,12 @@
 	sg := d.SyncGroup(args[1])
 	info := wire.SyncGroupMemberInfo{10}
 	if _, err := sg.Join(ctx, info); err != nil {
-		return fmt.Errorf("Join SG %q failed: %v", args[1], err)
+		return fmt.Errorf("Join SG %q failed: %v\n", args[1], err)
 	}
 	return nil
 }, "runJoinSyncGroup")
 
-var runPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+var runPopulateData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
@@ -276,11 +340,11 @@
 		key := fmt.Sprintf("%s%d", args[1], i)
 		r := tb.Row(key)
 		if err := r.Put(ctx, "testkey"+key); err != nil {
-			return fmt.Errorf("r.Put() failed: %v", err)
+			return fmt.Errorf("r.Put() failed: %v\n", err)
 		}
 	}
 	return nil
-}, "runPopulateSyncGroup")
+}, "runPopulateData")
 
 var runUpdateData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
@@ -297,7 +361,7 @@
 		key := fmt.Sprintf("foo%d", i)
 		r := tb.Row(key)
 		if err := r.Put(ctx, "testkey1"+key); err != nil {
-			return fmt.Errorf("r.Put() failed: %v", err)
+			return fmt.Errorf("r.Put() failed: %v\n", err)
 		}
 	}
 
@@ -314,9 +378,9 @@
 	// Wait for a bit (up to 4 sec) until the last key appears.
 	tb := d.Table("tb")
 
-	start, _ := strconv.ParseUint(args[1], 10, 64)
-	count, _ := strconv.ParseUint(args[2], 10, 64)
-	lastKey := fmt.Sprintf("foo%d", start+count-1)
+	start, _ := strconv.ParseUint(args[2], 10, 64)
+	count, _ := strconv.ParseUint(args[3], 10, 64)
+	lastKey := fmt.Sprintf("%s%d", args[1], start+count-1)
 
 	r := tb.Row(lastKey)
 	for i := 0; i < 8; i++ {
@@ -329,7 +393,7 @@
 
 	// Verify that all keys and values made it correctly.
 	for i := start; i < start+count; i++ {
-		key := fmt.Sprintf("foo%d", i)
+		key := fmt.Sprintf("%s%d", args[1], i)
 		r := tb.Row(key)
 		var got string
 		if err := r.Get(ctx, &got); err != nil {
@@ -342,9 +406,9 @@
 	}
 
 	// Re-verify using a scan operation.
-	stream := tb.Scan(ctx, nosql.Prefix("foo"))
+	stream := tb.Scan(ctx, nosql.Prefix(args[1]))
 	for i := 0; stream.Advance(); i++ {
-		want := fmt.Sprintf("foo%d", i)
+		want := fmt.Sprintf("%s%d", args[1], i)
 		got := stream.Key()
 		if got != want {
 			return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
@@ -364,6 +428,31 @@
 	return nil
 }, "runVerifySyncGroupData")
 
+var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d")
+	tb := d.Table("tb")
+
+	// Verify through a scan that none of that data exists.
+	count := 0
+	stream := tb.Scan(ctx, nosql.Prefix(args[1]))
+	for stream.Advance() {
+		count++
+	}
+
+	if err := stream.Err(); err != nil {
+		return fmt.Errorf("scan stream error: %v\n", err)
+	}
+	if count > 0 {
+		return fmt.Errorf("found %d entries in %s prefix that should not be there\n", count, args[1])
+	}
+
+	return nil
+}, "runVerifyNonSyncGroupData")
+
 var runVerifyLocalAndRemoteData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -447,3 +536,172 @@
 	}
 	return nil
 }, "runVerifyNestedSyncGroupData")
+
+var runSetupAppMulti = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	svc := syncbase.NewService(args[0])
+	numApps, _ := strconv.Atoi(args[1])
+	numDbs, _ := strconv.Atoi(args[2])
+	numTbs, _ := strconv.Atoi(args[3])
+
+	for i := 0; i < numApps; i++ {
+		appName := fmt.Sprintf("a%d", i)
+		a := svc.App(appName)
+		a.Create(ctx, nil)
+
+		for j := 0; j < numDbs; j++ {
+			dbName := fmt.Sprintf("d%d", j)
+			d := a.NoSQLDatabase(dbName)
+			d.Create(ctx, nil)
+
+			for k := 0; k < numTbs; k++ {
+				tbName := fmt.Sprintf("tb%d", k)
+				d.CreateTable(ctx, tbName, nil)
+			}
+		}
+	}
+
+	return nil
+}, "runSetupAppMulti")
+
+var runPopulateSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	mtName := env.Vars[ref.EnvNamespacePrefix]
+
+	svc := syncbase.NewService(args[0])
+	sgNamePrefix := args[1]
+	numApps, _ := strconv.Atoi(args[2])
+	numDbs, _ := strconv.Atoi(args[3])
+	numTbs, _ := strconv.Atoi(args[4])
+	prefixes := args[5:]
+
+	// For each app...
+	for i := 0; i < numApps; i++ {
+		appName := fmt.Sprintf("a%d", i)
+		a := svc.App(appName)
+
+		// For each database...
+		for j := 0; j < numDbs; j++ {
+			dbName := fmt.Sprintf("d%d", j)
+			d := a.NoSQLDatabase(dbName)
+
+			// For each table, pre-populate entries on each prefix.
+			// Also determine the SyncGroup prefixes.
+			var sgPrefixes []string
+			for k := 0; k < numTbs; k++ {
+				tbName := fmt.Sprintf("tb%d", k)
+				tb := d.Table(tbName)
+
+				for _, pfx := range prefixes {
+					p := fmt.Sprintf("%s:%s", tbName, pfx)
+					sgPrefixes = append(sgPrefixes, p)
+
+					for n := 0; n < 10; n++ {
+						key := fmt.Sprintf("%s%d", pfx, n)
+						r := tb.Row(key)
+						if err := r.Put(ctx, "testkey"+key); err != nil {
+							return fmt.Errorf("r.Put() failed: %v\n", err)
+						}
+					}
+				}
+			}
+
+			// Create one SyncGroup per database across all tables
+			// and prefixes.
+			sgName := naming.Join(sgNamePrefix, appName, dbName)
+			spec := wire.SyncGroupSpec{
+				Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
+				Perms:       perms("root/s0", "root/s1"),
+				Prefixes:    sgPrefixes,
+				MountTables: []string{mtName},
+			}
+
+			sg := d.SyncGroup(sgName)
+			info := wire.SyncGroupMemberInfo{8}
+			if err := sg.Create(ctx, spec, info); err != nil {
+				return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
+			}
+		}
+	}
+
+	return nil
+}, "runPopulateSyncGroupMulti")
+
+var runJoinSyncGroupMulti = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	svc := syncbase.NewService(args[0])
+	sgNamePrefix := args[1]
+	numApps, _ := strconv.Atoi(args[2])
+	numDbs, _ := strconv.Atoi(args[3])
+
+	for i := 0; i < numApps; i++ {
+		appName := fmt.Sprintf("a%d", i)
+		a := svc.App(appName)
+
+		for j := 0; j < numDbs; j++ {
+			dbName := fmt.Sprintf("d%d", j)
+			d := a.NoSQLDatabase(dbName)
+
+			sgName := naming.Join(sgNamePrefix, appName, dbName)
+			sg := d.SyncGroup(sgName)
+			info := wire.SyncGroupMemberInfo{10}
+			if _, err := sg.Join(ctx, info); err != nil {
+				return fmt.Errorf("Join SG %q failed: %v\n", sgName, err)
+			}
+		}
+	}
+
+	return nil
+}, "runJoinSyncGroupMulti")
+
+var runVerifySyncGroupDataMulti = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	svc := syncbase.NewService(args[0])
+	numApps, _ := strconv.Atoi(args[1])
+	numDbs, _ := strconv.Atoi(args[2])
+	numTbs, _ := strconv.Atoi(args[3])
+	prefixes := args[4:]
+
+	time.Sleep(10 * time.Second)
+
+	for i := 0; i < numApps; i++ {
+		appName := fmt.Sprintf("a%d", i)
+		a := svc.App(appName)
+
+		for j := 0; j < numDbs; j++ {
+			dbName := fmt.Sprintf("d%d", j)
+			d := a.NoSQLDatabase(dbName)
+
+			for k := 0; k < numTbs; k++ {
+				tbName := fmt.Sprintf("tb%d", k)
+				tb := d.Table(tbName)
+
+				for _, pfx := range prefixes {
+					for n := 0; n < 10; n++ {
+						key := fmt.Sprintf("%s%d", pfx, n)
+						r := tb.Row(key)
+						var got string
+						if err := r.Get(ctx, &got); err != nil {
+							return fmt.Errorf("r.Get() failed: %v\n", err)
+						}
+						want := "testkey" + key
+						if got != want {
+							return fmt.Errorf("unexpected value: got %q, want %q\n",
+								got, want)
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}, "runVerifySyncGroupDataMulti")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index c90e196..3c8bc00 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -44,3 +44,11 @@
 func TestV23NestedAndPeerSyncGroups(t *testing.T) {
 	v23tests.RunTest(t, V23TestNestedAndPeerSyncGroups)
 }
+
+func TestV23SyncbasedGetDeltasPrePopulate(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedGetDeltasPrePopulate)
+}
+
+func TestV23SyncbasedGetDeltasMultiApp(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedGetDeltasMultiApp)
+}