syncbase/vsync: Implementation for SyncGroup related getters and setspec.

Change-Id: I54697c9dd66a4ecd2a19507fbafc4dd7c9cc08f5
diff --git a/v23/syncbase/nosql/syncgroup_test.go b/v23/syncbase/nosql/syncgroup_test.go
index 73e0550..9705c3a 100644
--- a/v23/syncbase/nosql/syncgroup_test.go
+++ b/v23/syncbase/nosql/syncgroup_test.go
@@ -5,6 +5,7 @@
 package nosql_test
 
 import (
+	"reflect"
 	"testing"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -32,6 +33,9 @@
 
 	createSyncGroup(t, ctx, d, sg1, spec, verror.ErrBadArg.ID)
 
+	var wantNames []string
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+
 	// Prefill entries before creating a SyncGroup to exercise the bootstrap
 	// of a SyncGroup through Snapshot operations to the watcher.
 	t1 := tu.CreateTable(t, ctx, d, "t1")
@@ -50,20 +54,34 @@
 	}
 	createSyncGroup(t, ctx, d, sg1, spec, verror.ID(""))
 
+	// Verify SyncGroup is created.
+	wantNames = []string{sg1}
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+	verifySyncGroupInfo(t, ctx, d, sg1, spec, 1)
+
 	// Check if creating an already existing syncgroup fails.
 	createSyncGroup(t, ctx, d, sg1, spec, verror.ErrExist.ID)
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
 
 	// Create a peer syncgroup.
 	spec.Description = "test syncgroup sg2"
 	sg2 := naming.Join(sName, util.SyncbaseSuffix, "sg2")
 	createSyncGroup(t, ctx, d, sg2, spec, verror.ID(""))
 
+	wantNames = []string{sg1, sg2}
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+	verifySyncGroupInfo(t, ctx, d, sg2, spec, 1)
+
 	// Create a nested syncgroup.
 	spec.Description = "test syncgroup sg3"
 	spec.Prefixes = []string{"t1:foobar"}
 	sg3 := naming.Join(sName, util.SyncbaseSuffix, "sg3")
 	createSyncGroup(t, ctx, d, sg3, spec, verror.ID(""))
 
+	wantNames = []string{sg1, sg2, sg3}
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+	verifySyncGroupInfo(t, ctx, d, sg3, spec, 1)
+
 	// Check that create fails if the perms disallow access.
 	perms := perms("root/client")
 	perms.Blacklist("root/client", string(access.Read))
@@ -73,13 +91,12 @@
 	spec.Description = "test syncgroup sg4"
 	sg4 := naming.Join(sName, util.SyncbaseSuffix, "sg4")
 	createSyncGroup(t, ctx, d, sg4, spec, verror.ErrNoAccess.ID)
+	verifySyncGroupNames(t, ctx, d, nil, verror.ErrNoAccess.ID)
 }
 
 // Tests that SyncGroup.Join works as expected for the case with one Syncbase
 // and 2 clients. One client creates the SyncGroup, while the other attempts to
 // join it.
