syncbase/vsync: Implementation for SyncGroup related getters and setspec.
Change-Id: I54697c9dd66a4ecd2a19507fbafc4dd7c9cc08f5
diff --git a/services/syncbase/server/interfaces/database.go b/services/syncbase/server/interfaces/database.go
index 32cca17..02e232e 100644
--- a/services/syncbase/server/interfaces/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -26,7 +26,7 @@
// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
// the database perms.
// Designed for use from within App.DeleteNoSQLDatabase.
- CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error
+ CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error
// SetPermsInternal updates the database perms.
// Designed for use from within App.SetDatabasePerms.
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 7d900e7..2943a52 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -388,7 +388,7 @@
return d.a
}
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index 10ad323..cc1a73d 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -19,7 +19,8 @@
if d.batchId != nil {
return nil, wire.NewErrBoundToBatch(ctx)
}
- return nil, verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupNames(ctx, call)
}
func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
@@ -63,19 +64,22 @@
if d.batchId != nil {
return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
}
- return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupSpec(ctx, call, sgName)
}
func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
if d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
- return verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.SetSyncGroupSpec(ctx, call, sgName, spec, version)
}
func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
if d.batchId != nil {
return nil, wire.NewErrBoundToBatch(ctx)
}
- return nil, verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.GetSyncGroupMembers(ctx, call, sgName)
}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index a06da74..1b158f6 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
import (
"fmt"
+ "strings"
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -246,7 +247,7 @@
// make forEachSyncGroup() stop the iteration earlier; otherwise the function
// loops across all SyncGroups in the Database.
func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+ scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
stream := st.Scan(scanStart, scanLimit)
for stream.Advance() {
var sg interfaces.SyncGroup
@@ -286,10 +287,13 @@
// relationships.
// Use the functions above to manipulate SyncGroups.
-// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
-func sgDataKeyScanPrefix() string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-}
+var (
+ // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
+ sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+
+ // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
+ sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+)
// sgDataKey returns the key used to access the SyncGroup data entry.
func sgDataKey(gid interfaces.GroupId) string {
@@ -301,6 +305,17 @@
return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
}
+// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
+func splitSgNameKey(ctx *context.T, key string) (string, error) {
+ prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
+
+ // Note that the actual SyncGroup name may contain ":" as a separator.
+ if !strings.HasPrefix(key, prefix) {
+ return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
+ }
+ return strings.TrimPrefix(key, prefix), nil
+}
+
// hasSGDataEntry returns true if the SyncGroup data entry exists.
func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
@@ -518,6 +533,115 @@
return sg.Spec, nil
}
+func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ var sgNames []string
+
+ vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
+ defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return nil, err
+ }
+
+ // Scan all the SyncGroup names found in the Database.
+ scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
+ stream := sn.Scan(scanStart, scanLimit)
+ var key []byte
+ for stream.Advance() {
+ sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
+ if err != nil {
+ return nil, err
+ }
+ sgNames = append(sgNames, sgName)
+ }
+
+ if err := stream.Err(); err != nil {
+ return nil, err
+ }
+
+ return sgNames, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+ var spec wire.SyncGroupSpec
+
+ vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return spec, "", err
+ }
+
+ // Get the SyncGroup information.
+ sg, err := getSyncGroupByName(ctx, sn, sgName)
+ if err != nil {
+ return spec, "", err
+ }
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ spec = sg.Spec
+ return spec, sg.SpecVersion, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+ var members map[string]wire.SyncGroupMemberInfo
+
+ vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+
+ sn := sd.db.St().NewSnapshot()
+ defer sn.Close()
+
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+ return members, err
+ }
+
+ // Get the SyncGroup information.
+ sg, err := getSyncGroupByName(ctx, sn, sgName)
+ if err != nil {
+ return members, err
+ }
+
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ members = sg.Joiners
+ return members, nil
+}
+
+// TODO(hpucha): Enable syncing syncgroup metadata.
+func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+ vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
+ defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+
+ err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+ return err
+ }
+
+ gid, err := getSyncGroupId(ctx, tx, sgName)
+ if err != nil {
+ return err
+ }
+ sg, err := getSyncGroupById(ctx, tx, gid)
+
+ // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+
+ sg.Spec = spec
+ return setSGDataEntry(ctx, tx, gid, sg)
+ })
+ return err
+}
+
//////////////////////////////
// Helper functions
diff --git a/services/syncbase/vsync/test_util.go b/services/syncbase/vsync/test_util.go
index b145887..d98d35a 100644
--- a/services/syncbase/vsync/test_util.go
+++ b/services/syncbase/vsync/test_util.go
@@ -106,7 +106,7 @@
return d.bst
}
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return verror.NewErrNotImplemented(ctx)
}