syncbase/vsync: SyncGroup membership view.
Create a view of all SyncGroup members and refresh it on a timeout.
Scan the SyncGroups in each database across all apps.
Additional fixes from previous CLs:
- Switch to using verror.
- Reuse the existing random number generator.
- Assert that a store.Transaction is passed where it is required.
- Improve some comments.
Change-Id: I69bbed808c3b91f38f8f11bef51865b38736651f
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 1b5217d..81276f0 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -88,6 +88,17 @@
return d, nil
}
+func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ // In the future this API should be replaced by one that streams the database names.
+ a.mu.Lock()
+ defer a.mu.Unlock()
+ dbNames := make([]string, 0, len(a.dbs))
+ for n := range a.dbs {
+ dbNames = append(dbNames, n)
+ }
+ return dbNames, nil
+}
+
func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
// crash, upon restart we execute any remaining steps before we start handling
diff --git a/services/syncbase/server/interfaces/app.go b/services/syncbase/server/interfaces/app.go
index a893e5a..f9d8dab 100644
--- a/services/syncbase/server/interfaces/app.go
+++ b/services/syncbase/server/interfaces/app.go
@@ -16,6 +16,9 @@
// NoSQLDatabase returns the Database for the specified NoSQL database.
NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (Database, error)
+ // NoSQLDatabaseNames returns the names of the NoSQL databases within the App.
+ NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
+
// CreateNoSQLDatabase creates the specified NoSQL database.
CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error
diff --git a/services/syncbase/server/interfaces/service.go b/services/syncbase/server/interfaces/service.go
index 3cbbda4..4d7ff60 100644
--- a/services/syncbase/server/interfaces/service.go
+++ b/services/syncbase/server/interfaces/service.go
@@ -18,4 +18,7 @@
// App returns the App with the specified name.
App(ctx *context.T, call rpc.ServerCall, appName string) (App, error)
+
+ // AppNames returns the names of the Apps within the service.
+ AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 094b60f..68a72ab 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -114,6 +114,17 @@
return a, nil
}
+func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ // In the future this API should be replaced by one that streams the app names.
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ appNames := make([]string, 0, len(s.apps))
+ for n := range s.apps {
+ appNames = append(appNames, n)
+ }
+ return appNames, nil
+}
+
////////////////////////////////////////
// App management methods
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 1ccfabc..e56940d 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -4,16 +4,14 @@
package vsync
-// SyncGroup management and storage in Syncbase. Handle the lifecycle
-// of SyncGroup (create, join, leave, etc.) and their persistence as
-// sync metadata in the application databases. Provide helper functions
+// SyncGroup management and storage in Syncbase. Handles the lifecycle
+// of SyncGroups (create, join, leave, etc.) and their persistence as
+// sync metadata in the application databases. Provides helper functions
// to the higher levels of sync (Initiator, Watcher) to get membership
// information and map key/value changes to their matching SyncGroups.
import (
- "errors" // TODO(rdaoud): switch to verror
"fmt"
- "math/rand"
"time"
"v.io/syncbase/v23/services/syncbase/nosql"
@@ -22,27 +20,21 @@
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
+ "v.io/v23/vom"
)
var (
// memberViewTTL is the shelf-life of the aggregate view of SyncGroup members.
memberViewTTL = 2 * time.Second
-
- // sgRng is a random number generator used for SyncGroup versions.
- sgRng *rand.Rand
)
-func init() {
- sgRng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
-}
-
////////////////////////////////////////////////////////////
// SyncGroup management internal to Syncbase.
// memberView holds an aggregated view of all SyncGroup members across databases.
// The view is not coherent, it gets refreshed according to a configured TTL and
// not (coherently) when SyncGroup membership is updated in the various databases.
-// It is needed by the sync Initiator when selecting a peer to contact from a
+// It is needed by the sync Initiator, which must select a peer to contact from a
// global view of all SyncGroup members gathered from all databases. This is why
// a slightly stale view is acceptable.
// The members are identified by their Vanadium names (map keys).
@@ -58,42 +50,44 @@
// newSyncGroupVersion generates a random SyncGroup version ("etag").
func newSyncGroupVersion() string {
- return fmt.Sprintf("%x", sgRng.Int63())
+ return fmt.Sprintf("%x", rng.Int63())
}
// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(tx store.StoreReadWriter, sg *SyncGroup) error {
+func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *SyncGroup) error {
+ _ = tx.(store.Transaction)
+
if sg == nil {
- return errors.New("group information not specified")
+ return verror.New(verror.ErrBadArg, ctx, "group information not specified")
}
if sg.Name == "" {
- return errors.New("group name not specified")
+ return verror.New(verror.ErrBadArg, ctx, "group name not specified")
}
if sg.Id == NoGroupId {
- return errors.New("group ID not specified")
+ return verror.New(verror.ErrBadArg, ctx, "group ID not specified")
}
if sg.Version == "" {
- return errors.New("group version not specified")
+ return verror.New(verror.ErrBadArg, ctx, "group version not specified")
}
if len(sg.Joiners) == 0 {
- return errors.New("group has no joiners")
+ return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
}
if len(sg.Spec.Prefixes) == 0 {
- return errors.New("group has no prefixes specified")
+ return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
}
if hasSGDataEntry(tx, sg.Id) {
- return fmt.Errorf("group %d already exists", sg.Id)
+ return verror.New(verror.ErrBadArg, ctx, "group id already exists")
}
if hasSGNameEntry(tx, sg.Name) {
- return fmt.Errorf("group name %s already exists", sg.Name)
+ return verror.New(verror.ErrBadArg, ctx, "group name already exists")
}
// Add the group name and data entries.
- if err := setSGNameEntry(tx, sg.Name, sg.Id); err != nil {
+ if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
return err
}
- if err := setSGDataEntry(tx, sg.Id, sg); err != nil {
+ if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
return err
}
@@ -101,13 +95,13 @@
}
// getSyncGroupId retrieves the SyncGroup ID given its name.
-func getSyncGroupId(st store.StoreReader, name string) (GroupId, error) {
- return getSGNameEntry(st, name)
+func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
+ return getSGNameEntry(ctx, st, name)
}
// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(st store.StoreReader, gid GroupId) (string, error) {
- sg, err := getSyncGroupById(st, gid)
+func getSyncGroupName(ctx *context.T, st store.StoreReader, gid GroupId) (string, error) {
+ sg, err := getSyncGroupById(ctx, st, gid)
if err != nil {
return "", err
}
@@ -115,43 +109,49 @@
}
// getSyncGroupById retrieves the SyncGroup given its ID.
-func getSyncGroupById(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
- return getSGDataEntry(st, gid)
+func getSyncGroupById(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+ return getSGDataEntry(ctx, st, gid)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
-func getSyncGroupByName(st store.StoreReader, name string) (*SyncGroup, error) {
- gid, err := getSyncGroupId(st, name)
+func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*SyncGroup, error) {
+ gid, err := getSyncGroupId(ctx, st, name)
if err != nil {
return nil, err
}
- return getSyncGroupById(st, gid)
+ return getSyncGroupById(ctx, st, gid)
}
// delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(tx store.StoreReadWriter, gid GroupId) error {
- sg, err := getSyncGroupById(tx, gid)
+func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+ _ = tx.(store.Transaction)
+
+ sg, err := getSyncGroupById(ctx, tx, gid)
if err != nil {
return err
}
- if err = delSGNameEntry(tx, sg.Name); err != nil {
+ if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
return err
}
- return delSGDataEntry(tx, sg.Id)
+ return delSGDataEntry(ctx, tx, sg.Id)
}
// delSyncGroupByName deletes the SyncGroup given its name.
-func delSyncGroupByName(tx store.StoreReadWriter, name string) error {
- gid, err := getSyncGroupId(tx, name)
+func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
+ _ = tx.(store.Transaction)
+
+ gid, err := getSyncGroupId(ctx, tx, name)
if err != nil {
return err
}
- return delSyncGroupById(tx, gid)
+ return delSyncGroupById(ctx, tx, gid)
}
// refreshMembersIfExpired updates the aggregate view of SyncGroup members across
// databases if the view has expired.
-func (s *syncService) refreshMembersIfExpired() {
+// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
+// update the membership view for them instead of always scanning all of them.
+func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
view := s.allMembers
if view == nil {
// The empty expiration time in Go is before "now" and treated as expired below.
@@ -163,15 +163,65 @@
return
}
- // TODO(rdaoud): iterate over all SyncGroups in all app DBs to get members.
- // TODO(rdaoud): pending a new Syncbase API to access apps and database handles.
+ // Create a new aggregate view of SyncGroup members across all app databases.
+ // Get the apps and iterate over them.
+ appNames, err := s.sv.AppNames(ctx, nil)
+ if err != nil {
+ return
+ }
+ newMembers := make(map[string]*memberInfo)
+ scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+
+ // For each app, get its databases and iterate over them.
+ for _, a := range appNames {
+ app, err := s.sv.App(ctx, nil, a)
+ if err != nil {
+ continue
+ }
+ dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
+ if err != nil {
+ continue
+ }
+
+ // For each database, fetch its SyncGroup data entries by scanning their
+ // prefix range. Use a database snapshot for the scan.
+ for _, d := range dbNames {
+ db, err := app.NoSQLDatabase(ctx, nil, d)
+ if err != nil {
+ continue
+ }
+
+ sn := db.St().NewSnapshot()
+ stream := sn.Scan(scanStart, scanLimit)
+ for stream.Advance() {
+ var sg SyncGroup
+ if vom.Decode(stream.Value(nil), &sg) != nil {
+ continue
+ }
+
+ // Add all members of this SyncGroup to the membership view.
+ for member, info := range sg.Joiners {
+ if _, ok := newMembers[member]; !ok {
+ newMembers[member] = &memberInfo{
+ gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
+ }
+ }
+ newMembers[member].gid2info[sg.Id] = info
+ }
+ }
+
+ sn.Close()
+ }
+ }
+
+ view.members = newMembers
view.expiration = time.Now().Add(memberViewTTL)
}
// getMembers returns all SyncGroup members and the count of SyncGroups each one joined.
-func (s *syncService) getMembers() map[string]uint32 {
- s.refreshMembersIfExpired()
+func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
+ s.refreshMembersIfExpired(ctx)
members := make(map[string]uint32)
for member, info := range s.allMembers.members {
@@ -184,6 +234,11 @@
// Low-level utility functions to access DB entries without tracking their relationships.
// Use the functions above to manipulate SyncGroups.
+// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
+func sgDataKeyScanPrefix() string {
+ return util.JoinKeyParts(util.SyncPrefix, "sg", "d")
+}
+
// sgDataKey returns the key used to access the SyncGroup data entry.
func sgDataKey(gid GroupId) string {
return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
@@ -215,39 +270,61 @@
}
// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
- return util.PutObject(tx, sgDataKey(gid), sg)
+func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
+ _ = tx.(store.Transaction)
+
+ if err := util.PutObject(tx, sgDataKey(gid), sg); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
// setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(tx store.StoreReadWriter, name string, gid GroupId) error {
- return util.PutObject(tx, sgNameKey(name), gid)
+func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid GroupId) error {
+ _ = tx.(store.Transaction)
+
+ if err := util.PutObject(tx, sgNameKey(name), gid); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
var sg SyncGroup
if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
- return nil, err
+ return nil, verror.New(verror.ErrInternal, ctx, err)
}
return &sg, nil
}
// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(st store.StoreReader, name string) (GroupId, error) {
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
var gid GroupId
- err := util.GetObject(st, sgNameKey(name), &gid)
- return gid, err
+ if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
+ return gid, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return gid, nil
}
// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(tx store.StoreReadWriter, gid GroupId) error {
- return tx.Delete([]byte(sgDataKey(gid)))
+func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+ _ = tx.(store.Transaction)
+
+ if err := tx.Delete([]byte(sgDataKey(gid))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
// delSGNameEntry deletes the SyncGroup name to ID mapping.
-func delSGNameEntry(tx store.StoreReadWriter, name string) error {
- return tx.Delete([]byte(sgNameKey(name)))
+func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
+ _ = tx.(store.Transaction)
+
+ if err := tx.Delete([]byte(sgNameKey(name))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
////////////////////////////////////////////////////////////