syncbase/vsync: SyncGroup storage layer.

Add the SyncGroup storage layer and some of the support functions.

Change-Id: Ib9971e7c9b54f8c9ea1cdd1501bc3756b2482a0c
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 361b5a0..078024d 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -32,6 +32,9 @@
 	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
 	closed  chan struct{}
+
+	// In-memory sync membership info aggregated across databases.
+	allMembers *memberView
 }
 
 var (
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index c3ab3d1..1ccfabc 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -4,12 +4,252 @@
 
 package vsync
 
+// SyncGroup management and storage in Syncbase.  Handle the lifecycle
+// of SyncGroup (create, join, leave, etc.) and their persistence as
+// sync metadata in the application databases.  Provide helper functions
+// to the higher levels of sync (Initiator, Watcher) to get membership
+// information and map key/value changes to their matching SyncGroups.
+
 import (
+	"errors" // TODO(rdaoud): switch to verror
+	"fmt"
+	"math/rand"
+	"time"
+
+	"v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
 )
 
+var (
+	// memberViewTTL is the shelf-life of the aggregate view of SyncGroup members.
+	memberViewTTL = 2 * time.Second
+
+	// sgRng is a random number generator used for SyncGroup versions.
+	sgRng *rand.Rand
+)
+
+func init() {
+	sgRng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+}
+
+////////////////////////////////////////////////////////////
+// SyncGroup management internal to Syncbase.
+
+// memberView holds an aggregated view of all SyncGroup members across databases.
+// The view is not coherent, it gets refreshed according to a configured TTL and
+// not (coherently) when SyncGroup membership is updated in the various databases.
+// It is needed by the sync Initiator when selecting a peer to contact from a
+// global view of all SyncGroup members gathered from all databases.  This is why
+// a slightly stale view is acceptable.
+// The members are identified by their Vanadium names (map keys).
+type memberView struct {
+	expiration time.Time
+	members    map[string]*memberInfo
+}
+
+// memberInfo holds the member metadata for each SyncGroup this member belongs to.
+type memberInfo struct {
+	gid2info map[GroupId]nosql.SyncGroupMemberInfo
+}
+
+// newSyncGroupVersion generates a random SyncGroup version ("etag").
+func newSyncGroupVersion() string {
+	return fmt.Sprintf("%x", sgRng.Int63())
+}
+
+// addSyncGroup adds a new SyncGroup given its information.
+func addSyncGroup(tx store.StoreReadWriter, sg *SyncGroup) error {
+	if sg == nil {
+		return errors.New("group information not specified")
+	}
+	if sg.Name == "" {
+		return errors.New("group name not specified")
+	}
+	if sg.Id == NoGroupId {
+		return errors.New("group ID not specified")
+	}
+	if sg.Version == "" {
+		return errors.New("group version not specified")
+	}
+	if len(sg.Joiners) == 0 {
+		return errors.New("group has no joiners")
+	}
+	if len(sg.Spec.Prefixes) == 0 {
+		return errors.New("group has no prefixes specified")
+	}
+
+	if hasSGDataEntry(tx, sg.Id) {
+		return fmt.Errorf("group %d already exists", sg.Id)
+	}
+	if hasSGNameEntry(tx, sg.Name) {
+		return fmt.Errorf("group name %s already exists", sg.Name)
+	}
+
+	// Add the group name and data entries.
+	if err := setSGNameEntry(tx, sg.Name, sg.Id); err != nil {
+		return err
+	}
+	if err := setSGDataEntry(tx, sg.Id, sg); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// getSyncGroupId retrieves the SyncGroup ID given its name.
+func getSyncGroupId(st store.StoreReader, name string) (GroupId, error) {
+	return getSGNameEntry(st, name)
+}
+
+// getSyncGroupName retrieves the SyncGroup name given its ID.
+func getSyncGroupName(st store.StoreReader, gid GroupId) (string, error) {
+	sg, err := getSyncGroupById(st, gid)
+	if err != nil {
+		return "", err
+	}
+	return sg.Name, nil
+}
+
+// getSyncGroupById retrieves the SyncGroup given its ID.
+func getSyncGroupById(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+	return getSGDataEntry(st, gid)
+}
+
+// getSyncGroupByName retrieves the SyncGroup given its name.
+func getSyncGroupByName(st store.StoreReader, name string) (*SyncGroup, error) {
+	gid, err := getSyncGroupId(st, name)
+	if err != nil {
+		return nil, err
+	}
+	return getSyncGroupById(st, gid)
+}
+
+// delSyncGroupById deletes the SyncGroup given its ID.
+func delSyncGroupById(tx store.StoreReadWriter, gid GroupId) error {
+	sg, err := getSyncGroupById(tx, gid)
+	if err != nil {
+		return err
+	}
+	if err = delSGNameEntry(tx, sg.Name); err != nil {
+		return err
+	}
+	return delSGDataEntry(tx, sg.Id)
+}
+
+// delSyncGroupByName deletes the SyncGroup given its name.
+func delSyncGroupByName(tx store.StoreReadWriter, name string) error {
+	gid, err := getSyncGroupId(tx, name)
+	if err != nil {
+		return err
+	}
+	return delSyncGroupById(tx, gid)
+}
+
+// refreshMembersIfExpired updates the aggregate view of SyncGroup members across
+// databases if the view has expired.
+func (s *syncService) refreshMembersIfExpired() {
+	view := s.allMembers
+	if view == nil {
+		// The empty expiration time in Go is before "now" and treated as expired below.
+		view = &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
+		s.allMembers = view
+	}
+
+	if time.Now().Before(view.expiration) {
+		return
+	}
+
+	// TODO(rdaoud): iterate over all SyncGroups in all app DBs to get members.
+	// TODO(rdaoud): pending a new Syncbase API to access apps and database handles.
+
+	view.expiration = time.Now().Add(memberViewTTL)
+}
+
+// getMembers returns all SyncGroup members and the count of SyncGroups each one joined.
+func (s *syncService) getMembers() map[string]uint32 {
+	s.refreshMembersIfExpired()
+
+	members := make(map[string]uint32)
+	for member, info := range s.allMembers.members {
+		members[member] = uint32(len(info.gid2info))
+	}
+
+	return members
+}
+
+// Low-level utility functions to access DB entries without tracking their relationships.
+// Use the functions above to manipulate SyncGroups.
+
+// sgDataKey returns the key used to access the SyncGroup data entry.
+func sgDataKey(gid GroupId) string {
+	return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
+}
+
+// sgNameKey returns the key used to access the SyncGroup name entry.
+func sgNameKey(name string) string {
+	return util.JoinKeyParts(util.SyncPrefix, "sg", "n", name)
+}
+
+// hasSGDataEntry returns true if the SyncGroup data entry exists.
+func hasSGDataEntry(st store.StoreReader, gid GroupId) bool {
+	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
+	var sg SyncGroup
+	if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
+		return false
+	}
+	return true
+}
+
+// hasSGNameEntry returns true if the SyncGroup name entry exists.
+func hasSGNameEntry(st store.StoreReader, name string) bool {
+	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
+	var gid GroupId
+	if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
+		return false
+	}
+	return true
+}
+
+// setSGDataEntry stores the SyncGroup data entry.
+func setSGDataEntry(tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
+	return util.PutObject(tx, sgDataKey(gid), sg)
+}
+
+// setSGNameEntry stores the SyncGroup name entry.
+func setSGNameEntry(tx store.StoreReadWriter, name string, gid GroupId) error {
+	return util.PutObject(tx, sgNameKey(name), gid)
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID.
+func getSGDataEntry(st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+	var sg SyncGroup
+	if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
+		return nil, err
+	}
+	return &sg, nil
+}
+
+// getSGNameEntry retrieves the SyncGroup name to ID mapping.
+func getSGNameEntry(st store.StoreReader, name string) (GroupId, error) {
+	var gid GroupId
+	err := util.GetObject(st, sgNameKey(name), &gid)
+	return gid, err
+}
+
+// delSGDataEntry deletes the SyncGroup data entry.
+func delSGDataEntry(tx store.StoreReadWriter, gid GroupId) error {
+	return tx.Delete([]byte(sgDataKey(gid)))
+}
+
+// delSGNameEntry deletes the SyncGroup name to ID mapping.
+func delSGNameEntry(tx store.StoreReadWriter, name string) error {
+	return tx.Delete([]byte(sgNameKey(name)))
+}
+
 ////////////////////////////////////////////////////////////
 // SyncGroup methods between Client and Syncbase.
 
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 2434200..1bde00c 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -4,7 +4,27 @@
 
 package vsync
 
