syncbase/vsync: Implementation for SyncGroup related getters and setspec.

Change-Id: I54697c9dd66a4ecd2a19507fbafc4dd7c9cc08f5
diff --git a/services/syncbase/server/interfaces/database.go b/services/syncbase/server/interfaces/database.go
index 32cca17..02e232e 100644
--- a/services/syncbase/server/interfaces/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -26,7 +26,7 @@
 	// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
 	// the database perms.
 	// Designed for use from within App.DeleteNoSQLDatabase.
-	CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error
+	CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error
 
 	// SetPermsInternal updates the database perms.
 	// Designed for use from within App.SetDatabasePerms.
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 7d900e7..2943a52 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -388,7 +388,7 @@
 	return d.a
 }
 
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index 10ad323..cc1a73d 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -19,7 +19,8 @@
 	if d.batchId != nil {
 		return nil, wire.NewErrBoundToBatch(ctx)
 	}
-	return nil, verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupNames(ctx, call)
 }
 
 func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
@@ -63,19 +64,22 @@
 	if d.batchId != nil {
 		return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
 	}
-	return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupSpec(ctx, call, sgName)
 }
 
 func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
 	if d.batchId != nil {
 		return wire.NewErrBoundToBatch(ctx)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.SetSyncGroupSpec(ctx, call, sgName, spec, version)
 }
 
 func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
 	if d.batchId != nil {
 		return nil, wire.NewErrBoundToBatch(ctx)
 	}
-	return nil, verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupMembers(ctx, call, sgName)
 }
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index a06da74..1b158f6 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -246,7 +247,7 @@
 // make forEachSyncGroup() stop the iteration earlier; otherwise the function
 // loops across all SyncGroups in the Database.
 func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
-	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
 	stream := st.Scan(scanStart, scanLimit)
 	for stream.Advance() {
 		var sg interfaces.SyncGroup
@@ -286,10 +287,13 @@
 // relationships.
 // Use the functions above to manipulate SyncGroups.
 
-// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
-func sgDataKeyScanPrefix() string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-}
+var (
+	// sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
+	sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+
+	// sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
+	sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+)
 
 // sgDataKey returns the key used to access the SyncGroup data entry.
 func sgDataKey(gid interfaces.GroupId) string {
@@ -301,6 +305,17 @@
 	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
 }
 
+// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
+func splitSgNameKey(ctx *context.T, key string) (string, error) {
+	prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
+
+	// Note that the actual SyncGroup name may contain ":" as a separator.
+	if !strings.HasPrefix(key, prefix) {
+		return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
+	}
+	return strings.TrimPrefix(key, prefix), nil
+}
+
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
 func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
@@ -518,6 +533,115 @@
 	return sg.Spec, nil
 }
 
+func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	var sgNames []string
+
+	vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
+	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return nil, err
+	}
+
+	// Scan all the SyncGroup names found in the Database.
+	scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
+	stream := sn.Scan(scanStart, scanLimit)
+	var key []byte
+	for stream.Advance() {
+		sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
+		if err != nil {
+			return nil, err
+		}
+		sgNames = append(sgNames, sgName)
+	}
+
+	if err := stream.Err(); err != nil {
+		return nil, err
+	}
+
+	return sgNames, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+	var spec wire.SyncGroupSpec
+
+	vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return spec, "", err
+	}
+
+	// Get the SyncGroup information.
+	sg, err := getSyncGroupByName(ctx, sn, sgName)
+	if err != nil {
+		return spec, "", err
+	}
+	// TODO(hpucha): Check SyncGroup ACL.
+
+	spec = sg.Spec
+	return spec, sg.SpecVersion, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+	var members map[string]wire.SyncGroupMemberInfo
+
+	vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return members, err
+	}
+
+	// Get the SyncGroup information.
+	sg, err := getSyncGroupByName(ctx, sn, sgName)
+	if err != nil {
+		return members, err
+	}
+
+	// TODO(hpucha): Check SyncGroup ACL.
+
+	members = sg.Joiners
+	return members, nil
+}
+
+// TODO(hpucha): Enable syncing syncgroup metadata.
+func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
+	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+
+	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+		// Check permissions on Database.
+		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+			return err
+		}
+
+		gid, err := getSyncGroupId(ctx, tx, sgName)
+		if err != nil {
+			return err
+		}
+		sg, err := getSyncGroupById(ctx, tx, gid)
+
+		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+
+		sg.Spec = spec
+		return setSGDataEntry(ctx, tx, gid, sg)
+	})
+	return err
+}
+
 //////////////////////////////
 // Helper functions
 
diff --git a/services/syncbase/vsync/test_util.go b/services/syncbase/vsync/test_util.go
index b145887..d98d35a 100644
--- a/services/syncbase/vsync/test_util.go
+++ b/services/syncbase/vsync/test_util.go
@@ -106,7 +106,7 @@
 	return d.bst
 }
 
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	return verror.NewErrNotImplemented(ctx)
 }