syncbase/vsync: SyncGroup membership view.

Create a view of all SyncGroup members and refresh it on a timeout.
Scan the SyncGroups in each database across all apps.

Additional fixes from previous CLs:
- Switch to using verror.
- Reuse the existing random number generator.
- Assert that a store.Transaction is passed where it is required.
- Improve some comments.

Change-Id: I69bbed808c3b91f38f8f11bef51865b38736651f
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 1b5217d..81276f0 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -88,6 +88,17 @@
 	return d, nil
 }
 
+func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	// In the future this API should be replaced by one that streams the database names.
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	dbNames := make([]string, 0, len(a.dbs))
+	for n := range a.dbs {
+		dbNames = append(dbNames, n)
+	}
+	return dbNames, nil
+}
+
 func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
 	// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
 	// crash, upon restart we execute any remaining steps before we start handling
diff --git a/services/syncbase/server/interfaces/app.go b/services/syncbase/server/interfaces/app.go
index a893e5a..f9d8dab 100644
--- a/services/syncbase/server/interfaces/app.go
+++ b/services/syncbase/server/interfaces/app.go
@@ -16,6 +16,9 @@
 	// NoSQLDatabase returns the Database for the specified NoSQL database.
 	NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (Database, error)
 
+	// NoSQLDatabaseNames returns the names of the NoSQL databases within the App.
+	NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
+
 	// CreateNoSQLDatabase creates the specified NoSQL database.
 	CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error
 
diff --git a/services/syncbase/server/interfaces/service.go b/services/syncbase/server/interfaces/service.go
index 3cbbda4..4d7ff60 100644
--- a/services/syncbase/server/interfaces/service.go
+++ b/services/syncbase/server/interfaces/service.go
@@ -18,4 +18,7 @@
 
 	// App returns the App with the specified name.
 	App(ctx *context.T, call rpc.ServerCall, appName string) (App, error)
+
+	// AppNames returns the names of the Apps within the service.
+	AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
 }
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 094b60f..68a72ab 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -114,6 +114,17 @@
 	return a, nil
 }
 
+func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	// In the future this API should be replaced by one that streams the app names.
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	appNames := make([]string, 0, len(s.apps))
+	for n := range s.apps {
+		appNames = append(appNames, n)
+	}
+	return appNames, nil
+}
+
 ////////////////////////////////////////
 // App management methods
 
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 1ccfabc..e56940d 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -4,16 +4,14 @@
 
 package vsync
 
-// SyncGroup management and storage in Syncbase.  Handle the lifecycle
-// of SyncGroup (create, join, leave, etc.) and their persistence as
-// sync metadata in the application databases.  Provide helper functions
+// SyncGroup management and storage in Syncbase.  Handles the lifecycle
+// of SyncGroups (create, join, leave, etc.) and their persistence as
+// sync metadata in the application databases.  Provides helper functions
 // to the higher levels of sync (Initiator, Watcher) to get membership
 // information and map key/value changes to their matching SyncGroups.
 
 import (
-	"errors" // TODO(rdaoud): switch to verror
 	"fmt"
-	"math/rand"
 	"time"
 
 	"v.io/syncbase/v23/services/syncbase/nosql"
@@ -22,27 +20,21 @@
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 )
 
 var (
 	// memberViewTTL is the shelf-life of the aggregate view of SyncGroup members.
 	memberViewTTL = 2 * time.Second
-
-	// sgRng is a random number generator used for SyncGroup versions.
-	sgRng *rand.Rand
 )
 
-func init() {
-	sgRng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
-}
-
 ////////////////////////////////////////////////////////////
 // SyncGroup management internal to Syncbase.
 
 // memberView holds an aggregated view of all SyncGroup members across databases.
 // The view is not coherent, it gets refreshed according to a configured TTL and
 // not (coherently) when SyncGroup membership is updated in the various databases.
-// It is needed by the sync Initiator when selecting a peer to contact from a
+// It is needed by the sync Initiator, which must select a peer to contact from a
 // global view of all SyncGroup members gathered from all databases.  This is why
 // a slightly stale view is acceptable.
 // The members are identified by their Vanadium names (map keys).
@@ -58,42 +50,44 @@
 
 // newSyncGroupVersion generates a random SyncGroup version ("etag").
 func newSyncGroupVersion() string {
-	return fmt.Sprintf("%x", sgRng.Int63())
+	return fmt.Sprintf("%x", rng.Int63())
 }
 
 // addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(tx store.StoreReadWriter, sg *SyncGroup) error {