-//
-// TODO(hpucha): Add more integration-style testing with 2 syncbases.
 func TestJoinSyncGroup(t *testing.T) {
 	// Create client1-server pair.
 	ctx, ctx1, sName, rootp, cleanup := tu.SetupOrDieCustom("client1", "server", perms("root/client1"))
@@ -106,6 +123,8 @@
 	// Check that client2's join fails if the perms disallow access.
 	joinSyncGroup(t, ctx2, d2, sgNameA, verror.ErrNoAccess.ID)
 
+	verifySyncGroupNames(t, ctx2, d2, nil, verror.ErrNoAccess.ID)
+
 	// Client1 gives access to client2.
 	if err := d1.SetPermissions(ctx1, perms("root/client1", "root/client2"), ""); err != nil {
 		t.Fatalf("d.SetPermissions() failed: %v", err)
@@ -130,6 +149,46 @@
 
 	// Check that client2's join now succeeds.
 	joinSyncGroup(t, ctx2, d2, sgNameB, verror.ID(""))
+
+	// Verify SyncGroup state.
+	wantNames := []string{sgNameA, sgNameB}
+	verifySyncGroupNames(t, ctx1, d1, wantNames, verror.ID(""))
+	verifySyncGroupNames(t, ctx2, d2, wantNames, verror.ID(""))
+	verifySyncGroupInfo(t, ctx1, d1, sgNameA, specA, 1)
+	verifySyncGroupInfo(t, ctx1, d1, sgNameB, specB, 1)
+	verifySyncGroupInfo(t, ctx2, d2, sgNameB, specB, 1)
+}
+
+// Tests that SyncGroup.SetSpec works as expected.
+func TestSetSpecSyncGroup(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(perms("root/client"))
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+	// Create successfully.
+	sgName := naming.Join(sName, util.SyncbaseSuffix, "sg1")
+	spec := wire.SyncGroupSpec{
+		Description: "test syncgroup sg1",
+		Perms:       nil,
+		Prefixes:    []string{"t1:foo"},
+	}
+	createSyncGroup(t, ctx, d, sgName, spec, verror.ID(""))
+
+	// Verify SyncGroup is created.
+	wantNames := []string{sgName}
+	verifySyncGroupNames(t, ctx, d, wantNames, verror.ID(""))
+	verifySyncGroupInfo(t, ctx, d, sgName, spec, 1)
+
+	spec.Prefixes = []string{"t1:foo", "t2:bar"}
+	spec.Description = "test syncgroup sg1 update"
+	spec.Perms = perms("root/client1")
+
+	sg := d.SyncGroup(sgName)
+	if err := sg.SetSpec(ctx, spec, ""); err != nil {
+		t.Fatalf("sg.SetSpec failed: %v", err)
+	}
+	verifySyncGroupInfo(t, ctx, d, sgName, spec, 1)
 }
 
 ///////////////////
@@ -144,15 +203,35 @@
 	return sg
 }
 
-func joinSyncGroup(t *testing.T, ctx *context.T, d nosql.Database, sgName string, errID verror.ID) nosql.SyncGroup {
+func joinSyncGroup(t *testing.T, ctx *context.T, d nosql.Database, sgName string, wantErr verror.ID) nosql.SyncGroup {
 	sg := d.SyncGroup(sgName)
 	info := wire.SyncGroupMemberInfo{10}
-	if _, err := sg.Join(ctx, info); verror.ErrorID(err) != errID {
+	if _, err := sg.Join(ctx, info); verror.ErrorID(err) != wantErr {
 		tu.Fatalf(t, "Join SG %v failed: %v", sgName, err)
 	}
 	return sg
 }
 
+func verifySyncGroupNames(t *testing.T, ctx *context.T, d nosql.Database, wantNames []string, wantErr verror.ID) {
+	gotNames, gotErr := d.GetSyncGroupNames(ctx)
+	if verror.ErrorID(gotErr) != wantErr || !reflect.DeepEqual(gotNames, wantNames) {
+		t.Fatalf("d.GetSyncGroupNames() failed, got %v, want %v, err %v", gotNames, wantNames, gotErr)
+	}
+}
+
+func verifySyncGroupInfo(t *testing.T, ctx *context.T, d nosql.Database, sgName string, wantSpec wire.SyncGroupSpec, wantMembers int) {
+	sg := d.SyncGroup(sgName)
+	gotSpec, _, err := sg.GetSpec(ctx)
+	if err != nil || !reflect.DeepEqual(gotSpec, wantSpec) {
+		t.Fatalf("sg.GetSpec() failed, got %v, want %v, err %v", gotSpec, wantSpec, err)
+	}
+
+	members, err := sg.GetMembers(ctx)
+	if err != nil || len(members) != wantMembers {
+		t.Fatalf("sg.GetMembers() failed, got %v, want %v, err %v", members, wantMembers, err)
+	}
+}
+
 // TODO(sadovsky): This appears to be identical to tu.DefaultPerms(). We should
 // just use that.
 func perms(bps ...string) access.Permissions {
diff --git a/x/ref/services/syncbase/server/interfaces/database.go b/x/ref/services/syncbase/server/interfaces/database.go
index 32cca17..02e232e 100644
--- a/x/ref/services/syncbase/server/interfaces/database.go
+++ b/x/ref/services/syncbase/server/interfaces/database.go
@@ -26,7 +26,7 @@
 	// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
 	// the database perms.
 	// Designed for use from within App.DeleteNoSQLDatabase.
-	CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error
+	CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error
 
 	// SetPermsInternal updates the database perms.
 	// Designed for use from within App.SetDatabasePerms.
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 7d900e7..2943a52 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -388,7 +388,7 @@
 	return d.a
 }
 
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
diff --git a/x/ref/services/syncbase/server/nosql/database_sgm.go b/x/ref/services/syncbase/server/nosql/database_sgm.go
index 10ad323..cc1a73d 100644
--- a/x/ref/services/syncbase/server/nosql/database_sgm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sgm.go
@@ -19,7 +19,8 @@
 	if d.batchId != nil {
 		return nil, wire.NewErrBoundToBatch(ctx)
 	}
-	return nil, verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupNames(ctx, call)
 }
 
 func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
@@ -63,19 +64,22 @@
 	if d.batchId != nil {
 		return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
 	}
-	return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupSpec(ctx, call, sgName)
 }
 
 func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
 	if d.batchId != nil {
 		return wire.NewErrBoundToBatch(ctx)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.SetSyncGroupSpec(ctx, call, sgName, spec, version)
 }
 
 func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
 	if d.batchId != nil {
 		return nil, wire.NewErrBoundToBatch(ctx)
 	}
