syncbase/vsync: Implementation for SyncGroup related getters and setspec.
Change-Id: I54697c9dd66a4ecd2a19507fbafc4dd7c9cc08f5
diff --git a/v23/syncbase/nosql/syncgroup_test.go b/v23/syncbase/nosql/syncgroup_test.go
index 73e0550..9705c3a 100644
--- a/v23/syncbase/nosql/syncgroup_test.go
+++ b/v23/syncbase/nosql/syncgroup_test.go
@@ -5,6 +5,7 @@
package nosql_test
import (
+ "reflect"
"testing"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -32,6 +33,9 @@
createSyncGroup(t, ctx, d, sg1, spec, verror.ErrBadArg.ID)
+ var wantNames []string
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+
// Prefill entries before creating a SyncGroup to exercise the bootstrap
// of a SyncGroup through Snapshot operations to the watcher.
t1 := tu.CreateTable(t, ctx, d, "t1")
@@ -50,20 +54,34 @@
}
createSyncGroup(t, ctx, d, sg1, spec, verror.ID(""))
+ // Verify SyncGroup is created.
+ wantNames = []string{sg1}
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+ verifySyncGroupInfo(t, ctx, d, sg1, spec, 1)
+
// Check if creating an already existing syncgroup fails.
createSyncGroup(t, ctx, d, sg1, spec, verror.ErrExist.ID)
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
// Create a peer syncgroup.
spec.Description = "test syncgroup sg2"
sg2 := naming.Join(sName, util.SyncbaseSuffix, "sg2")
createSyncGroup(t, ctx, d, sg2, spec, verror.ID(""))
+ wantNames = []string{sg1, sg2}
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+ verifySyncGroupInfo(t, ctx, d, sg2, spec, 1)
+
// Create a nested syncgroup.
spec.Description = "test syncgroup sg3"
spec.Prefixes = []string{"t1:foobar"}
sg3 := naming.Join(sName, util.SyncbaseSuffix, "sg3")
createSyncGroup(t, ctx, d, sg3, spec, verror.ID(""))
+ wantNames = []string{sg1, sg2, sg3}
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+ verifySyncGroupInfo(t, ctx, d, sg3, spec, 1)
+
// Check that create fails if the perms disallow access.
perms := perms("root/client")
perms.Blacklist("root/client", string(access.Read))
@@ -73,13 +91,12 @@
spec.Description = "test syncgroup sg4"
sg4 := naming.Join(sName, util.SyncbaseSuffix, "sg4")
createSyncGroup(t, ctx, d, sg4, spec, verror.ErrNoAccess.ID)
+ verifySyncGroupNames(t, ctx, d, nil, verror.ErrNoAccess.ID)
}
// Tests that SyncGroup.Join works as expected for the case with one Syncbase
// and 2 clients. One client creates the SyncGroup, while the other attempts to
// join it.
-//
-// TODO(hpucha): Add more integration-style testing with 2 syncbases.
func TestJoinSyncGroup(t *testing.T) {
// Create client1-server pair.
ctx, ctx1, sName, rootp, cleanup := tu.SetupOrDieCustom("client1", "server", perms("root/client1"))
@@ -106,6 +123,8 @@
// Check that client2's join fails if the perms disallow access.
joinSyncGroup(t, ctx2, d2, sgNameA, verror.ErrNoAccess.ID)
+ verifySyncGroupNames(t, ctx2, d2, nil, verror.ErrNoAccess.ID)
+
// Client1 gives access to client2.
if err := d1.SetPermissions(ctx1, perms("root/client1", "root/client2"), ""); err != nil {
t.Fatalf("d.SetPermissions() failed: %v", err)
@@ -130,6 +149,46 @@
// Check that client2's join now succeeds.
joinSyncGroup(t, ctx2, d2, sgNameB, verror.ID(""))
+
+ // Verify SyncGroup state.
+ wantNames := []string{sgNameA, sgNameB}
+ verifySyncGroupNames(t, ctx1, d1, wantNames, verror.ID(""))
+ verifySyncGroupNames(t, ctx2, d2, wantNames, verror.ID(""))
+ verifySyncGroupInfo(t, ctx1, d1, sgNameA, specA, 1)
+ verifySyncGroupInfo(t, ctx1, d1, sgNameB, specB, 1)
+ verifySyncGroupInfo(t, ctx2, d2, sgNameB, specB, 1)
+}
+
+// Tests that SyncGroup.SetSpec works as expected.
+func TestSetSpecSyncGroup(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(perms("root/client"))
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+ // Create successfully.
+ sgName := naming.Join(sName, util.SyncbaseSuffix, "sg1")
+ spec := wire.SyncGroupSpec{
+ Description: "test syncgroup sg1",
+ Perms: nil,
+ Prefixes: []string{"t1:foo"},
+ }
+ createSyncGroup(t, ctx, d, sgName, spec, verror.ID(""))
+
+ // Verify SyncGroup is created.
+ wantNames := []string{sgName}
+ verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+ verifySyncGroupInfo(t, ctx, d, sgName, spec, 1)
+
+ spec.Prefixes = []string{"t1:foo", "t2:bar"}
+ spec.Description = "test syncgroup sg1 update"
+ spec.Perms = perms("root/client1")
+
+ sg := d.SyncGroup(sgName)
+ if err := sg.SetSpec(ctx, spec, ""); err != nil {
+ t.Fatalf("sg.SetSpec failed: %v", err)
+ }
+ verifySyncGroupInfo(t, ctx, d, sgName, spec, 1)
}
///////////////////
@@ -144,15 +203,35 @@
return sg
}
-func joinSyncGroup(t *testing.T, ctx *context.T, d nosql.Database, sgName string, errID verror.ID) nosql.SyncGroup {
+func joinSyncGroup(t *testing.T, ctx *context.T, d nosql.Database, sgName string, wantErr verror.ID) nosql.SyncGroup {
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{10}
- if _, err := sg.Join(ctx, info); verror.ErrorID(err) != errID {
+ if _, err := sg.Join(ctx, info); verror.ErrorID(err) != wantErr {
tu.Fatalf(t, "Join SG %v failed: %v", sgName, err)
}
return sg
}
+func verifySyncGroupNames(t *testing.T, ctx *context.T, d nosql.Database, wantNames []string, wantErr verror.ID) {
+ gotNames, gotErr := d.GetSyncGroupNames(ctx)
+ if verror.ErrorID(gotErr) != wantErr || !reflect.DeepEqual(gotNames, wantNames) {
+ t.Fatalf("d.GetSyncGroupNames() failed, got %v, want %v, err %v", gotNames, wantNames, gotErr)
+ }
+}
+
+func verifySyncGroupInfo(t *testing.T, ctx *context.T, d nosql.Database, sgName string, wantSpec wire.SyncGroupSpec, wantMembers int) {
+ sg := d.SyncGroup(sgName)
+ gotSpec, _, err := sg.GetSpec(ctx)
+ if err != nil || !reflect.DeepEqual(gotSpec, wantSpec) {
+ t.Fatalf("sg.GetSpec() failed, got %v, want %v, err %v", gotSpec, wantSpec, err)
+ }
+
+ members, err := sg.GetMembers(ctx)
+ if err != nil || len(members) != wantMembers {
+ t.Fatalf("sg.GetMembers() failed, got %v, want %v, err %v", members, wantMembers, err)
+ }
+}
+
// TODO(sadovsky): This appears to be identical to tu.DefaultPerms(). We should
// just use that.
func perms(bps ...string) access.Permissions {
diff --git a/x/ref/services/syncbase/server/interfaces/database.go b/x/ref/services/syncbase/server/interfaces/database.go
index 32cca17..02e232e 100644
--- a/x/ref/services/syncbase/server/interfaces/database.go
+++ b/x/ref/services/syncbase/server/interfaces/database.go
@@ -26,7 +26,7 @@
// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
// the database perms.
// Designed for use from within App.DeleteNoSQLDatabase.
- CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error
+ CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error
// SetPermsInternal updates the database perms.
// Designed for use from within App.SetDatabasePerms.
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 7d900e7..2943a52 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -388,7 +388,7 @@
return d.a
}
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
diff --git a/x/ref/services/syncbase/server/nosql/database_sgm.go b/x/ref/services/syncbase/server/nosql/database_sgm.go
index 10ad323..cc1a73d 100644
--- a/x/ref/services/syncbase/server/nosql/database_sgm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sgm.go
@@ -19,7 +19,8 @@
if d.batchId != nil {
return nil, wire.NewErrBoundToBatch(ctx)
}
- return nil, verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupNames(ctx, call)
}
func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
@@ -63,19 +64,22 @@
if d.batchId != nil {
return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
}
- return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupSpec(ctx, call, sgName)
}
func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
if d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
- return verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.SetSyncGroupSpec(ctx, call, sgName, spec, version)
}
func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
if d.batchId != nil {
return nil, wire.NewErrBoundToBatch(ctx)
}
- return nil, verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupMembers(ctx, call, sgName)
}
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index a06da74..1b158f6 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
import (
"fmt"
+ "strings"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -246,7 +247,7 @@
// make forEachSyncGroup() stop the iteration earlier; otherwise the function
// loops across all SyncGroups in the Database.
func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+ scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
stream := st.Scan(scanStart, scanLimit)
for stream.Advance() {
var sg interfaces.SyncGroup
@@ -286,10 +287,13 @@
// relationships.
// Use the functions above to manipulate SyncGroups.
-// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
-func sgDataKeyScanPrefix() string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-}
+var (
+ // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
+ sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+
+ // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
+ sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+)
// sgDataKey returns the key used to access the SyncGroup data entry.
func sgDataKey(gid interfaces.GroupId) string {
@@ -301,6 +305,17 @@
return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
}
+// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
+func splitSgNameKey(ctx *context.T, key string) (string, error) {
+ prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
+
+ // Note that the actual SyncGroup name may contain ":" as a separator.
+ if !strings.HasPrefix(key, prefix) {
+ return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
+ }
+ return strings.TrimPrefix(key, prefix), nil
+}
+
// hasSGDataEntry returns true if the SyncGroup data entry exists.
func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
@@ -518,6 +533,115 @@
return sg.Spec, nil
}
+func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ var sgNames []string
+
+ vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
+ defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return nil, err
+ }
+
+ // Scan all the SyncGroup names found in the Database.
+ scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
+ stream := sn.Scan(scanStart, scanLimit)
+ var key []byte
+ for stream.Advance() {
+ sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
+ if err != nil {
+ return nil, err
+ }
+ sgNames = append(sgNames, sgName)
+ }
+
+ if err := stream.Err(); err != nil {
+ return nil, err
+ }
+
+ return sgNames, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+ var spec wire.SyncGroupSpec
+
+ vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return spec, "", err
+ }
+
+ // Get the SyncGroup information.
+ sg, err := getSyncGroupByName(ctx, sn, sgName)
+ if err != nil {
+ return spec, "", err
+ }
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ spec = sg.Spec
+ return spec, sg.SpecVersion, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+ var members map[string]wire.SyncGroupMemberInfo
+
+ vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return members, err
+ }
+
+ // Get the SyncGroup information.
+ sg, err := getSyncGroupByName(ctx, sn, sgName)
+ if err != nil {
+ return members, err
+ }
+
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ members = sg.Joiners
+ return members, nil
+}
+
+// TODO(hpucha): Enable syncing syncgroup metadata.
+func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+ vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
+ defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+
+ err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+ return err
+ }
+
+ gid, err := getSyncGroupId(ctx, tx, sgName)
+ if err != nil {
+ return err
+ }
+ sg, err := getSyncGroupById(ctx, tx, gid)
+
+ // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+
+ sg.Spec = spec
+ return setSGDataEntry(ctx, tx, gid, sg)
+ })
+ return err
+}
+
//////////////////////////////
// Helper functions
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index b145887..d98d35a 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -106,7 +106,7 @@
return d.bst
}
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return verror.NewErrNotImplemented(ctx)
}