+func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *SyncGroup) error {
+	_ = tx.(store.Transaction)
+
 	if sg == nil {
-		return errors.New("group information not specified")
+		return verror.New(verror.ErrBadArg, ctx, "group information not specified")
 	}
 	if sg.Name == "" {
-		return errors.New("group name not specified")
+		return verror.New(verror.ErrBadArg, ctx, "group name not specified")
 	}
 	if sg.Id == NoGroupId {
-		return errors.New("group ID not specified")
+		return verror.New(verror.ErrBadArg, ctx, "group ID not specified")
 	}
 	if sg.Version == "" {
-		return errors.New("group version not specified")
+		return verror.New(verror.ErrBadArg, ctx, "group version not specified")
 	}
 	if len(sg.Joiners) == 0 {
-		return errors.New("group has no joiners")
+		return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
 	}
 	if len(sg.Spec.Prefixes) == 0 {
-		return errors.New("group has no prefixes specified")
+		return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
 	}
 
 	if hasSGDataEntry(tx, sg.Id) {
-		return fmt.Errorf("group %d already exists", sg.Id)
+		return verror.New(verror.ErrBadArg, ctx, "group id already exists")
 	}
 	if hasSGNameEntry(tx, sg.Name) {
-		return fmt.Errorf("group name %s already exists", sg.Name)
+		return verror.New(verror.ErrBadArg, ctx, "group name already exists")
 	}
 
 	// Add the group name and data entries.
-	if err := setSGNameEntry(tx, sg.Name, sg.Id); err != nil {
+	if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
 		return err
 	}
-	if err := setSGDataEntry(tx, sg.Id, sg); err != nil {
+	if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
 		return err
 	}
 
@@ -101,13 +95,13 @@
 }
 
 // getSyncGroupId retrieves the SyncGroup ID given its name.
-func getSyncGroupId(st store.StoreReader, name string) (GroupId, error) {
-	return getSGNameEntry(st, name)
+func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
+	return getSGNameEntry(ctx, st, name)
 }
 
 // getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(st store.StoreReader, gid GroupId) (string, error) {
-	sg, err := getSyncGroupById(st, gid)
+func getSyncGroupName(ctx *context.T, st store.StoreReader, gid GroupId) (string, error) {
+	sg, err := getSyncGroupById(ctx, st, gid)
 	if err != nil {
 		return "", err
 	}
@@ -115,43 +109,49 @@
 }
 
 // getSyncGroupById retrieves the SyncGroup given its ID.
-func getSyncGroupById(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
-	return getSGDataEntry(st, gid)
+func getSyncGroupById(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+	return getSGDataEntry(ctx, st, gid)
 }
 
 // getSyncGroupByName retrieves the SyncGroup given its name.
-func getSyncGroupByName(st store.StoreReader, name string) (*SyncGroup, error) {
-	gid, err := getSyncGroupId(st, name)
+func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*SyncGroup, error) {
+	gid, err := getSyncGroupId(ctx, st, name)
 	if err != nil {
 		return nil, err
 	}
-	return getSyncGroupById(st, gid)
+	return getSyncGroupById(ctx, st, gid)
 }
 
 // delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(tx store.StoreReadWriter, gid GroupId) error {
-	sg, err := getSyncGroupById(tx, gid)
+func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+	_ = tx.(store.Transaction)
+
+	sg, err := getSyncGroupById(ctx, tx, gid)
 	if err != nil {
 		return err
 	}
-	if err = delSGNameEntry(tx, sg.Name); err != nil {
+	if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
 		return err
 	}
-	return delSGDataEntry(tx, sg.Id)
+	return delSGDataEntry(ctx, tx, sg.Id)
 }
 
 // delSyncGroupByName deletes the SyncGroup given its name.
-func delSyncGroupByName(tx store.StoreReadWriter, name string) error {
-	gid, err := getSyncGroupId(tx, name)
+func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
+	_ = tx.(store.Transaction)
+
+	gid, err := getSyncGroupId(ctx, tx, name)
 	if err != nil {
 		return err
 	}
-	return delSyncGroupById(tx, gid)
+	return delSyncGroupById(ctx, tx, gid)
 }
 
 // refreshMembersIfExpired updates the aggregate view of SyncGroup members across
 // databases if the view has expired.