+import (
+	"v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+const (
+	NoGroupId = GroupId(0)
+)
+
 // syncData represents the persistent state of the sync module.
 type syncData struct {
 	Id int64
 }
+
+// GroupId is a globally unique SyncGroup ID.
+type GroupId uint64
+
+// SyncGroup contains the state of a SyncGroup object.
+type SyncGroup struct {
+	Id      GroupId                               // globally unique identifier generated by Syncbase
+	Name    string                                // globally unique Vanadium name chosen by app
+	Version string                                // "etag" for concurrency control
+	Spec    nosql.SyncGroupSpec                   // app-given specification
+	Joiners map[string]nosql.SyncGroupMemberInfo  // map of joiners to their metadata
+}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index cc15abe..9a9737c 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -10,6 +10,9 @@
 import (
 	// VDL system imports
 	"v.io/v23/vdl"
+
+	// VDL user imports
+	"v.io/syncbase/v23/services/syncbase/nosql"
 )
 
 // syncData represents the persistent state of the sync module.
@@ -22,6 +25,32 @@
 }) {
 }
 
+// GroupId is a globally unique SyncGroup ID.
+type GroupId uint64
+
+func (GroupId) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.GroupId"`
+}) {
+}
+
+// SyncGroup contains the state of a SyncGroup object.
+type SyncGroup struct {
+	Id      GroupId                              // globally unique identifier generated by Syncbase
+	Name    string                               // globally unique Vanadium name chosen by app
+	Version string                               // "etag" for concurrency control
+	Spec    nosql.SyncGroupSpec                  // app-given specification
+	Joiners map[string]nosql.SyncGroupMemberInfo // map of joiners to their metadata
+}
+
+func (SyncGroup) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.SyncGroup"`
+}) {
+}
+
 func init() {
 	vdl.Register((*syncData)(nil))
+	vdl.Register((*GroupId)(nil))
+	vdl.Register((*SyncGroup)(nil))
 }
+
+const NoGroupId = GroupId(0)