syncbase: Change Prefixes to Collections in the SyncgroupSpec.
1. Fix all the uses of Prefixes.
2. Clean up feature tests.
3. Move sync feature tests into featuretests dir.
MultiPart: 1/4
Change-Id: Ia94a0fa59131e3f63f7cd745b7737c81eed9d413
diff --git a/services/syncbase/.api b/services/syncbase/.api
index 0af96a4..a8b1487 100644
--- a/services/syncbase/.api
+++ b/services/syncbase/.api
@@ -682,11 +682,11 @@
pkg syncbase, type SyncgroupMemberInfo struct, BlobDevType byte
pkg syncbase, type SyncgroupMemberInfo struct, SyncPriority byte
pkg syncbase, type SyncgroupSpec struct
+pkg syncbase, type SyncgroupSpec struct, Collections []Id
pkg syncbase, type SyncgroupSpec struct, Description string
pkg syncbase, type SyncgroupSpec struct, IsPrivate bool
pkg syncbase, type SyncgroupSpec struct, MountTables []string
pkg syncbase, type SyncgroupSpec struct, Perms access.Permissions
-pkg syncbase, type SyncgroupSpec struct, Prefixes []CollectionRow
pkg syncbase, type SyncgroupSpec struct, PublishSyncbaseName string
pkg syncbase, type Value struct
pkg syncbase, type Value struct, Bytes *vom.RawBytes
diff --git a/services/syncbase/syncbase.vdl.go b/services/syncbase/syncbase.vdl.go
index a89023c..01167c9 100644
--- a/services/syncbase/syncbase.vdl.go
+++ b/services/syncbase/syncbase.vdl.go
@@ -646,8 +646,8 @@
PublishSyncbaseName string
// Permissions governing access to this syncgroup.
Perms access.Permissions
- // Data (collectionId-rowPrefix pairs) covered by this syncgroup.
- Prefixes []CollectionRow
+ // Data (set of collectionIds) covered by this syncgroup.
+ Collections []Id
// Mount tables at which to advertise this syncgroup, for rendezvous purposes.
// (Note that in addition to these mount tables, Syncbase also uses
// network-neighborhood-based discovery for rendezvous.)
@@ -678,7 +678,7 @@
if len(x.Perms) != 0 {
return false
}
- if len(x.Prefixes) != 0 {
+ if len(x.Collections) != 0 {
return false
}
if len(x.MountTables) != 0 {
@@ -730,11 +730,11 @@
return err
}
}
- if len(x.Prefixes) != 0 {
- if err := enc.NextField("Prefixes"); err != nil {
+ if len(x.Collections) != 0 {
+ if err := enc.NextField("Collections"); err != nil {
return err
}
- if err := __VDLWriteAnon_list_1(enc, x.Prefixes); err != nil {
+ if err := __VDLWriteAnon_list_1(enc, x.Collections); err != nil {
return err
}
}
@@ -766,7 +766,7 @@
return enc.FinishValue()
}
-func __VDLWriteAnon_list_1(enc vdl.Encoder, x []CollectionRow) error {
+func __VDLWriteAnon_list_1(enc vdl.Encoder, x []Id) error {
if err := enc.StartValue(__VDLType_list_11); err != nil {
return err
}
@@ -853,8 +853,8 @@
if err := x.Perms.VDLRead(dec); err != nil {
return err
}
- case "Prefixes":
- if err := __VDLReadAnon_list_1(dec, &x.Prefixes); err != nil {
+ case "Collections":
+ if err := __VDLReadAnon_list_1(dec, &x.Collections); err != nil {
return err
}
case "MountTables":
@@ -880,13 +880,13 @@
}
}
-func __VDLReadAnon_list_1(dec vdl.Decoder, x *[]CollectionRow) error {
+func __VDLReadAnon_list_1(dec vdl.Decoder, x *[]Id) error {
if err := dec.StartValue(__VDLType_list_11); err != nil {
return err
}
switch len := dec.LenHint(); {
case len > 0:
- *x = make([]CollectionRow, 0, len)
+ *x = make([]Id, 0, len)
default:
*x = nil
}
@@ -897,7 +897,7 @@
case done:
return dec.FinishValue()
}
- var elem CollectionRow
+ var elem Id
if err := elem.VDLRead(dec); err != nil {
return err
}
@@ -6756,7 +6756,7 @@
__VDLType_struct_8 = vdl.TypeOf((*CollectionRow)(nil)).Elem()
__VDLType_struct_9 = vdl.TypeOf((*SyncgroupSpec)(nil)).Elem()
__VDLType_map_10 = vdl.TypeOf((*access.Permissions)(nil))
- __VDLType_list_11 = vdl.TypeOf((*[]CollectionRow)(nil))
+ __VDLType_list_11 = vdl.TypeOf((*[]Id)(nil))
__VDLType_list_12 = vdl.TypeOf((*[]string)(nil))
__VDLType_struct_13 = vdl.TypeOf((*SyncgroupMemberInfo)(nil)).Elem()
__VDLType_enum_14 = vdl.TypeOf((*ResolverType)(nil))
diff --git a/services/syncbase/types.vdl b/services/syncbase/types.vdl
index fcab0bc..092ef8e 100644
--- a/services/syncbase/types.vdl
+++ b/services/syncbase/types.vdl
@@ -81,8 +81,8 @@
// Permissions governing access to this syncgroup.
Perms access.Permissions
- // Data (collectionId-rowPrefix pairs) covered by this syncgroup.
- Prefixes []CollectionRow
+ // Data (set of collectionIds) covered by this syncgroup.
+ Collections []Id
// Mount tables at which to advertise this syncgroup, for rendezvous purposes.
// (Note that in addition to these mount tables, Syncbase also uses
diff --git a/syncbase/featuretests/blob_v23_test.go b/syncbase/featuretests/blob_v23_test.go
index f02cd02..fbd5ee8 100644
--- a/syncbase/featuretests/blob_v23_test.go
+++ b/syncbase/featuretests/blob_v23_test.go
@@ -28,26 +28,27 @@
sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
- ok(t, populateData(sbs[0].clientCtx, "s0", "foo", 0, 10))
- ok(t, createSyncgroup(sbs[0].clientCtx, "s0", sgId, "c:foo", "", sbBlessings(sbs), nil))
- ok(t, joinSyncgroup(sbs[1].clientCtx, "s1", "s0", sgId))
- ok(t, verifySyncgroupData(sbs[1].clientCtx, "s1", "foo", 0, 10))
+ ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
// FetchBlob first.
- ok(t, generateBlob(sbs[0].clientCtx, "s0", "foo", 0, []byte("foobarbaz")))
- ok(t, fetchBlob(sbs[1].clientCtx, "s1", "foo", 0, 9, false))
- ok(t, getBlob(sbs[1].clientCtx, "s1", "foo", 0, []byte("foobarbaz"), 0))
+ ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
+ ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, false))
+ ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
// GetBlob directly.
- ok(t, generateBlob(sbs[1].clientCtx, "s1", "foo", 0, []byte("abcdefghijklmn")))
+ ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("abcdefghijklmn")))
// Sleep so that the update to key "foo0" makes it to the other side.
time.Sleep(10 * time.Second)
- ok(t, getBlob(sbs[0].clientCtx, "s0", "foo", 0, []byte("fghijklmn"), 5))
- ok(t, fetchBlob(sbs[0].clientCtx, "s0", "foo", 0, 14, true))
+ ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("fghijklmn"), 5))
+ ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 14, true))
// Test with a big blob (1 MB).
- ok(t, generateBigBlob(sbs[0].clientCtx, "s0", "foo", 1))
- ok(t, getBigBlob(sbs[1].clientCtx, "s1", "foo", 1))
+ ok(t, generateBigBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1))
+ ok(t, getBigBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1))
}
////////////////////////////////////
diff --git a/syncbase/featuretests/client_v23_test.go b/syncbase/featuretests/client_v23_test.go
index 9e4c52c..323cc97 100644
--- a/syncbase/featuretests/client_v23_test.go
+++ b/syncbase/featuretests/client_v23_test.go
@@ -14,7 +14,7 @@
"v.io/x/ref/test/v23test"
)
-func TestV23SyncbasedPutGet(t *testing.T) {
+func TestV23ClientPutGet(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index 9744c8c..29757c8 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -84,8 +84,8 @@
// Add new seperate keys to each syncbase so that we can verify if sync
// has happened between the two syncbases by waiting on the other's key.
- ok(t, populateData(client0Ctx, "s0", "foo", 22, 23))
- ok(t, populateData(client1Ctx, "s1", "foo", 44, 45))
+ ok(t, populateData(client0Ctx, "s0", testCx.Name, "foo", 22, 23))
+ ok(t, populateData(client1Ctx, "s1", testCx.Name, "foo", 44, 45))
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
@@ -180,9 +180,9 @@
// Make sure that the sync has completed by injecting a row on s0 and
// reading it on s1.
- ok(t, populateData(client0Ctx, "s0", "foo", 200, 201))
+ ok(t, populateData(client0Ctx, "s0", testCx.Name, "foo", 200, 201))
ok(t, waitForValue(client1Ctx, "s1", "foo200", "testkey", ""))
- ok(t, populateData(client1Ctx, "s1", "foo", 400, 401))
+ ok(t, populateData(client1Ctx, "s1", testCx.Name, "foo", 400, 401))
ok(t, waitForValue(client0Ctx, "s0", "foo400", "testkey", ""))
ok(t, verifyConflictResolvedBatch(client0Ctx, "s0", "foo", 0, 100, "concurrentBatchUpdate"))
@@ -325,14 +325,14 @@
sgId = wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
// Create syncgroup and populate data on s0.
- ok(t, createSyncgroup(sbs[0].clientCtx, "s0", sgId, "c:foo", "", sbBlessings(sbs), nil))
- ok(t, populateData(sbs[0].clientCtx, "s0", "foo", 0, numInitRows))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, numInitRows))
// Join syncgroup and verify data on s1.
- ok(t, joinSyncgroup(sbs[1].clientCtx, "s1", "s0", sgId))
- ok(t, verifySyncgroupData(sbs[1].clientCtx, "s1", "foo", 0, numInitRows))
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, numInitRows))
- return sbs[0].clientCtx, sbs[1].clientCtx, "s0", sgId
+ return sbs[0].clientCtx, sbs[1].clientCtx, sbs[0].sbName, sgId
}
// TODO(sadovsky): This pattern is not ideal in that it makes the test code
diff --git a/syncbase/featuretests/ping_pong_test.go b/syncbase/featuretests/ping_pong_test.go
index d28f0a6..c30e2d6 100644
--- a/syncbase/featuretests/ping_pong_test.go
+++ b/syncbase/featuretests/ping_pong_test.go
@@ -59,13 +59,7 @@
// Syncbase s0 is the creator.
sgId := wire.Id{Name: fmt.Sprintf("SG%d", g+1), Blessing: sbBlessings(sbs)}
- // TODO(alexfandrianto): Was unable to use the empty prefix ("c:").
- // Observation: w0's watch isn't working with the empty prefix.
- // Possible explanation: The empty prefix ACL receives an initial value
- // from the Collection ACL. If this value is synced over from the opposing
- // peer, conflict resolution can mean that s0 loses the ability to watch.
- syncString := fmt.Sprintf("%s:p", testCx.Name)
- ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, syncString, "", sbBlessings(sbs), nil))
+ ok(b, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", sbBlessings(sbs), nil, clBlessings(sbs)))
// The other syncbases will attempt to join the syncgroup.
for i := 1; i < *numSync; i++ {
diff --git a/syncbase/featuretests/sync_v23_test.go b/syncbase/featuretests/sync_v23_test.go
new file mode 100644
index 0000000..e87b758
--- /dev/null
+++ b/syncbase/featuretests/sync_v23_test.go
@@ -0,0 +1,677 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package featuretests_test
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "reflect"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/security/access"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/services/watch"
+ "v.io/v23/syncbase"
+ "v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/roaming"
+ "v.io/x/ref/services/syncbase/syncbaselib"
+ tu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test/v23test"
+)
+
+// NOTE(sadovsky): These tests take a very long time to run - nearly 4 minutes
+// on my Macbook Pro! Various instances of time.Sleep() below likely contribute
+// to the problem.
+
+// TestV23VSyncCompEval is a comprehensive sniff test for core sync
+// functionality. It tests the exchange of data and syncgroup deltas between two
+// Syncbase instances and their clients. The 1st client creates a syncgroup and
+// puts some database entries in it. The 2nd client joins that syncgroup and
+// reads the database entries. Syncbases are restarted. The 1st client then
+// updates syncgroup metadata and the 2nd client obtains the metadata. The 2nd
+// client then updates a subset of existing keys, adds more keys, and updates
+// syncgroup metadata and the 1st client verifies that it can read these
+// updates. This verifies the end-to-end bi-directional synchronization of data
+// and syncgroup metadata along the path:
+// client0--Syncbase0--Syncbase1--client1. After this phase is done, both
+// Syncbase instances are shutdown and restarted, and new data is synced once
+// again.
+func TestV23VSyncCompEval(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 2 Syncbases.
+ sbs := setupSyncbases(t, sh, 2, false)
+
+ sbName := sbs[0].sbName
+ sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
+
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
+
+ // This is a decoy syncgroup that no other Syncbase joins, but is on the
+ // same database as the first syncgroup. Populating it after the first
+ // syncgroup causes the database generations to go up, but the joiners
+ // on the first syncgroup do not get any data belonging to this
+ // syncgroup. This triggers the handling of filtered log records in the
+ // restartability code.
+ sgId1 := wire.Id{Name: "SG2", Blessing: sbBlessings(sbs)}
+
+ // Verify data syncing (client0 updates).
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId1, "c1", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c1", "foo", 0, 10))
+
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbName, sgId))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
+
+ // Shutdown and restart Syncbase instances.
+ sbs[0].cleanup(os.Interrupt)
+ sbs[1].cleanup(os.Interrupt)
+
+ acl := func(clientId string) string {
+ return fmt.Sprintf(`{"Resolve":{"In":["root:%s"]},"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId, clientId)
+ }
+
+ sbs[0].cleanup = sh.StartSyncbase(sbs[0].sbCreds, syncbaselib.Opts{Name: sbs[0].sbName, RootDir: sbs[0].rootDir}, acl(sbs[0].clientId))
+ sbs[1].cleanup = sh.StartSyncbase(sbs[1].sbCreds, syncbaselib.Opts{Name: sbs[1].sbName, RootDir: sbs[1].rootDir}, acl(sbs[1].clientId))
+
+ // Verify syncgroup syncing (client0 updates).
+ ok(t, setSyncgroupSpec(sbs[0].clientCtx, sbs[0].sbName, sgId, "v2", "c", "", "root:s0;root:s1;root:s3", nil))
+ ok(t, verifySyncgroupSpec(sbs[1].clientCtx, sbs[1].sbName, sgId, "v2", "c", "root:s0;root:s1;root:s3"))
+
+ // Verify data and syncgroup syncing (client1 updates).
+ ok(t, updateData(sbs[1].clientCtx, sbs[1].sbName, 5, 10, ""))
+ ok(t, populateData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", 10, 20))
+ ok(t, setSyncgroupSpec(sbs[1].clientCtx, sbs[1].sbName, sgId, "v3", "c", "", "root:s0;root:s1;root:s4", nil))
+
+ // Verify that the last updates are synced right away so that we are
+ // assured that all the updates are synced.
+ ok(t, verifySyncgroupData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", "", 10, 10))
+ ok(t, verifySyncgroupData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", "testkey"+sbs[1].sbName, 5, 5))
+ ok(t, verifySyncgroupData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", "", 0, 5))
+ ok(t, verifySyncgroupSpec(sbs[0].clientCtx, sbs[0].sbName, sgId, "v3", "c", "root:s0;root:s1;root:s4"))
+
+ // Shutdown and restart Syncbase instances.
+ sbs[0].cleanup(os.Interrupt)
+ sbs[1].cleanup(os.Interrupt)
+
+ sbs[0].cleanup = sh.StartSyncbase(sbs[0].sbCreds, syncbaselib.Opts{Name: sbs[0].sbName, RootDir: sbs[0].rootDir}, acl(sbs[0].clientId))
+ sbs[1].cleanup = sh.StartSyncbase(sbs[1].sbCreds, syncbaselib.Opts{Name: sbs[1].sbName, RootDir: sbs[1].rootDir}, acl(sbs[1].clientId))
+
+ ok(t, verifySyncgroupSpec(sbs[0].clientCtx, sbs[0].sbName, sgId, "v3", "c", "root:s0;root:s1;root:s4"))
+ ok(t, verifySyncgroupSpec(sbs[1].clientCtx, sbs[1].sbName, sgId, "v3", "c", "root:s0;root:s1;root:s4"))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 20, 30))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 10, 20))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "testkey"+sbs[1].sbName, 5, 5))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 5))
+}
+
+// TestV23VSyncWithPutDelWatch tests the sending of deltas between two Syncbase
+// instances and their clients. The 1st client creates a syncgroup and puts some
+// database entries in it. The 2nd client joins that syncgroup and reads and
+// watches the database entries. The 1st client then deletes a portion of this
+// data, and adds new entries. The 2nd client verifies that these changes are
+// correctly synced. This verifies the end-to-end synchronization of data along
+// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
+// deletes.
+func TestV23VSyncWithPutDelWatch(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 2 Syncbases.
+ sbs := setupSyncbases(t, sh, 2, false)
+
+ sbName := sbs[0].sbName
+ sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
+
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c1,c2", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c1", "foo", 0, 10))
+
+ beforeSyncMarker, err := getResumeMarker(sbs[1].clientCtx, sbs[1].sbName)
+ ok(t, err)
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbName, sgId))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c1", "foo", "", 0, 10))
+ ok(t, verifySyncgroupDataWithWatch(sbs[1].clientCtx, sbs[1].sbName, "c1", "foo", "", 10, false, beforeSyncMarker))
+
+ beforeDeleteMarker, err := getResumeMarker(sbs[1].clientCtx, sbs[1].sbName)
+ ok(t, err)
+ ok(t, deleteData(sbs[0].clientCtx, sbs[0].sbName, "c1", "foo", 0, 5))
+ ok(t, verifySyncgroupDeletedData(sbs[0].clientCtx, sbs[0].sbName, "c1", "foo", "", 5, 5))
+ ok(t, verifySyncgroupDeletedData(sbs[1].clientCtx, sbs[1].sbName, "c1", "foo", "", 5, 5))
+ ok(t, verifySyncgroupDataWithWatch(sbs[1].clientCtx, sbs[1].sbName, "c1", "foo", "", 5, true, beforeDeleteMarker))
+
+ beforeSyncMarker, err = getResumeMarker(sbs[1].clientCtx, sbs[1].sbName)
+ ok(t, err)
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c2", "foo", 0, 10))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c2", "foo", "", 0, 10))
+ ok(t, verifySyncgroupDataWithWatch(sbs[1].clientCtx, sbs[1].sbName, "c2", "foo", "", 10, false, beforeSyncMarker))
+}
+
+// TestV23VSyncWithAcls tests the exchange of deltas including acls between two
+// Syncbase instances and their clients. The 1st client creates a syncgroup on a
+// collection and puts some database entries in it. The 2nd client joins that
+// syncgroup and reads the database entries. The 2nd client then changes the
+// collection acl to allow access to only itself. The 1st client should be
+// unable to access the keys. The 2nd client then modifies the collection acl to
+// restore access to both clients. The 1st client should regain access.
+func TestV23VSyncWithAcls(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 2 Syncbases.
+ sbs := setupSyncbases(t, sh, 2, false)
+
+ sbName := sbs[0].sbName
+ sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
+
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c", "foo", 0, 10))
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbName, sgId))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c", "foo", "", 0, 10))
+
+ ok(t, setCollectionPermissions(sbs[1].clientCtx, sbs[1].sbName, "root:c1"))
+ ok(t, verifyLostAccess(sbs[0].clientCtx, sbs[0].sbName, "c", "foo", 0, 10))
+
+ ok(t, setCollectionPermissions(sbs[1].clientCtx, sbs[1].sbName, "root:c0;root:c1"))
+ ok(t, verifySyncgroupData(sbs[0].clientCtx, sbs[0].sbName, "c", "foo", "", 0, 10))
+}
+
+// TestV23VSyncWithPeerSyncgroups tests the exchange of deltas between three
+// Syncbase instances and their clients consisting of peer syncgroups. The 1st
+// client creates a syncgroup: SG1 at collection "c" and puts some database
+// entries. The 2nd client joins the syncgroup SG1 and verifies that it reads
+// the corresponding database entries. Client 2 then creates SG2 at the same
+// collection. The 3rd client joins the syncgroup SG2 and verifies that it can
+// read all the keys created by client 1. Client 2 also verifies that it can
+// read all the keys created by client 1.
+func TestV23VSyncWithPeerSyncgroups(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 3 Syncbases.
+ sbs := setupSyncbases(t, sh, 3, false)
+
+ sb1Name := sbs[0].sbName
+ sg1Id := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
+ sb2Name := sbs[1].sbName
+ sg2Id := wire.Id{Name: "SG2", Blessing: sbBlessings(sbs)}
+
+ // Pre-populate the data before creating the syncgroup.
+ ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, "c"))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c", "f", 0, 10))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sg1Id, "c", "", "root:s0;root:s1", nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c", "foo", 0, 10))
+
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sb1Name, sg1Id))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c", "foo", "", 0, 10))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c", "f", "", 0, 10))
+
+ // We are setting the collection ACL again at syncbase2 but it is not
+ // required.
+ ok(t, createSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sg2Id, "c", "", "root:s1;root:s2", nil, clBlessings(sbs)))
+
+ ok(t, joinSyncgroup(sbs[2].clientCtx, sbs[2].sbName, sb2Name, sg2Id))
+ ok(t, verifySyncgroupData(sbs[2].clientCtx, sbs[2].sbName, "c", "foo", "", 0, 10))
+ ok(t, verifySyncgroupData(sbs[2].clientCtx, sbs[2].sbName, "c", "f", "", 0, 10))
+}
+
+// TestV23VSyncSyncgroups tests the syncing of syncgroup metadata. The 1st
+// client creates the syncgroup SG1, and clients 2 and 3 join this
+// syncgroup. All three clients must learn of the remaining two. Note that
+// client 2 relies on syncgroup metadata syncing to learn of client 3 .
+func TestV23VSyncSyncgroups(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 3 Syncbases.
+ sbs := setupSyncbases(t, sh, 3, false)
+
+ sbName := sbs[0].sbName
+ sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
+
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c", "", sbBlessings(sbs), nil, clBlessings(sbs)))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, "c", "foo", 0, 10))
+ ok(t, verifySyncgroupMembers(sbs[0].clientCtx, sbs[0].sbName, sgId, 1))
+
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbName, sgId))
+ ok(t, verifySyncgroupMembers(sbs[0].clientCtx, sbs[0].sbName, sgId, 2))
+ ok(t, verifySyncgroupMembers(sbs[1].clientCtx, sbs[1].sbName, sgId, 2))
+
+ ok(t, joinSyncgroup(sbs[2].clientCtx, sbs[2].sbName, sbName, sgId))
+ ok(t, verifySyncgroupMembers(sbs[0].clientCtx, sbs[0].sbName, sgId, 3))
+ ok(t, verifySyncgroupMembers(sbs[1].clientCtx, sbs[1].sbName, sgId, 3))
+ ok(t, verifySyncgroupMembers(sbs[2].clientCtx, sbs[2].sbName, sgId, 3))
+
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, "c", "foo", "", 0, 10))
+ ok(t, verifySyncgroupData(sbs[2].clientCtx, sbs[2].sbName, "c", "foo", "", 0, 10))
+}
+
+// TestV23VSyncMultiApp tests the sending of deltas between two Syncbase
+// instances and their clients across multiple databases and collections. The
+// 1st client puts entries in multiple collections across multiple databases
+// then creates multiple syncgroups (one per database) over that data. The 2nd
+// client joins these syncgroups and reads all the data.
+func TestV23VSyncMultiApp(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ // Setup 2 Syncbases.
+ sbs := setupSyncbases(t, sh, 2, false)
+
+ na, nd, nc := 2, 2, 2 // number of apps, dbs, collections
+
+ ok(t, setupAppMulti(sbs[0].clientCtx, sbs[0].sbName, na, nd))
+ ok(t, setupAppMulti(sbs[1].clientCtx, sbs[1].sbName, na, nd))
+
+ ok(t, populateAndCreateSyncgroupMulti(sbs[0].clientCtx, sbs[0].sbName, na, nd, nc, "foo,bar", sbBlessings(sbs), clBlessings(sbs)))
+ ok(t, joinSyncgroupMulti(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, na, nd))
+ ok(t, verifySyncgroupDataMulti(sbs[1].clientCtx, sbs[1].sbName, na, nd, nc, "foo,bar"))
+}
+
+////////////////////////////////////////////////////////////
+// Helpers for adding or updating data
+
+func setSyncgroupSpec(ctx *context.T, syncbaseName string, sgId wire.Id, sgDesc, sgColls, mtName, blessingPatterns string, perms access.Permissions) error {
+ if mtName == "" {
+ roots := v23.GetNamespace(ctx).Roots()
+ if len(roots) == 0 {
+ return errors.New("no namespace roots")
+ }
+ mtName = roots[0]
+ }
+
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+
+ if perms == nil {
+ perms = tu.DefaultPerms(strings.Split(blessingPatterns, ";")...)
+ }
+
+ spec := wire.SyncgroupSpec{
+ Description: sgDesc,
+ Perms: perms,
+ Collections: parseSgCollections(sgColls),
+ MountTables: []string{mtName},
+ }
+
+ sg := d.SyncgroupForId(sgId)
+ if err := sg.SetSpec(ctx, spec, ""); err != nil {
+ return fmt.Errorf("SetSpec SG %q failed: %v\n", sgId, err)
+ }
+ return nil
+}
+
+func deleteData(ctx *context.T, syncbaseName string, collectionName, keyPrefix string, start, end int) error {
+ if end <= start {
+ return fmt.Errorf("end (%d) <= start (%d)", end, start)
+ }
+
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ c := d.CollectionForId(collectionId)
+
+ for i := start; i < end; i++ {
+ key := fmt.Sprintf("%s%d", keyPrefix, i)
+ r := c.Row(key)
+ if err := r.Delete(ctx); err != nil {
+ return fmt.Errorf("r.Delete() failed: %v\n", err)
+ }
+ }
+ return nil
+}
+
+func setCollectionPermissions(ctx *context.T, syncbaseName string, blessingPatterns string) error {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ c := d.CollectionForId(testCx)
+
+ perms := tu.DefaultPerms(strings.Split(blessingPatterns, ";")...)
+
+ if err := c.SetPermissions(ctx, perms); err != nil {
+ return fmt.Errorf("c.SetPermissions() failed: %v\n", err)
+ }
+ return nil
+}
+
+func setupAppMulti(ctx *context.T, syncbaseName string, numApps, numDbs int) error {
+ svc := syncbase.NewService(syncbaseName)
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
+ d.Create(ctx, nil)
+ }
+ }
+ return nil
+}
+
+func populateAndCreateSyncgroupMulti(ctx *context.T, syncbaseName string, numApps, numDbs, numCxs int, prefixStr, sbBlessings, clBlessings string) error {
+ roots := v23.GetNamespace(ctx).Roots()
+ if len(roots) == 0 {
+ return errors.New("no namespace roots")
+ }
+ mtName := roots[0]
+
+ sbperms := tu.DefaultPerms(strings.Split(sbBlessings, ";")...)
+ clperms := tu.DefaultPerms(strings.Split(clBlessings, ";")...)
+
+ svc := syncbase.NewService(syncbaseName)
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+
+ for j := 0; j < numDbs; j++ {
+
+ dbName := fmt.Sprintf("d%d", j)
+ d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
+
+ // For each collection, pre-populate entries on each prefix.
+ // Also determine the syncgroup collections.
+ var sgColls []wire.Id
+ for k := 0; k < numCxs; k++ {
+ cName := fmt.Sprintf("c%d", k)
+ cId := wire.Id{"u", cName}
+ c := d.CollectionForId(cId)
+ if err := c.Create(ctx, clperms); err != nil {
+ return fmt.Errorf("{%q, %v} c.Create failed %v", syncbaseName, cId, err)
+ }
+
+ sgColls = append(sgColls, cId)
+
+ prefixes := strings.Split(prefixStr, ",")
+ for _, pfx := range prefixes {
+ for n := 0; n < 10; n++ {
+ key := fmt.Sprintf("%s%d", pfx, n)
+ r := c.Row(key)
+ if err := r.Put(ctx, "testkey"+key); err != nil {
+ return fmt.Errorf("r.Put() failed: %v\n", err)
+ }
+ }
+ }
+ }
+
+ // Create one syncgroup per database across all collections.
+ sgName := fmt.Sprintf("%s_%s", appName, dbName)
+ sgId := wire.Id{Name: sgName, Blessing: "blessing"}
+ spec := wire.SyncgroupSpec{
+ Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
+ Perms: sbperms,
+ Collections: sgColls,
+ MountTables: []string{mtName},
+ }
+
+ sg := d.SyncgroupForId(sgId)
+ info := wire.SyncgroupMemberInfo{SyncPriority: 8}
+ if err := sg.Create(ctx, spec, info); err != nil {
+ return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
+ }
+ }
+ }
+ return nil
+}
+
+func joinSyncgroupMulti(ctx *context.T, sbNameLocal, sbNameRemote string, numApps, numDbs int) error {
+ svc := syncbase.NewService(sbNameLocal)
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
+
+ sgName := fmt.Sprintf("%s_%s", appName, dbName)
+ sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
+ info := wire.SyncgroupMemberInfo{SyncPriority: 10}
+ if _, err := sg.Join(ctx, sbNameRemote, nil, info); err != nil {
+ return fmt.Errorf("Join SG Multi %q failed: %v\n", sgName, err)
+ }
+ }
+ }
+ return nil
+}
+
+func getResumeMarker(ctx *context.T, syncbaseName string) (watch.ResumeMarker, error) {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ return d.GetResumeMarker(ctx)
+}
+
+// ByRow implements sort.Interface for []syncbase.WatchChange based on the Row field.
+type ByRow []syncbase.WatchChange
+
+func (c ByRow) Len() int { return len(c) }
+func (c ByRow) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
+func (c ByRow) Less(i, j int) bool { return c[i].Row < c[j].Row }
+
+////////////////////////////////////////////////////////////
+// Helpers for verifying data
+
+func verifySyncgroupSpec(ctx *context.T, syncbaseName string, sgId wire.Id, wantDesc, wantColls, wantBlessingPatterns string) error {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ sg := d.SyncgroupForId(sgId)
+
+ wantCollections := parseSgCollections(wantColls)
+ wantPerms := tu.DefaultPerms(strings.Split(wantBlessingPatterns, ";")...)
+
+ var spec wire.SyncgroupSpec
+ var err error
+ for i := 0; i < 20; i++ {
+ time.Sleep(500 * time.Millisecond)
+ spec, _, err = sg.GetSpec(ctx)
+ if err != nil {
+ return fmt.Errorf("GetSpec SG %q failed: %v\n", sgId, err)
+ }
+ if spec.Description == wantDesc {
+ break
+ }
+ }
+ if spec.Description != wantDesc || !reflect.DeepEqual(spec.Collections, wantCollections) || !reflect.DeepEqual(spec.Perms, wantPerms) {
+ return fmt.Errorf("GetSpec SG %q failed: description got %v, want %v, collections got %v, want %v, perms got %v, want %v\n",
+ sgId, spec.Description, wantDesc, spec.Collections, wantCollections, spec.Perms, wantPerms)
+ }
+ return nil
+}
+
+func verifySyncgroupDeletedData(ctx *context.T, syncbaseName, collectionName, keyPrefix, valuePrefix string, start, count int) error {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ c := d.CollectionForId(collectionId)
+
+ // Wait for a bit for deletions to propagate.
+ lastKey := fmt.Sprintf("%s%d", keyPrefix, start-1)
+ for i := 0; i < 20; i++ {
+ time.Sleep(500 * time.Millisecond)
+ r := c.Row(lastKey)
+ var value string
+ if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
+ break
+ }
+ }
+
+ if valuePrefix == "" {
+ valuePrefix = "testkey"
+ }
+
+ // Verify using a scan operation.
+ stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
+ cGot := 0
+ for i := start; stream.Advance(); i++ {
+ want := fmt.Sprintf("%s%d", keyPrefix, i)
+ got := stream.Key()
+ if got != want {
+ return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
+ }
+ want = valuePrefix + want
+ if err := stream.Value(&got); err != nil {
+ return fmt.Errorf("cannot fetch value in scan: %v\n", err)
+ }
+ if got != want {
+ return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
+ }
+ cGot++
+ }
+
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("scan stream error: %v\n", err)
+ }
+
+ if cGot != count {
+ return fmt.Errorf("scan stream count error: %v %v\n", cGot, count)
+ }
+ return nil
+}
+
+func verifySyncgroupDataWithWatch(ctx *context.T, syncbaseName, collectionName, keyPrefix, valuePrefix string, count int, expectDelete bool, beforeSyncMarker watch.ResumeMarker) error {
+ if count == 0 {
+ return fmt.Errorf("count cannot be 0: got %d", count)
+ }
+ if valuePrefix == "" {
+ valuePrefix = "testkey"
+ }
+
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+
+ ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+
+ stream, err := d.Watch(ctxWithTimeout, collectionId, keyPrefix, beforeSyncMarker)
+ if err != nil {
+ return fmt.Errorf("watch error: %v\n", err)
+ }
+
+ var changes []syncbase.WatchChange
+ for i := 0; stream.Advance() && i < count; i++ {
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("watch stream error: %v\n", err)
+ }
+ changes = append(changes, stream.Change())
+ }
+
+ sort.Sort(ByRow(changes))
+
+ if got, want := len(changes), count; got != want {
+ return fmt.Errorf("unexpected number of changes: got %d, want %d", got, want)
+ }
+
+ for i, change := range changes {
+ if got, want := change.Collection, collectionId; got != want {
+ return fmt.Errorf("unexpected watch collection: got %v, want %v", got, want)
+ }
+ if got, want := change.Row, fmt.Sprintf("%s%d", keyPrefix, i); got != want {
+ return fmt.Errorf("unexpected watch row: got %q, want %q", got, want)
+ }
+ if got, want := change.FromSync, true; got != want {
+ return fmt.Errorf("unexpected FromSync value: got %t, want %t", got, want)
+ }
+ if expectDelete {
+ if got, want := change.ChangeType, syncbase.DeleteChange; got != want {
+ return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+ }
+ return nil
+ }
+ var result string
+ if got, want := change.ChangeType, syncbase.PutChange; got != want {
+ return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
+ }
+ if err := change.Value(&result); err != nil {
+ return fmt.Errorf("couldn't decode watch value: %v", err)
+ }
+ if got, want := result, fmt.Sprintf("%s%s%d", valuePrefix, keyPrefix, i); got != want {
+ return fmt.Errorf("unexpected watch value: got %q, want %q", got, want)
+ }
+ }
+ return nil
+}
+
+func verifyLostAccess(ctx *context.T, syncbaseName, collectionName, keyPrefix string, start, count int) error {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ c := d.CollectionForId(collectionId)
+
+ lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
+ r := c.Row(lastKey)
+ for i := 0; i < 20; i++ {
+ time.Sleep(500 * time.Millisecond)
+ var value string
+ if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoAccess.ID {
+ break
+ }
+ }
+
+ // Verify that all keys have lost access.
+ for i := start; i < start+count; i++ {
+ key := fmt.Sprintf("%s%d", keyPrefix, i)
+ r := c.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ return fmt.Errorf("r.Get() didn't fail: %v\n", err)
+ }
+ }
+ return nil
+}
+
+func verifySyncgroupDataMulti(ctx *context.T, syncbaseName string, numApps, numDbs, numCxs int, prefixStr string) error {
+ svc := syncbase.NewService(syncbaseName)
+
+ for i := 0; i < numApps; i++ {
+ appName := fmt.Sprintf("a%d", i)
+
+ for j := 0; j < numDbs; j++ {
+ dbName := fmt.Sprintf("d%d", j)
+ d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
+
+ for k := 0; k < numCxs; k++ {
+ cName := fmt.Sprintf("c%d", k)
+ c := d.CollectionForId(wire.Id{"u", cName})
+
+ prefixes := strings.Split(prefixStr, ",")
+ for _, pfx := range prefixes {
+ for n := 0; n < 10; n++ {
+ key := fmt.Sprintf("%s%d", pfx, n)
+ r := c.Row(key)
+ var got string
+ var err error
+ // Wait for some time to sync.
+ for t := 0; t < 20; t++ {
+ time.Sleep(500 * time.Millisecond)
+ if err = r.Get(ctx, &got); err == nil {
+ break
+ }
+ }
+ if err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ want := "testkey" + key
+ if got != want {
+ return fmt.Errorf("unexpected value: got %q, want %q\n",
+ got, want)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/syncbase/featuretests/syncgroup_v23_test.go b/syncbase/featuretests/syncgroup_v23_test.go
index 49aba8c..d934ea9 100644
--- a/syncbase/featuretests/syncgroup_v23_test.go
+++ b/syncbase/featuretests/syncgroup_v23_test.go
@@ -37,7 +37,7 @@
sbName := sbs[0].sbName
sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
- ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c:foo", "", sbBlessings(sbs), nil))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", sbBlessings(sbs), nil, clBlessings(sbs)))
// Remaining syncbases run the specified workload concurrently.
for i := 1; i < len(sbs); i++ {
@@ -48,7 +48,7 @@
// Populate data on creator as well.
keypfx := "foo==" + sbs[0].sbName + "=="
- ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, keypfx, 0, 5))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, keypfx, 0, 5))
// Verify steady state sequentially.
for _, sb := range sbs {
@@ -80,7 +80,7 @@
sbName := sbs[N].sbName
sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
- ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c:foo", "", sbBlessings(sbs), nil))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", sbBlessings(sbs), nil, clBlessings(sbs)))
// Remaining N-1 syncbases run the specified workload concurrently.
for i := 1; i < N; i++ {
@@ -91,7 +91,7 @@
// Populate data on creator as well.
keypfx := "foo==" + sbs[0].sbName + "=="
- ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, keypfx, 0, 5))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, keypfx, 0, 5))
// Verify steady state sequentially.
for i := 0; i < N; i++ {
@@ -138,7 +138,7 @@
}
perms.Add(security.BlessingPattern("root:"+sbs[0].sbName), string(access.Admin))
- ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c:foo", "/mttable", "root", perms))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "/mttable", "", perms, clBlessings(sbs)))
// Remaining syncbases run the specified workload concurrently.
for i := 1; i < len(sbs); i++ {
@@ -149,7 +149,7 @@
// Populate data on creator as well.
keypfx := "foo==" + sbs[0].sbName + "=="
- ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, keypfx, 0, 5))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, keypfx, 0, 5))
// Verify steady state sequentially.
for _, sb := range sbs {
@@ -178,20 +178,19 @@
// stagger the process.
sbName := sbs[0].sbName
sgId := wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
- ok(t, joinOrCreateSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sbName, sgId, "c:foo", "", sbBlessings(sbs)))
+ ok(t, joinOrCreateSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sbName, sgId, testCx.Name, "", sbBlessings(sbs), clBlessings(sbs)))
- // Remaining syncbases run the specified workload concurrently.
+ // Remaining syncbases join the syncgroup concurrently.
for i := 1; i < len(sbs); i++ {
go func(i int) {
- ok(t, joinOrCreateSyncgroup(sbs[i].clientCtx, sbs[i].sbName, sbName, sgId, "c:foo", "", sbBlessings(sbs)))
+ ok(t, joinOrCreateSyncgroup(sbs[i].clientCtx, sbs[i].sbName, sbName, sgId, testCx.Name, "", sbBlessings(sbs), clBlessings(sbs)))
}(i)
}
- // Populate and join occur concurrently.
+ // Populate data concurrently.
for _, sb := range sbs {
go func(sb *testSyncbase) {
- keypfx := "foo==" + sb.sbName + "=="
- ok(t, populateData(sb.clientCtx, sb.sbName, keypfx, 0, 5))
+ ok(t, populateDataWithRetry(sb.clientCtx, sb.sbName, "foo"))
}(sb)
}
@@ -220,23 +219,36 @@
}
// Populate some data without colliding with data from other Syncbases.
+ return populateDataWithRetry(ctx, syncbaseName, prefix)
+}
+
+func populateDataWithRetry(ctx *context.T, syncbaseName, prefix string) error {
+ var err error
keypfx := prefix + "==" + syncbaseName + "=="
- return populateData(ctx, syncbaseName, keypfx, 0, 5)
+ for i := 0; i < 8; i++ {
+ // Wait for the presence of the collection and its ACL to be
+ // synced.
+ time.Sleep(500 * time.Millisecond)
+ if err = populateData(ctx, syncbaseName, testCx.Name, keypfx, 0, 5); err == nil {
+ break
+ }
+ }
+ return err
}
func verifySync(ctx *context.T, syncbaseName string, numSyncbases int, prefix string) error {
for i := numSyncbases - 1; i >= 0; i-- {
keypfx := fmt.Sprintf("%s==s%d==", prefix, i)
- if err := verifySyncgroupData(ctx, syncbaseName, keypfx, 0, 5); err != nil {
+ if err := verifySyncgroupData(ctx, syncbaseName, testCx.Name, keypfx, "", 0, 5); err != nil {
return err
}
}
return nil
}
-func joinOrCreateSyncgroup(ctx *context.T, sbNameLocal, sbNameRemote string, sgId wire.Id, sgPrefixes, mtName, bps string) error {
+func joinOrCreateSyncgroup(ctx *context.T, sbNameLocal, sbNameRemote string, sgId wire.Id, sgColls, mtName, bps, clbps string) error {
if err := joinSyncgroup(ctx, sbNameLocal, sbNameRemote, sgId); err == nil {
return nil
}
- return createSyncgroup(ctx, sbNameLocal, sgId, sgPrefixes, mtName, bps, nil)
+ return createSyncgroup(ctx, sbNameLocal, sgId, sgColls, mtName, bps, nil, clbps)
}
diff --git a/syncbase/featuretests/test_util_test.go b/syncbase/featuretests/test_util_test.go
index b1148b0..e7148f6 100644
--- a/syncbase/featuretests/test_util_test.go
+++ b/syncbase/featuretests/test_util_test.go
@@ -37,15 +37,19 @@
func setupHierarchy(ctx *context.T, syncbaseName string) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
- if err := d.Create(ctx, nil); err != nil {
- return err
- }
- return d.CollectionForId(testCx).Create(ctx, nil)
+ return d.Create(ctx, nil)
+}
+
+func createCollection(ctx *context.T, syncbaseName, collectionName string) error {
+ d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ return d.CollectionForId(collectionId).Create(ctx, nil)
}
type testSyncbase struct {
sbName string
sbCreds *v23test.Credentials
+ rootDir string
clientId string
clientCtx *context.T
cleanup func(sig os.Signal)
@@ -59,12 +63,13 @@
sbs[i] = &testSyncbase{
sbName: sbName,
sbCreds: sh.ForkCredentials(sbName),
+ rootDir: sh.MakeTempDir(),
clientId: clientId,
clientCtx: sh.ForkContext(clientId),
}
// Give XRWA permissions to this Syncbase's client.
acl := fmt.Sprintf(`{"Resolve":{"In":["root:%s"]},"Read":{"In":["root:%s"]},"Write":{"In":["root:%s"]},"Admin":{"In":["root:%s"]}}`, clientId, clientId, clientId, clientId)
- sbs[i].cleanup = sh.StartSyncbase(sbs[i].sbCreds, syncbaselib.Opts{Name: sbs[i].sbName, DevMode: devMode}, acl)
+ sbs[i].cleanup = sh.StartSyncbase(sbs[i].sbCreds, syncbaselib.Opts{Name: sbs[i].sbName, RootDir: sbs[i].rootDir, DevMode: devMode}, acl)
}
// Call setupHierarchy on each Syncbase.
for _, sb := range sbs {
@@ -82,16 +87,26 @@
return strings.Join(names, ";")
}
+// Returns a ";"-separated list of Syncbase clients blessing names.
+func clBlessings(sbs []*testSyncbase) string {
+ names := make([]string, len(sbs))
+ for i, sb := range sbs {
+ names[i] = "root:" + sb.clientId
+ }
+ return strings.Join(names, ";")
+}
+
////////////////////////////////////////////////////////////
// Helpers for adding or updating data
-func populateData(ctx *context.T, syncbaseName, keyPrefix string, start, end int) error {
+func populateData(ctx *context.T, syncbaseName, collectionName, keyPrefix string, start, end int) error {
if end <= start {
return fmt.Errorf("end (%d) <= start (%d)", end, start)
}
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
- c := d.CollectionForId(testCx)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ c := d.CollectionForId(collectionId)
for i := start; i < end; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
@@ -166,11 +181,10 @@
////////////////////////////////////////////////////////////
// Helpers for verifying data
-const skipScan = true
-
-func verifySyncgroupData(ctx *context.T, syncbaseName, keyPrefix string, start, count int) error {
+func verifySyncgroupData(ctx *context.T, syncbaseName, collectionName, keyPrefix, valuePrefix string, start, count int) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
- c := d.CollectionForId(testCx)
+ collectionId := wire.Id{Blessing: "u", Name: collectionName}
+ c := d.CollectionForId(collectionId)
// Wait a bit (up to 10 seconds) for the last key to appear.
lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
@@ -182,6 +196,10 @@
}
}
+ if valuePrefix == "" {
+ valuePrefix = "testkey"
+ }
+
// Verify that all keys and values made it over correctly.
for i := start; i < start+count; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
@@ -189,33 +207,11 @@
if err := c.Get(ctx, key, &got); err != nil {
return fmt.Errorf("c.Get() failed: %v", err)
}
- want := "testkey" + key
+ want := valuePrefix + key
if got != want {
return fmt.Errorf("unexpected value: got %q, want %q", got, want)
}
}
-
- // TODO(sadovsky): Drop this? (Does it buy us much?)
- if !skipScan {
- // Re-verify using a scan operation.
- stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
- for i := 0; stream.Advance(); i++ {
- got, want := stream.Key(), fmt.Sprintf("%s%d", keyPrefix, i)
- if got != want {
- return fmt.Errorf("unexpected key in scan: got %q, want %q", got, want)
- }
- want = "testkey" + want
- if err := stream.Value(&got); err != nil {
- return fmt.Errorf("failed to fetch value in scan: %v", err)
- }
- if got != want {
- return fmt.Errorf("unexpected value in scan: got %q, want %q", got, want)
- }
- }
- if err := stream.Err(); err != nil {
- return fmt.Errorf("scan stream error: %v", err)
- }
- }
return nil
}
@@ -223,7 +219,7 @@
// Helpers for managing syncgroups
// blessingPatterns is a ";"-separated list of blessing patterns.
-func createSyncgroup(ctx *context.T, syncbaseName string, sgId wire.Id, sgPrefixes, mtName, blessingPatterns string, perms access.Permissions) error {
+func createSyncgroup(ctx *context.T, syncbaseName string, sgId wire.Id, sgColls, mtName, sbBlessings string, perms access.Permissions, clBlessings string) error {
if mtName == "" {
roots := v23.GetNamespace(ctx).Roots()
if len(roots) == 0 {
@@ -235,17 +231,29 @@
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
if perms == nil {
- perms = tu.DefaultPerms(strings.Split(blessingPatterns, ";")...)
+ perms = tu.DefaultPerms(strings.Split(sbBlessings, ";")...)
}
+ clperms := tu.DefaultPerms(strings.Split(clBlessings, ";")...)
spec := wire.SyncgroupSpec{
Description: "test syncgroup sg",
Perms: perms,
- Prefixes: parseSgPrefixes(sgPrefixes),
+ Collections: parseSgCollections(sgColls),
MountTables: []string{mtName},
}
+ // Change the collection ACLs to enable syncing.
+ for _, cId := range spec.Collections {
+ c := d.CollectionForId(cId)
+ // Ignore the error since sometimes a collection might already exist.
+ c.Create(ctx, nil)
+ if err := c.SetPermissions(ctx, clperms); err != nil {
+ return fmt.Errorf("{%q, %v} c.SetPermissions failed: %v", syncbaseName, cId, err)
+ }
+ }
+
sg := d.SyncgroupForId(sgId)
+
info := wire.SyncgroupMemberInfo{SyncPriority: 8}
if err := sg.Create(ctx, spec, info); err != nil {
return fmt.Errorf("{%q, %v} sg.Create() failed: %v", syncbaseName, sgId, err)
@@ -296,18 +304,14 @@
////////////////////////////////////////////////////////////
// Syncbase-specific testing helpers
-// parseSgPrefixes converts, for example, "a:b,c:" to
-// [{Collection: {"u", "a"}, Row: "b"}, {Collection: {"u", "c"}, Row: ""}].
+// parseSgCollections converts, for example, "a,c" to
+// [Collection: {"u", "a"}, Collection: {"u", "c"}].
// TODO(ivanpi): Change format to support user blessings other than "u".
-func parseSgPrefixes(csv string) []wire.CollectionRow {
+func parseSgCollections(csv string) []wire.Id {
strs := strings.Split(csv, ",")
- res := make([]wire.CollectionRow, len(strs))
+ res := make([]wire.Id, len(strs))
for i, v := range strs {
- parts := strings.SplitN(v, ":", 2)
- if len(parts) != 2 {
- panic(fmt.Sprintf("invalid prefix string: %q", v))
- }
- res[i] = wire.CollectionRow{CollectionId: wire.Id{"u", parts[0]}, Row: parts[1]}
+ res[i] = wire.Id{"u", v}
}
return res
}
diff --git a/syncbase/featuretests/vclock_v23_test.go b/syncbase/featuretests/vclock_v23_test.go
index dac8098..19967a6 100644
--- a/syncbase/featuretests/vclock_v23_test.go
+++ b/syncbase/featuretests/vclock_v23_test.go
@@ -259,14 +259,14 @@
sbs := setupSyncbases(t, sh, 4, true)
- checkSbTimeNotEq(t, "s0", sbs[0].clientCtx, jan2015)
+ checkSbTimeNotEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015)
// Do NTP at s0.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
NtpHost: startFakeNtpServer(t, sh, jan2015),
DoNtpUpdate: true,
}))
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015)
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015)
// Set up a chain of syncgroups, then wait for a few seconds to allow the
// Syncbases to sync.
@@ -276,9 +276,9 @@
time.Sleep(fiveSecs)
// s1 and s2 should sync s0's clock; s3 should not.
- checkSbTimeApproxEq(t, "s1", sbs[1].clientCtx, jan2015.Add(fiveSecs))
- checkSbTimeApproxEq(t, "s2", sbs[2].clientCtx, jan2015.Add(fiveSecs))
- checkSbTimeNotEq(t, "s3", sbs[3].clientCtx, jan2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[1].sbName, sbs[1].clientCtx, jan2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[2].sbName, sbs[2].clientCtx, jan2015.Add(fiveSecs))
+ checkSbTimeNotEq(t, sbs[3].sbName, sbs[3].clientCtx, jan2015.Add(fiveSecs))
}
// Tests p2p clock sync where multiple devices are NTP-synced.
@@ -291,17 +291,17 @@
sbs := setupSyncbases(t, sh, 3, true)
// Do NTP at s0 and s2.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
NtpHost: startFakeNtpServer(t, sh, jan2015),
DoNtpUpdate: true,
}))
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015)
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015)
- ok(t, sc("s2").DevModeUpdateVClock(sbs[2].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[2].sbName).DevModeUpdateVClock(sbs[2].clientCtx, wire.DevModeUpdateVClockOpts{
NtpHost: startFakeNtpServer(t, sh, feb2015),
DoNtpUpdate: true,
}))
- checkSbTimeApproxEq(t, "s2", sbs[2].clientCtx, feb2015)
+ checkSbTimeApproxEq(t, sbs[2].sbName, sbs[2].clientCtx, feb2015)
// Set up a chain of syncgroups, then wait for a few seconds to allow the
// Syncbases to sync.
@@ -317,21 +317,21 @@
// have NTP'ed at times t0 and t1 respectively. If C transitively hears from A
// before talking to D, D will not pick up C's NTP time even though C is just
// one hop away. This probably doesn't matter much in practice.
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, feb2015.Add(fiveSecs))
- checkSbTimeApproxEq(t, "s1", sbs[1].clientCtx, feb2015.Add(fiveSecs))
- checkSbTimeApproxEq(t, "s2", sbs[2].clientCtx, feb2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, feb2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[1].sbName, sbs[1].clientCtx, feb2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[2].sbName, sbs[2].clientCtx, feb2015.Add(fiveSecs))
// Do NTP at s0 again; the update should propagate through the existing
// syncgroups.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
NtpHost: startFakeNtpServer(t, sh, mar2015),
DoNtpUpdate: true,
}))
time.Sleep(fiveSecs)
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, mar2015.Add(fiveSecs))
- checkSbTimeApproxEq(t, "s1", sbs[1].clientCtx, mar2015.Add(fiveSecs))
- checkSbTimeApproxEq(t, "s2", sbs[2].clientCtx, mar2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, mar2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[1].sbName, sbs[1].clientCtx, mar2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[2].sbName, sbs[2].clientCtx, mar2015.Add(fiveSecs))
}
// Tests p2p clock sync where local is not NTP-synced and is 1 hop away from an
@@ -345,44 +345,44 @@
sbs := setupSyncbases(t, sh, 2, true)
// Set s0's local clock.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
Now: jan2015.Add(-time.Hour),
ElapsedTime: 0,
DoLocalUpdate: true,
}))
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015.Add(-time.Hour))
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015.Add(-time.Hour))
// Do NTP at s0. As a result, s0 will think it has a one hour NTP skew, i.e.
// NTP time minus system clock time equals one hour.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
NtpHost: startFakeNtpServer(t, sh, jan2015),
DoNtpUpdate: true,
}))
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015)
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015)
// Set s1's local clock.
- ok(t, sc("s1").DevModeUpdateVClock(sbs[1].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[1].sbName).DevModeUpdateVClock(sbs[1].clientCtx, wire.DevModeUpdateVClockOpts{
Now: feb2015,
ElapsedTime: 0,
DoLocalUpdate: true,
}))
- checkSbTimeApproxEq(t, "s1", sbs[1].clientCtx, feb2015)
+ checkSbTimeApproxEq(t, sbs[1].sbName, sbs[1].clientCtx, feb2015)
// Move time forward at s0, and reset its elapsed time to 0. Syncbase should
// detect a reboot, and should reflect the new time. Note that the time
// reported by s0.GetTime should reflect its one hour NTP skew.
- ok(t, sc("s0").DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
+ ok(t, sc(sbs[0].sbName).DevModeUpdateVClock(sbs[0].clientCtx, wire.DevModeUpdateVClockOpts{
Now: jan2015.Add(8 * time.Hour),
ElapsedTime: 0,
DoLocalUpdate: true,
}))
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015.Add(9*time.Hour))
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015.Add(9*time.Hour))
// Since s0 thinks it has rebooted, s1 should not get s0's clock.
setupChain(t, sbs)
time.Sleep(fiveSecs)
- checkSbTimeApproxEq(t, "s0", sbs[0].clientCtx, jan2015.Add(9*time.Hour).Add(fiveSecs))
- checkSbTimeApproxEq(t, "s1", sbs[1].clientCtx, feb2015.Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[0].sbName, sbs[0].clientCtx, jan2015.Add(9*time.Hour).Add(fiveSecs))
+ checkSbTimeApproxEq(t, sbs[1].sbName, sbs[1].clientCtx, feb2015.Add(fiveSecs))
}
////////////////////////////////////////////////////////////////////////////////
@@ -396,9 +396,13 @@
break
}
a, b := sbs[i], sbs[i+1]
+
sgId := wire.Id{Name: fmt.Sprintf("syncgroup%d", i), Blessing: "root"}
- ok(t, createSyncgroup(a.clientCtx, a.sbName, sgId, testCx.Name+":"+a.sbName+b.sbName, "", "root", nil))
+ collectionName := testCx.Name + "_" + a.sbName + b.sbName
+ ok(t, createCollection(a.clientCtx, a.sbName, collectionName))
+ ok(t, createSyncgroup(a.clientCtx, a.sbName, sgId, collectionName, "", "root", nil, clBlessings(sbs)))
ok(t, joinSyncgroup(b.clientCtx, b.sbName, a.sbName, sgId))
+
// Wait for a to see b.
ok(t, verifySyncgroupMembers(a.clientCtx, a.sbName, sgId, 2))
}
diff --git a/syncbase/syncgroup_test.go b/syncbase/syncgroup_test.go
index b71c378..4e26d13 100644
--- a/syncbase/syncgroup_test.go
+++ b/syncbase/syncgroup_test.go
@@ -17,6 +17,8 @@
tu "v.io/x/ref/services/syncbase/testutil"
)
+var testCollection = wire.Id{"v.io:u:sam", "c"}
+
// Tests that Syncgroup.Create works as expected.
func TestCreateSyncgroup(t *testing.T) {
ctx, sName, cleanup := tu.SetupOrDie(perms("root:client"))
@@ -34,19 +36,18 @@
// Prefill entries before creating a syncgroup to exercise the bootstrap
// of a syncgroup through Snapshot operations to the watcher.
- t1 := tu.CreateCollection(t, ctx, d, "t1")
+ c := tu.CreateCollection(t, ctx, d, "c")
for _, k := range []string{"foo123", "foobar123", "xyz"} {
- if err := t1.Put(ctx, k, "value@"+k); err != nil {
- t.Fatalf("t1.Put() of %s failed: %v", k, err)
+ if err := c.Put(ctx, k, "value@"+k); err != nil {
+ t.Fatalf("c1.Put() of %s failed: %v", k, err)
}
}
// Create successfully.
- // TODO(rdaoud): switch prefixes to (collection, prefix) tuples.
spec = wire.SyncgroupSpec{
Description: "test syncgroup sg1",
Perms: nil,
- Prefixes: []wire.CollectionRow{{CollectionId: testCollection, Row: "foo"}},
+ Collections: []wire.Id{testCollection},
}
createSyncgroup(t, ctx, d, sg1, spec, verror.ID(""))
@@ -68,15 +69,12 @@
verifySyncgroups(t, ctx, d, wantGroups, verror.ID(""))
verifySyncgroupInfo(t, ctx, d, sg2, spec, 1)
- // Create a nested syncgroup.
+ // Check if creating a syncgroup on a non-existing collection fails.
spec.Description = "test syncgroup sg3"
- spec.Prefixes = []wire.CollectionRow{{CollectionId: testCollection, Row: "foobar"}}
+ spec.Collections = []wire.Id{wire.Id{"u", "c1"}}
sg3 := wire.Id{Name: "sg3", Blessing: "b3"}
- createSyncgroup(t, ctx, d, sg3, spec, verror.ID(""))
-
- wantGroups = []wire.Id{sg1, sg2, sg3}
+ createSyncgroup(t, ctx, d, sg3, spec, verror.ErrNoExist.ID)
verifySyncgroups(t, ctx, d, wantGroups, verror.ID(""))
- verifySyncgroupInfo(t, ctx, d, sg3, spec, 1)
// Check that create fails if the perms disallow access.
perms := perms("root:client")
@@ -85,6 +83,7 @@
t.Fatalf("d.SetPermissions() failed: %v", err)
}
spec.Description = "test syncgroup sg4"
+ spec.Collections = []wire.Id{wire.Id{"u", "c"}}
sg4 := wire.Id{Name: "sg4", Blessing: "b4"}
createSyncgroup(t, ctx, d, sg4, spec, verror.ErrNoAccess.ID)
verifySyncgroups(t, ctx, d, nil, verror.ErrNoAccess.ID)
@@ -99,10 +98,11 @@
defer cleanup()
d1 := tu.CreateDatabase(t, ctx1, syncbase.NewService(sName), "d")
+ tu.CreateCollection(t, ctx1, d1, "c")
specA := wire.SyncgroupSpec{
Description: "test syncgroup sgA",
Perms: perms("root:client1"),
- Prefixes: []wire.CollectionRow{{CollectionId: testCollection, Row: "foo"}},
+ Collections: []wire.Id{testCollection},
}
sgIdA := wire.Id{Name: "sgA", Blessing: "bA"}
createSyncgroup(t, ctx1, d1, sgIdA, specA, verror.ID(""))
@@ -136,7 +136,7 @@
specB := wire.SyncgroupSpec{
Description: "test syncgroup sgB",
Perms: perms("root:client1", "root:client2"),
- Prefixes: []wire.CollectionRow{{CollectionId: testCollection, Row: "foo"}},
+ Collections: []wire.Id{testCollection},
}
sgIdB := wire.Id{Name: "sgB", Blessing: "bB"}
createSyncgroup(t, ctx1, d1, sgIdB, specB, verror.ID(""))
@@ -158,13 +158,14 @@
ctx, sName, cleanup := tu.SetupOrDie(perms("root:client"))
defer cleanup()
d := tu.CreateDatabase(t, ctx, syncbase.NewService(sName), "d")
+ tu.CreateCollection(t, ctx, d, "c")
// Create successfully.
sgId := wire.Id{Name: "sg1", Blessing: "b1"}
spec := wire.SyncgroupSpec{
Description: "test syncgroup sg1",
Perms: perms("root:client"),
- Prefixes: []wire.CollectionRow{{CollectionId: testCollection, Row: "foo"}},
+ Collections: []wire.Id{testCollection},
}
createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
diff --git a/syncbase/syncgroup_v23_test.go b/syncbase/syncgroup_v23_test.go
deleted file mode 100644
index 72cac81..0000000
--- a/syncbase/syncgroup_v23_test.go
+++ /dev/null
@@ -1,1229 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package syncbase_test
-
-import (
- "fmt"
- "math/rand"
- "os"
- "reflect"
- "runtime/debug"
- "sort"
- "strings"
- "testing"
- "time"
-
- "v.io/v23"
- "v.io/v23/context"
- wire "v.io/v23/services/syncbase"
- "v.io/v23/services/watch"
- "v.io/v23/syncbase"
- "v.io/v23/verror"
- _ "v.io/x/ref/runtime/factories/roaming"
- "v.io/x/ref/services/syncbase/syncbaselib"
- "v.io/x/ref/test/v23test"
-)
-
-var testCollection = wire.Id{"u", "c"}
-
-func dbHandle(serviceName string) syncbase.Database {
- return syncbase.NewService(serviceName).DatabaseForId(wire.Id{"a", "d"}, nil)
-}
-
-// NOTE(sadovsky): These tests take a very long time to run - nearly 4 minutes
-// on my Macbook Pro! Various instances of time.Sleep() below likely contribute
-// to the problem.
-
-// TODO(ivanpi): Move to featuretests and deduplicate helpers.
-
-// TestV23SyncbasedJoinSyncgroup tests the creation and joining of a syncgroup.
-// Client0 creates a syncgroup at Syncbase0. Client1 requests to join the
-// syncgroup at Syncbase1. Syncbase1 in turn requests Syncbase0 to join the
-// syncgroup.
-func TestV23SyncbasedJoinSyncgroup(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
-}
-
-// TestV23SyncbasedGetDeltas tests the sending of deltas between two Syncbase
-// instances and their clients. The 1st client creates a syncgroup and puts
-// some database entries in it. The 2nd client joins that syncgroup and reads
-// the database entries. This verifies the end-to-end synchronization of data
-// along the path: client0--Syncbase0--Syncbase1--client1.
-func TestV23SyncbasedGetDeltas(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- // TODO(aghassemi): Resolve permission is currently needed for Watch.
- // See https://github.com/vanadium/issues/issues/1110
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo,c:bar", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
- ok(t, err)
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
-}
-
-// TestV23SyncbasedGetDeltasWithDel tests the sending of deltas between two
-// Syncbase instances and their clients. The 1st client creates a syncgroup and
-// puts some database entries in it. The 2nd client joins that syncgroup and
-// reads the database entries. The 1st client then deletes a portion of this
-// data, and adds new entries. The 2nd client verifies that these changes are
-// correctly synced. This verifies the end-to-end synchronization of data along
-// the path: client0--Syncbase0--Syncbase1--client1 with a workload of puts and
-// deletes.
-func TestV23SyncbasedGetDeltasWithDel(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- // TODO(aghassemi): Resolve permission is currently needed for Watch.
- // See https://github.com/vanadium/issues/issues/1110
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Resolve": {"In":["root:c1"]}, "Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo,c:bar", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- beforeSyncMarker, err := getResumeMarker(client1Ctx, "sync1")
- ok(t, err)
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 10, false, beforeSyncMarker))
-
- beforeDeleteMarker, err := getResumeMarker(client1Ctx, "sync1")
- ok(t, err)
- ok(t, runDeleteData(client0Ctx, "sync0", 0))
- ok(t, runVerifyDeletedData(client0Ctx, "sync0", "foo"))
- ok(t, runVerifyDeletedData(client1Ctx, "sync1", "foo"))
- ok(t, runVerifySyncgroupDataWithWatch(client1Ctx, "sync1", "foo", 5, true, beforeDeleteMarker))
-
- ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "bar", 0, 10, false))
-}
-
-// TestV23SyncbasedCompEval is a comprehensive sniff test for core sync
-// functionality. It tests the exchange of deltas between two Syncbase instances
-// and their clients. The 1st client creates a syncgroup and puts some database
-// entries in it. The 2nd client joins that syncgroup and reads the database
-// entries. The 2nd client then updates a subset of existing keys and adds more
-// keys and the 1st client verifies that it can read these keys. This verifies
-// the end-to-end bi-directional synchronization of data along the path:
-// client0--Syncbase0--Syncbase1--client1. In addition, this test also verifies
-// the bi-directional exchange of syncgroup deltas. After the first phase is
-// done, both Syncbase instances are shutdown and restarted, and new data is
-// synced once again.
-func TestV23SyncbasedCompEval(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- server0RDir := sh.MakeTempDir()
- cleanSync0 := sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- server1RDir := sh.MakeTempDir()
- cleanSync1 := sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- // This is a decoy syncgroup that no other Syncbase joins, but is on the
- // same database as the first syncgroup. Populating it after the first
- // syncgroup causes the database generations to go up, but the joiners
- // on the first syncgroup do not get any data belonging to this
- // syncgroup. This triggers the handling of filtered log records in the
- // restartability code.
- sbName1 := "sync0"
- sgName1 := "SG2"
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName1, sgName1, "c:bar", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
-
- // Shutdown and restart Syncbase instances.
- cleanSync0(os.Interrupt)
- cleanSync1(os.Interrupt)
-
- cleanSync0 = sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
- cleanSync1 = sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- ok(t, runSetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v2", "c:foo", "root:s0", "root:s1", "root:s3"))
- ok(t, runGetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v2", "c:foo", "root:s0", "root:s1", "root:s3"))
-
- ok(t, runUpdateData(client1Ctx, "sync1", 5))
- ok(t, runPopulateData(client1Ctx, "sync1", "foo", 10))
- ok(t, runSetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
-
- ok(t, runVerifyLocalAndRemoteData(client0Ctx, "sync0"))
- ok(t, runGetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
-
- // Shutdown and restart Syncbase instances.
- cleanSync0(os.Interrupt)
- cleanSync1(os.Interrupt)
-
- _ = sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0", RootDir: server0RDir}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
- _ = sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1", RootDir: server1RDir}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- ok(t, runGetSyncgroupSpec(client0Ctx, "sync0", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
- ok(t, runGetSyncgroupSpec(client1Ctx, "sync1", sbName, sgName, "v3", "c:foo", "root:s0", "root:s1", "root:s4"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 20))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 20, 10, true))
-}
-
-// TestV23SyncbasedExchangeDeltasWithAcls tests the exchange of deltas including
-// acls between two Syncbase instances and their clients. The 1st client creates
-// a syncgroup on a collection and puts some database entries in it. The 2nd
-// client joins that syncgroup and reads the database entries. The 2nd client
-// then changes the collection acl to allow access to only itself. The 1st
-// client should be unable to access the keys. The 2nd client then modifies the
-// collection acl to restore access to both clients. The 1st client should
-// regain access.
-func TestV23SyncbasedExchangeDeltasWithAcls(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}, "Admin": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}, "Admin": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- // Note, since collection ACLs are resolved last-one-wins, both collections
- // must be set up before calling runSetCollectionPermissions to ensure the
- // newly set value is the latest.
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runSetupAppA(client1Ctx, "sync1"))
-
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
- ok(t, runSetCollectionPermissions(client0Ctx, "sync0", "root:c0", "root:c1"))
-
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, true))
-
- ok(t, runSetCollectionPermissions(client1Ctx, "sync1", "root:c1"))
- ok(t, runVerifyLostAccess(client0Ctx, "sync0", "foo", 0, 10))
-
- ok(t, runSetCollectionPermissions(client1Ctx, "sync1", "root:c0", "root:c1"))
- ok(t, runVerifySyncgroupData(client0Ctx, "sync0", "foo", 0, 10, false))
-}
-
-// TestV23SyncbasedExchangeDeltasWithConflicts tests the exchange of deltas
-// between two Syncbase instances and their clients in the presence of
-// conflicting updates. The 1st client creates a syncgroup and puts some
-// database entries in it. The 2nd client joins that syncgroup and reads the
-// database entries. Both clients then update a subset of existing keys
-// concurrently, and sync with each other. During sync, the following
-// possibilities arise: (1) Both clients make their local updates first, sync
-// with each other to detect conflicts. Resolution will cause one of the clients
-// to see a new value based on the timestamp. (2) One client's update is synced
-// before the other client has a chance to commit its update. The other client's
-// update will then not be a conflict but a valid update building on the first
-// one's change.
-//
-// Note that the verification done from the client side can have false positives
-// re. the test's success. Since we cannot accurately predict which client's
-// updates win, the test passes if we find either outcome. However, this could
-// also imply that the sync failed, and each client is merely reading its own
-// local value. The verification step mainly verifies that syncbased is still
-// responsive and that the data is not corrupt.
-//
-// TODO(hpucha): We could diff the states of the two clients and ensure they are
-// identical. Optionally we could expose inner state of syncbased via some
-// debug methods.
-func TestV23SyncbasedExchangeDeltasWithConflicts(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
-
- // Run it multiple times to exercise different interactions between sync
- // and local updates that change every run due to timing.
- for i := 0; i < 10; i++ {
- testSyncbasedExchangeDeltasWithConflicts(t)
- }
-}
-
-func testSyncbasedExchangeDeltasWithConflicts(t *testing.T) {
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
-
- go func() { ok(t, runUpdateData(client0Ctx, "sync0", 5)) }()
- d := time.Duration(rand.Int63n(50)) * time.Millisecond
- time.Sleep(d)
- ok(t, runUpdateData(client1Ctx, "sync1", 5))
-
- time.Sleep(10 * time.Second)
-
- ok(t, runVerifyConflictResolution(client0Ctx, "sync0"))
- ok(t, runVerifyConflictResolution(client1Ctx, "sync1"))
-}
-
-// TestV23NestedSyncgroups tests the exchange of deltas between two Syncbase
-// instances and their clients with nested syncgroups. The 1st client creates
-// two syncgroups at prefixes "f" and "foo" and puts some database entries in
-// both of them. The 2nd client first joins the syncgroup with prefix "foo" and
-// verifies that it reads the corresponding database entries. The 2nd client
-// then joins the syncgroup with prefix "f" and verifies that it can read the
-// "f" keys.
-func TestV23NestedSyncgroups(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sb1Name := "sync0"
- sg1Name := "SG1"
- sb2Name := "sync0"
- sg2Name := "SG2"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb1Name, sg1Name, "c:foo", "", "root:s0", "root:s1"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb2Name, sg2Name, "c:f", "", "root:s0", "root:s1"))
- ok(t, runPopulateData(client0Ctx, "sync0", "f", 0))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb1Name, sg1Name))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb2Name, sg2Name))
- ok(t, runVerifyNestedSyncgroupData(client1Ctx, "sync1"))
-}
-
-// TestV23NestedAndPeerSyncgroups tests the exchange of deltas between three
-// Syncbase instances and their clients consisting of nested/peer
-// syncgroups. The 1st client creates two syncgroups: SG1 at prefix "foo" and
-// SG2 at "f" and puts some database entries in both of them. The 2nd client
-// joins the syncgroup SG1 and verifies that it reads the corresponding database
-// entries. Client 2 then creates SG3 at prefix "f". The 3rd client joins the
-// syncgroups SG2 and SG3 and verifies that it can read all the "f" and "foo"
-// keys created by client 1. Client 2 also verifies that it can read all the "f"
-// and "foo" keys created by client 1.
-func TestV23NestedAndPeerSyncgroups(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- server2Creds := sh.ForkCredentials("s2")
- client2Ctx := sh.ForkContext("c2")
- sh.StartSyncbase(server2Creds, syncbaselib.Opts{Name: "sync2"}, `{"Read": {"In":["root:c2"]}, "Write": {"In":["root:c2"]}}`)
-
- sb1Name := "sync0"
- sg1Name := "SG1"
- sb2Name := "sync0"
- sg2Name := "SG2"
- sb3Name := "sync1"
- sg3Name := "SG3"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb1Name, sg1Name, "c:foo", "", "root:s0", "root:s1", "root:s2"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sb2Name, sg2Name, "c:f", "", "root:s0", "root:s2"))
- ok(t, runPopulateData(client0Ctx, "sync0", "f", 0))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sb1Name, sg1Name))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runCreateSyncgroup(client1Ctx, "sync1", sb3Name, sg3Name, "c:f", "", "root:s1", "root:s2"))
-
- ok(t, runSetupAppA(client2Ctx, "sync2"))
- ok(t, runJoinSyncgroup(client2Ctx, "sync2", sb2Name, sg2Name))
- ok(t, runJoinSyncgroup(client2Ctx, "sync2", sb3Name, sg3Name))
- ok(t, runVerifyNestedSyncgroupData(client2Ctx, "sync2"))
-
- ok(t, runVerifyNestedSyncgroupData(client1Ctx, "sync1"))
-}
-
-// TestV23SyncbasedGetDeltasPrePopulate tests the sending of deltas between two
-// Syncbase instances and their clients with data existing before the creation
-// of a syncgroup. The 1st client puts entries in a database then creates a
-// syncgroup over that data. The 2nd client joins that syncgroup and reads the
-// database entries.
-func TestV23SyncbasedGetDeltasPrePopulate(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- // Populate collection data before creating the syncgroup. Also populate
- // with data that is not part of the syncgroup to verify filtering.
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
- ok(t, runPopulateData(client0Ctx, "sync0", "bar", 0))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1"))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runVerifyNonSyncgroupData(client1Ctx, "sync1", "bar"))
-}
-
-// TestV23SyncbasedGetDeltasMultiApp tests the sending of deltas between two
-// Syncbase instances and their clients across multiple databases and
-// collections. The 1st client puts entries in multiple collections across
-// multiple databases then creates multiple syncgroups (one per database) over
-// that data. The 2nd client joins these syncgroups and reads all the data.
-func TestV23SyncbasedGetDeltasMultiApp(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- na, nd, nt := 2, 2, 2 // number of apps, dbs, collections
-
- ok(t, runSetupAppMulti(client0Ctx, "sync0", na, nd, nt))
- ok(t, runPopulateSyncgroupMulti(client0Ctx, "sync0", na, nd, nt, "foo", "bar"))
-
- ok(t, runSetupAppMulti(client1Ctx, "sync1", na, nd, nt))
- ok(t, runJoinSyncgroupMulti(client1Ctx, "sync1", "sync0", na, nd))
- ok(t, runVerifySyncgroupDataMulti(client1Ctx, "sync1", na, nd, nt, "foo", "bar"))
-}
-
-// TestV23SyncgroupSync tests the syncing of syncgroup metadata. The 1st client
-// creates the syncgroup SG1, and clients 2 and 3 join this syncgroup. All three
-// clients must learn of the remaining two. Note that client 2 relies on
-// syncgroup metadata syncing to learn of client 3 .
-func TestV23SyncgroupSync(t *testing.T) {
- v23test.SkipUnlessRunningIntegrationTests(t)
- sh := v23test.NewShell(t, nil)
- defer sh.Cleanup()
- sh.StartRootMountTable()
-
- server0Creds := sh.ForkCredentials("s0")
- client0Ctx := sh.ForkContext("c0")
- sh.StartSyncbase(server0Creds, syncbaselib.Opts{Name: "sync0"}, `{"Read": {"In":["root:c0"]}, "Write": {"In":["root:c0"]}}`)
-
- server1Creds := sh.ForkCredentials("s1")
- client1Ctx := sh.ForkContext("c1")
- sh.StartSyncbase(server1Creds, syncbaselib.Opts{Name: "sync1"}, `{"Read": {"In":["root:c1"]}, "Write": {"In":["root:c1"]}}`)
-
- server2Creds := sh.ForkCredentials("s2")
- client2Ctx := sh.ForkContext("c2")
- sh.StartSyncbase(server2Creds, syncbaselib.Opts{Name: "sync2"}, `{"Read": {"In":["root:c2"]}, "Write": {"In":["root:c2"]}}`)
-
- sbName := "sync0"
- sgName := "SG1"
-
- ok(t, runSetupAppA(client0Ctx, "sync0"))
- ok(t, runCreateSyncgroup(client0Ctx, "sync0", sbName, sgName, "c:foo", "", "root:s0", "root:s1", "root:s2"))
- ok(t, runPopulateData(client0Ctx, "sync0", "foo", 0))
-
- ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 1))
-
- ok(t, runSetupAppA(client1Ctx, "sync1"))
- ok(t, runJoinSyncgroup(client1Ctx, "sync1", sbName, sgName))
-
- ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 2))
- ok(t, runGetSyncgroupMembers(client1Ctx, "sync1", sbName, sgName, 2))
-
- ok(t, runSetupAppA(client2Ctx, "sync2"))
- ok(t, runJoinSyncgroup(client2Ctx, "sync2", sbName, sgName))
-
- ok(t, runGetSyncgroupMembers(client0Ctx, "sync0", sbName, sgName, 3))
- ok(t, runGetSyncgroupMembers(client1Ctx, "sync1", sbName, sgName, 3))
- ok(t, runGetSyncgroupMembers(client2Ctx, "sync2", sbName, sgName, 3))
-
- ok(t, runVerifySyncgroupData(client1Ctx, "sync1", "foo", 0, 10, false))
- ok(t, runVerifySyncgroupData(client2Ctx, "sync2", "foo", 0, 10, false))
-}
-
-////////////////////////////////////
-// Helpers.
-
-// toSgPrefixes converts, for example, "a:b,c:" to
-// [{Collection: {"u", "a"}, Row: "b"}, {Collection: {"u", "c"}, Row: ""}].
-func toSgPrefixes(csv string) []wire.CollectionRow {
- strs := strings.Split(csv, ",")
- res := make([]wire.CollectionRow, len(strs))
- for i, v := range strs {
- parts := strings.SplitN(v, ":", 2)
- if len(parts) != 2 {
- panic(fmt.Sprintf("invalid prefix string: %q", v))
- }
- res[i] = wire.CollectionRow{CollectionId: wire.Id{"u", parts[0]}, Row: parts[1]}
- }
- return res
-}
-
-// TODO(hpucha): Look into refactoring scan logic out of the helpers, and
-// avoiding gets when we can scan.
-
-func runSetupAppA(ctx *context.T, serviceName string) error {
- d := dbHandle(serviceName)
- if err := d.Create(ctx, nil); err != nil {
- return err
- }
- c := d.CollectionForId(testCollection)
- if err := c.Create(ctx, nil); err != nil {
- return err
- }
-
- return nil
-}
-
-func runCreateSyncgroup(ctx *context.T, serviceName, sbName, sgName, sgPrefixes, mtName string, sgBlessings ...string) error {
- d := dbHandle(serviceName)
-
- mtNames := v23.GetNamespace(ctx).Roots()
- if mtName != "" {
- mtNames = []string{mtName}
- }
-
- spec := wire.SyncgroupSpec{
- Description: "test syncgroup sg",
- Perms: perms(sgBlessings...),
- Prefixes: toSgPrefixes(sgPrefixes),
- MountTables: mtNames,
- }
-
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
- info := wire.SyncgroupMemberInfo{SyncPriority: 8}
- if err := sg.Create(ctx, spec, info); err != nil {
- return fmt.Errorf("Create SG %v failed: %v\n", sgName, err)
- }
- return nil
-}
-
-func runJoinSyncgroup(ctx *context.T, sbNameLocal, sbNameRemote, sgName string) error {
- d := dbHandle(sbNameLocal)
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
- info := wire.SyncgroupMemberInfo{SyncPriority: 10}
- if _, err := sg.Join(ctx, sbNameRemote, nil, info); err != nil {
- return fmt.Errorf("Join SG %q, %q failed: %v\n", sbNameRemote, sgName, err)
- }
- return nil
-}
-
-func runSetSyncgroupSpec(ctx *context.T, serviceName, sbName, sgName, sgDesc, sgPrefixes string, sgBlessings ...string) error {
- d := dbHandle(serviceName)
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
-
- mtNames := v23.GetNamespace(ctx).Roots()
- spec := wire.SyncgroupSpec{
- Description: sgDesc,
- Prefixes: toSgPrefixes(sgPrefixes),
- Perms: perms(sgBlessings...),
- MountTables: mtNames,
- }
-
- if err := sg.SetSpec(ctx, spec, ""); err != nil {
- return fmt.Errorf("SetSpec SG %q failed: %v\n", sgName, err)
- }
- return nil
-}
-
-func runGetSyncgroupSpec(ctx *context.T, serviceName, sbName, sgName, wantDesc, wantPrefixes string, wantBlessings ...string) error {
- d := dbHandle(serviceName)
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
-
- wantPfxs := toSgPrefixes(wantPrefixes)
- wantPerms := perms(wantBlessings...)
-
- var spec wire.SyncgroupSpec
- var err error
- for i := 0; i < 8; i++ {
- time.Sleep(500 * time.Millisecond)
- spec, _, err = sg.GetSpec(ctx)
- if err != nil {
- return fmt.Errorf("GetSpec SG %q failed: %v\n", sgName, err)
- }
- if spec.Description == wantDesc {
- break
- }
- }
- if spec.Description != wantDesc || !reflect.DeepEqual(spec.Prefixes, wantPfxs) || !reflect.DeepEqual(spec.Perms, wantPerms) {
- return fmt.Errorf("GetSpec SG %q failed: description got %v, want %v, prefixes got %v, want %v, perms got %v, want %v\n", sgName, spec.Description, wantDesc, spec.Prefixes, wantPfxs, spec.Perms, wantPerms)
- }
- return nil
-}
-
-func runGetSyncgroupMembers(ctx *context.T, serviceName, sbName, sgName string, wantMembers uint64) error {
- d := dbHandle(serviceName)
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
-
- var gotMembers uint64
- var members map[string]wire.SyncgroupMemberInfo
-
- for i := 0; i < 4; i++ {
- time.Sleep(500 * time.Millisecond)
- members, err := sg.GetMembers(ctx)
- if err != nil {
- return fmt.Errorf("GetMembers SG %q failed: %v\n", sgName, err)
- }
- gotMembers = uint64(len(members))
- if wantMembers == gotMembers {
- return nil
- }
- }
- return fmt.Errorf("GetMembers SG %q failed: members got %+v, want %v\n", sgName, members, wantMembers)
-}
-
-func runPopulateData(ctx *context.T, serviceName, keyPrefix string, start uint64) error {
- d := dbHandle(serviceName)
-
- // Do Puts.
- c := d.CollectionForId(testCollection)
-
- for i := start; i < start+10; i++ {
- key := fmt.Sprintf("%s%d", keyPrefix, i)
- r := c.Row(key)
- if err := r.Put(ctx, "testkey"+key); err != nil {
- return fmt.Errorf("r.Put() failed: %v\n", err)
- }
- }
- return nil
-}
-
-func runUpdateData(ctx *context.T, serviceName string, start uint64) error {
- d := dbHandle(serviceName)
-
- // Do Puts.
- c := d.CollectionForId(testCollection)
-
- for i := start; i < start+5; i++ {
- key := fmt.Sprintf("foo%d", i)
- r := c.Row(key)
- if err := r.Put(ctx, "testkey"+serviceName+key); err != nil {
- return fmt.Errorf("r.Put() failed: %v\n", err)
- }
- }
-
- return nil
-}
-
-func runDeleteData(ctx *context.T, serviceName string, start uint64) error {
- d := dbHandle(serviceName)
-
- // Do Puts.
- c := d.CollectionForId(testCollection)
-
- for i := start; i < start+5; i++ {
- key := fmt.Sprintf("foo%d", i)
- r := c.Row(key)
- if err := r.Delete(ctx); err != nil {
- return fmt.Errorf("r.Delete() failed: %v\n", err)
- }
- }
-
- return nil
-}
-
-func runSetCollectionPermissions(ctx *context.T, serviceName string, aclBlessings ...string) error {
- d := dbHandle(serviceName)
-
- // Set acl.
- c := d.CollectionForId(testCollection)
-
- if err := c.SetPermissions(ctx, perms(aclBlessings...)); err != nil {
- return fmt.Errorf("c.SetPermissions() failed: %v\n", err)
- }
-
- return nil
-}
-
-func runVerifySyncgroupData(ctx *context.T, serviceName, keyPrefix string, start, count uint64, skipScan bool) error {
- d := dbHandle(serviceName)
-
- // Wait for a bit (up to 4 sec) until the last key appears.
- c := d.CollectionForId(testCollection)
-
- lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
-
- r := c.Row(lastKey)
- for i := 0; i < 8; i++ {
- time.Sleep(500 * time.Millisecond)
- var value string
- if err := r.Get(ctx, &value); err == nil {
- break
- }
- }
-
- // Verify that all keys and values made it correctly.
- for i := start; i < start+count; i++ {
- key := fmt.Sprintf("%s%d", keyPrefix, i)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); err != nil {
- return fmt.Errorf("r.Get() failed: %v\n", err)
- }
- want := "testkey" + key
- if got != want {
- return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
- }
- }
-
- if !skipScan {
- // Re-verify using a scan operation.
- stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
- for i := 0; stream.Advance(); i++ {
- want := fmt.Sprintf("%s%d", keyPrefix, i)
- got := stream.Key()
- if got != want {
- return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
- }
- want = "testkey" + want
- if err := stream.Value(&got); err != nil {
- return fmt.Errorf("cannot fetch value in scan: %v\n", err)
- }
- if got != want {
- return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
- }
- }
-
- if err := stream.Err(); err != nil {
- return fmt.Errorf("scan stream error: %v\n", err)
- }
- }
- return nil
-}
-
-func runVerifySyncgroupDataWithWatch(ctx *context.T, serviceName, keyPrefix string, count int, expectDelete bool, beforeSyncMarker watch.ResumeMarker) error {
- if count == 0 {
- return fmt.Errorf("count cannot be 0: got %d", count)
- }
- d := dbHandle(serviceName)
- ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
-
- stream, err := d.Watch(ctxWithTimeout, testCollection, keyPrefix, beforeSyncMarker)
- if err != nil {
- return fmt.Errorf("watch error: %v\n", err)
- }
-
- var changes []syncbase.WatchChange
- for i := 0; stream.Advance() && i < count; i++ {
- if err := stream.Err(); err != nil {
- return fmt.Errorf("watch stream error: %v\n", err)
- }
- changes = append(changes, stream.Change())
- }
-
- sort.Sort(ByRow(changes))
-
- if got, want := len(changes), count; got != want {
- return fmt.Errorf("unexpected number of changes: got %d, want %d", got, want)
- }
-
- for i, change := range changes {
- if got, want := change.Collection, testCollection; got != want {
- return fmt.Errorf("unexpected watch collection: got %v, want %v", got, want)
- }
- if got, want := change.Row, fmt.Sprintf("%s%d", keyPrefix, i); got != want {
- return fmt.Errorf("unexpected watch row: got %q, want %q", got, want)
- }
- if got, want := change.FromSync, true; got != want {
- return fmt.Errorf("unexpected FromSync value: got %t, want %t", got, want)
- }
- if expectDelete {
- if got, want := change.ChangeType, syncbase.DeleteChange; got != want {
- return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
- }
- return nil
- }
- var result string
- if got, want := change.ChangeType, syncbase.PutChange; got != want {
- return fmt.Errorf("unexpected watch change type: got %q, want %q", got, want)
- }
- if err := change.Value(&result); err != nil {
- return fmt.Errorf("couldn't decode watch value: %v", err)
- }
- if got, want := result, fmt.Sprintf("testkey%s%d", keyPrefix, i); got != want {
- return fmt.Errorf("unexpected watch value: got %q, want %q", got, want)
- }
- }
- return nil
-}
-
-func runVerifyDeletedData(ctx *context.T, serviceName, keyPrefix string) error {
- d := dbHandle(serviceName)
-
- // Wait for a bit for deletions to propagate.
- c := d.CollectionForId(testCollection)
-
- r := c.Row("foo4")
- for i := 0; i < 8; i++ {
- time.Sleep(500 * time.Millisecond)
- var value string
- if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoExist.ID {
- break
- }
- }
-
- // Verify using a scan operation.
- stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
- count := 0
- for i := 5; stream.Advance(); i++ {
- want := fmt.Sprintf("%s%d", keyPrefix, i)
- got := stream.Key()
- if got != want {
- return fmt.Errorf("unexpected key in scan: got %q, want %q\n", got, want)
- }
- want = "testkey" + want
- if err := stream.Value(&got); err != nil {
- return fmt.Errorf("cannot fetch value in scan: %v\n", err)
- }
- if got != want {
- return fmt.Errorf("unexpected value in scan: got %q, want %q\n", got, want)
- }
- count++
- }
-
- if err := stream.Err(); err != nil {
- return fmt.Errorf("scan stream error: %v\n", err)
- }
-
- if count != 5 {
- return fmt.Errorf("scan stream count error: %v\n", count)
- }
-
- return nil
-}
-
-func runVerifyConflictResolution(ctx *context.T, serviceName string) error {
- d := dbHandle(serviceName)
- c := d.CollectionForId(testCollection)
-
- wantData := []struct {
- start uint64
- count uint64
- valPfx []string
- }{
- {0, 5, []string{"testkey"}},
- {5, 5, []string{"testkeysync0", "testkeysync1"}},
- }
-
- // Verify that all keys and values made it correctly.
- for _, d := range wantData {
- for i := d.start; i < d.start+d.count; i++ {
- key := fmt.Sprintf("foo%d", i)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); err != nil {
- return fmt.Errorf("r.Get() failed: %v\n", err)
- }
- match := 0
- for _, p := range d.valPfx {
- want := p + key
- if got == want {
- match++
- }
- }
- if match != 1 {
- return fmt.Errorf("unexpected value: got %q, match %v, want %v\n", got, match, d.valPfx)
- }
- }
- }
- return nil
-}
-
-func runVerifyNonSyncgroupData(ctx *context.T, serviceName, keyPrefix string) error {
- d := dbHandle(serviceName)
- c := d.CollectionForId(testCollection)
-
- // Verify through a scan that none of that data exists.
- count := 0
- stream := c.Scan(ctx, syncbase.Prefix(keyPrefix))
- for stream.Advance() {
- count++
- }
-
- if err := stream.Err(); err != nil {
- return fmt.Errorf("scan stream error: %v\n", err)
- }
- if count > 0 {
- return fmt.Errorf("found %d entries in %q prefix that should not be there\n", count, keyPrefix)
- }
-
- return nil
-}
-
-func runVerifyLocalAndRemoteData(ctx *context.T, serviceName string) error {
- d := dbHandle(serviceName)
- c := d.CollectionForId(testCollection)
-
- // Wait for a bit (up to 4 sec) until the last key appears.
- r := c.Row("foo19")
- for i := 0; i < 8; i++ {
- time.Sleep(500 * time.Millisecond)
- var value string
- if err := r.Get(ctx, &value); err == nil {
- break
- }
- }
-
- wantData := []struct {
- start uint64
- count uint64
- valPfx string
- }{
- {0, 5, "testkey"},
- {5, 5, "testkeysync1"},
- {10, 10, "testkey"},
- }
-
- // Verify that all keys and values made it correctly.
- for _, d := range wantData {
- for i := d.start; i < d.start+d.count; i++ {
- key := fmt.Sprintf("foo%d", i)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); err != nil {
- return fmt.Errorf("r.Get() failed: %v\n", err)
- }
- want := d.valPfx + key
- if got != want {
- return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
- }
- }
- }
- return nil
-}
-
-func runVerifyLostAccess(ctx *context.T, serviceName, keyPrefix string, start, count uint64) error {
- d := dbHandle(serviceName)
-
- // Wait for a bit (up to 4 sec) until the last key disappears.
- c := d.CollectionForId(testCollection)
-
- lastKey := fmt.Sprintf("%s%d", keyPrefix, start+count-1)
-
- r := c.Row(lastKey)
- for i := 0; i < 8; i++ {
- time.Sleep(500 * time.Millisecond)
- var value string
- if err := r.Get(ctx, &value); verror.ErrorID(err) == verror.ErrNoAccess.ID {
- break
- }
- }
-
- // Verify that all keys and values have lost access.
- for i := start; i < start+count; i++ {
- key := fmt.Sprintf("%s%d", keyPrefix, i)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); verror.ErrorID(err) != verror.ErrNoAccess.ID {
- return fmt.Errorf("r.Get() didn't fail: %v\n", err)
- }
- }
-
- return nil
-}
-
-func runVerifyNestedSyncgroupData(ctx *context.T, serviceName string) error {
- d := dbHandle(serviceName)
-
- // Wait for a bit (up to 8 sec) until the last key appears. This chosen
- // time interval is dependent on how fast the membership view is
- // refreshed (currently 2 seconds) and how frequently we sync with peers
- // (every 50 ms), and then adding a substantial safeguard to it to
- // ensure that the test is not flaky even in somewhat abnormal
- // conditions. Note that we wait longer than the 2 node tests since more
- // nodes implies more pair-wise communication before achieving steady
- // state.
- c := d.CollectionForId(testCollection)
-
- r := c.Row("f9")
- for i := 0; i < 8; i++ {
- time.Sleep(1 * time.Second)
- var value string
- if err := r.Get(ctx, &value); err == nil {
- break
- }
- }
-
- // Verify that all keys and values made it correctly.
- pfxs := []string{"foo", "f"}
- for _, p := range pfxs {
- for i := 0; i < 10; i++ {
- key := fmt.Sprintf("%s%d", p, i)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); err != nil {
- return fmt.Errorf("r.Get() failed: %v\n", err)
- }
- want := "testkey" + key
- if got != want {
- return fmt.Errorf("unexpected value: got %q, want %q\n", got, want)
- }
- }
- }
- return nil
-}
-
-func runSetupAppMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int) error {
- svc := syncbase.NewService(serviceName)
-
- for i := 0; i < numApps; i++ {
- appName := fmt.Sprintf("a%d", i)
-
- for j := 0; j < numDbs; j++ {
- dbName := fmt.Sprintf("d%d", j)
- d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
- d.Create(ctx, nil)
-
- for k := 0; k < numCxs; k++ {
- cName := fmt.Sprintf("c%d", k)
- d.CollectionForId(wire.Id{"u", cName}).Create(ctx, nil)
- }
- }
- }
-
- return nil
-}
-
-func runPopulateSyncgroupMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int, prefixes ...string) error {
- mtNames := v23.GetNamespace(ctx).Roots()
-
- svc := syncbase.NewService(serviceName)
-
- for i := 0; i < numApps; i++ {
- appName := fmt.Sprintf("a%d", i)
-
- for j := 0; j < numDbs; j++ {
- dbName := fmt.Sprintf("d%d", j)
- d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
-
- // For each collection, pre-populate entries on each prefix.
- // Also determine the syncgroup prefixes.
- var sgPrefixes []string
- for k := 0; k < numCxs; k++ {
- cName := fmt.Sprintf("c%d", k)
- c := d.CollectionForId(wire.Id{"u", cName})
-
- for _, pfx := range prefixes {
- p := fmt.Sprintf("%s:%s", cName, pfx)
- sgPrefixes = append(sgPrefixes, p)
-
- for n := 0; n < 10; n++ {
- key := fmt.Sprintf("%s%d", pfx, n)
- r := c.Row(key)
- if err := r.Put(ctx, "testkey"+key); err != nil {
- return fmt.Errorf("r.Put() failed: %v\n", err)
- }
- }
- }
- }
-
- // Create one syncgroup per database across all collections
- // and prefixes.
- sgName := fmt.Sprintf("%s_%s", appName, dbName)
- spec := wire.SyncgroupSpec{
- Description: fmt.Sprintf("test sg %s/%s", appName, dbName),
- Perms: perms("root:s0", "root:s1"),
- Prefixes: toSgPrefixes(strings.Join(sgPrefixes, ",")),
- MountTables: mtNames,
- }
-
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
- info := wire.SyncgroupMemberInfo{SyncPriority: 8}
- if err := sg.Create(ctx, spec, info); err != nil {
- return fmt.Errorf("Create SG %q failed: %v\n", sgName, err)
- }
- }
- }
-
- return nil
-}
-
-func runJoinSyncgroupMulti(ctx *context.T, sbNameLocal, sbNameRemote string, numApps, numDbs int) error {
- svc := syncbase.NewService(sbNameLocal)
-
- for i := 0; i < numApps; i++ {
- appName := fmt.Sprintf("a%d", i)
-
- for j := 0; j < numDbs; j++ {
- dbName := fmt.Sprintf("d%d", j)
- d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
-
- sgName := fmt.Sprintf("%s_%s", appName, dbName)
- sg := d.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
- info := wire.SyncgroupMemberInfo{SyncPriority: 10}
- if _, err := sg.Join(ctx, sbNameRemote, nil, info); err != nil {
- return fmt.Errorf("Join SG Multi %q failed: %v\n", sgName, err)
- }
- }
- }
-
- return nil
-}
-
-func runVerifySyncgroupDataMulti(ctx *context.T, serviceName string, numApps, numDbs, numCxs int, prefixes ...string) error {
- svc := syncbase.NewService(serviceName)
-
- time.Sleep(20 * time.Second)
-
- for i := 0; i < numApps; i++ {
- appName := fmt.Sprintf("a%d", i)
-
- for j := 0; j < numDbs; j++ {
- dbName := fmt.Sprintf("d%d", j)
- d := svc.DatabaseForId(wire.Id{appName, dbName}, nil)
-
- for k := 0; k < numCxs; k++ {
- cName := fmt.Sprintf("c%d", k)
- c := d.CollectionForId(wire.Id{"u", cName})
-
- for _, pfx := range prefixes {
- for n := 0; n < 10; n++ {
- key := fmt.Sprintf("%s%d", pfx, n)
- r := c.Row(key)
- var got string
- if err := r.Get(ctx, &got); err != nil {
- return fmt.Errorf("r.Get() failed: %v\n", err)
- }
- want := "testkey" + key
- if got != want {
- return fmt.Errorf("unexpected value: got %q, want %q\n",
- got, want)
- }
- }
- }
- }
- }
- }
-
- return nil
-}
-
-func getResumeMarker(ctx *context.T, serviceName string) (watch.ResumeMarker, error) {
- d := dbHandle(serviceName)
- return d.GetResumeMarker(ctx)
-}
-
-func ok(t *testing.T, err error) {
- if err != nil {
- debug.PrintStack()
- t.Fatal(err)
- }
-}
-
-func TestMain(m *testing.M) {
- v23test.TestMain(m)
-}
-
-// ByRow implements sort.Interface for []syncbase.WatchChange based on the Row field.
-type ByRow []syncbase.WatchChange
-
-func (c ByRow) Len() int { return len(c) }
-func (c ByRow) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
-func (c ByRow) Less(i, j int) bool { return c[i].Row < c[j].Row }