-func (s *syncService) refreshMembersIfExpired() {
+// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
+// update the membership view for them instead of always scanning all of them.
+func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
 	view := s.allMembers
 	if view == nil {
 		// The empty expiration time in Go is before "now" and treated as expired below.
@@ -163,15 +163,65 @@
 		return
 	}
 
-	// TODO(rdaoud): iterate over all SyncGroups in all app DBs to get members.
-	// TODO(rdaoud): pending a new Syncbase API to access apps and database handles.
+	// Create a new aggregate view of SyncGroup members across all app databases.
+	// Get the apps and iterate over them.
+	appNames, err := s.sv.AppNames(ctx, nil)
+	if err != nil {
+		return
+	}
 
+	newMembers := make(map[string]*memberInfo)
+	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+
+	// For each app, get its databases and iterate over them.
+	for _, a := range appNames {
+		app, err := s.sv.App(ctx, nil, a)
+		if err != nil {
+			continue
+		}
+		dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
+		if err != nil {
+			continue
+		}
+
+		// For each database, fetch its SyncGroup data entries by scanning their
+		// prefix range.  Use a database snapshot for the scan.
+		for _, d := range dbNames {
+			db, err := app.NoSQLDatabase(ctx, nil, d)
+			if err != nil {
+				continue
+			}
+
+			sn := db.St().NewSnapshot()
+			stream := sn.Scan(scanStart, scanLimit)
+			for stream.Advance() {
+				var sg SyncGroup
+				if vom.Decode(stream.Value(nil), &sg) != nil {
+					continue
+				}
+
+				// Add all members of this SyncGroup to the membership view.
+				for member, info := range sg.Joiners {
+					if _, ok := newMembers[member]; !ok {
+						newMembers[member] = &memberInfo{
+							gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
+						}
+					}
+					newMembers[member].gid2info[sg.Id] = info
+				}
+			}
+
+			sn.Close()
+		}
+	}
+
+	view.members = newMembers
 	view.expiration = time.Now().Add(memberViewTTL)
 }
 
 // getMembers returns all SyncGroup members and the count of SyncGroups each one joined.
-func (s *syncService) getMembers() map[string]uint32 {
-	s.refreshMembersIfExpired()
+func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
+	s.refreshMembersIfExpired(ctx)
 
 	members := make(map[string]uint32)
 	for member, info := range s.allMembers.members {
@@ -184,6 +234,11 @@
 // Low-level utility functions to access DB entries without tracking their relationships.
 // Use the functions above to manipulate SyncGroups.
 
+// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
+func sgDataKeyScanPrefix() string {
+	return util.JoinKeyParts(util.SyncPrefix, "sg", "d")
+}
+
 // sgDataKey returns the key used to access the SyncGroup data entry.
 func sgDataKey(gid GroupId) string {
 	return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
@@ -215,39 +270,61 @@
 }
 
 // setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
-	return util.PutObject(tx, sgDataKey(gid), sg)
+func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
+	_ = tx.(store.Transaction)
+
+	if err := util.PutObject(tx, sgDataKey(gid), sg); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
 
 // setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(tx store.StoreReadWriter, name string, gid GroupId) error {
-	return util.PutObject(tx, sgNameKey(name), gid)
+func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid GroupId) error {
+	_ = tx.(store.Transaction)
+
+	if err := util.PutObject(tx, sgNameKey(name), gid); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
 
 // getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
 	var sg SyncGroup
 	if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
-		return nil, err
+		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
 	return &sg, nil
 }
 
 // getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(st store.StoreReader, name string) (GroupId, error) {
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
 	var gid GroupId
-	err := util.GetObject(st, sgNameKey(name), &gid)
-	return gid, err
+	if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
+		return gid, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return gid, nil
 }
 
 // delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(tx store.StoreReadWriter, gid GroupId) error {
-	return tx.Delete([]byte(sgDataKey(gid)))
+func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+	_ = tx.(store.Transaction)
+
+	if err := tx.Delete([]byte(sgDataKey(gid))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
 
 // delSGNameEntry deletes the SyncGroup name to ID mapping.
-func delSGNameEntry(tx store.StoreReadWriter, name string) error {
-	return tx.Delete([]byte(sgNameKey(name)))
+func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
+	_ = tx.(store.Transaction)
+
+	if err := tx.Delete([]byte(sgNameKey(name))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
 
 ////////////////////////////////////////////////////////////