-	return nil, verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.GetSyncGroupMembers(ctx, call, sgName)
 }
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index a06da74..1b158f6 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
@@ -246,7 +247,7 @@
 // make forEachSyncGroup() stop the iteration earlier; otherwise the function
 // loops across all SyncGroups in the Database.
 func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
-	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
 	stream := st.Scan(scanStart, scanLimit)
 	for stream.Advance() {
 		var sg interfaces.SyncGroup
@@ -286,10 +287,13 @@
 // relationships.
 // Use the functions above to manipulate SyncGroups.
 
-// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
-func sgDataKeyScanPrefix() string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-}
+var (
+	// sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
+	sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+
+	// sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
+	sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+)
 
 // sgDataKey returns the key used to access the SyncGroup data entry.
 func sgDataKey(gid interfaces.GroupId) string {
@@ -301,6 +305,17 @@
 	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
 }
 
+// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
+func splitSgNameKey(ctx *context.T, key string) (string, error) {
+	prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
+
+	// Note that the actual SyncGroup name may contain ":" as a separator.
+	if !strings.HasPrefix(key, prefix) {
+		return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
+	}
+	return strings.TrimPrefix(key, prefix), nil
+}
+
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
 func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
@@ -518,6 +533,115 @@
 	return sg.Spec, nil
 }
 
+func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	var sgNames []string
+
+	vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
+	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return nil, err
+	}
+
+	// Scan all the SyncGroup names found in the Database.
+	scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
+	stream := sn.Scan(scanStart, scanLimit)
+	var key []byte
+	for stream.Advance() {
+		sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
+		if err != nil {
+			return nil, err
+		}
+		sgNames = append(sgNames, sgName)
+	}
+
+	if err := stream.Err(); err != nil {
+		return nil, err
+	}
+
+	return sgNames, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+	var spec wire.SyncGroupSpec
+
+	vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return spec, "", err
+	}
+
+	// Get the SyncGroup information.
+	sg, err := getSyncGroupByName(ctx, sn, sgName)
+	if err != nil {
+		return spec, "", err
+	}
+	// TODO(hpucha): Check SyncGroup ACL.
+
+	spec = sg.Spec
+	return spec, sg.SpecVersion, nil
+}
+
+func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+	var members map[string]wire.SyncGroupMemberInfo
+
+	vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+
+	sn := sd.db.St().NewSnapshot()
+	defer sn.Close()
+
+	// Check permissions on Database.
+	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+		return members, err
+	}
+
+	// Get the SyncGroup information.
+	sg, err := getSyncGroupByName(ctx, sn, sgName)
+	if err != nil {
+		return members, err
+	}
+
+	// TODO(hpucha): Check SyncGroup ACL.
+
+	members = sg.Joiners
+	return members, nil
+}
+
+// TODO(hpucha): Enable syncing syncgroup metadata.
+func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
+	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+
+	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+		// Check permissions on Database.
+		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+			return err
+		}
+
+		gid, err := getSyncGroupId(ctx, tx, sgName)
+		if err != nil {
+			return err
+		}
+		sg, err := getSyncGroupById(ctx, tx, gid)
+
+		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+
+		sg.Spec = spec
+		return setSGDataEntry(ctx, tx, gid, sg)
+	})
+	return err
+}
+
 //////////////////////////////
 // Helper functions
 
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index b145887..d98d35a 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -106,7 +106,7 @@
 	return d.bst
 }
 
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	return verror.NewErrNotImplemented(ctx)
 }