blob: 847fcb56c42f6de9e036f09c3b7df77384c07847 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// SyncGroup management and storage in Syncbase. Handles the lifecycle
// of SyncGroups (create, join, leave, etc.) and their persistence as
// sync metadata in the application databases. Provides helper functions
// to the higher levels of sync (Initiator, Watcher) to get membership
// information and map key/value changes to their matching SyncGroups.
import (
"fmt"
"time"
"v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
)
var (
// memberViewTTL is the shelf-life of the aggregate view of SyncGroup members:
// after a refresh the view is considered fresh for this long, and
// refreshMembersIfExpired only rebuilds it once that time has passed.
// NOTE(review): declared as a variable rather than a constant, presumably so
// it can be overridden (e.g. by tests) -- confirm.
memberViewTTL = 2 * time.Second
)
////////////////////////////////////////////////////////////
// SyncGroup management internal to Syncbase.
// memberView holds an aggregated view of all SyncGroup members across
// databases. The view is not coherent: it is refreshed according to a
// configured TTL (memberViewTTL) rather than whenever SyncGroup membership is
// updated in the various databases. It is needed by the sync Initiator, which
// must select a peer to contact from a global view of all SyncGroup members
// gathered from all databases; a slightly stale view is acceptable for that
// purpose.
// The members are identified by their Vanadium names (map keys).
type memberView struct {
// expiration is the time until which the view is considered fresh. The zero
// value is in the past and therefore means "expired".
expiration time.Time
// members maps each member's Vanadium name to its per-SyncGroup metadata.
members map[string]*memberInfo
}
// memberInfo holds the member metadata for each SyncGroup this member belongs
// to.
type memberInfo struct {
// gid2info maps the ID of each SyncGroup the member has joined to the
// member's metadata within that SyncGroup (taken from the SyncGroup's
// Joiners map).
gid2info map[GroupId]nosql.SyncGroupMemberInfo
}
// newSyncGroupVersion returns a fresh SyncGroup version string ("etag"): a
// random 63-bit integer drawn from the package-level rng, formatted as
// lowercase hexadecimal.
func newSyncGroupVersion() string {
	randomBits := rng.Int63()
	return fmt.Sprintf("%x", randomBits)
}
// addSyncGroup stores a new SyncGroup, writing both its name entry and its
// data entry through tx.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// The SyncGroup is rejected with verror.ErrBadArg when it is nil, lacks a
// name, ID, version, joiners or prefixes, or when an entry with the same ID
// or the same name is already present. Errors from writing either entry are
// returned as-is.
func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *SyncGroup) error {
	_ = tx.(store.Transaction)

	// The cases are evaluated top to bottom and stop at the first match, so
	// the nil check guards every later field access and the store lookups
	// only run for an otherwise well-formed SyncGroup.
	var problem string
	switch {
	case sg == nil:
		problem = "group information not specified"
	case sg.Name == "":
		problem = "group name not specified"
	case sg.Id == NoGroupId:
		problem = "group ID not specified"
	case sg.Version == "":
		problem = "group version not specified"
	case len(sg.Joiners) == 0:
		problem = "group has no joiners"
	case len(sg.Spec.Prefixes) == 0:
		problem = "group has no prefixes specified"
	case hasSGDataEntry(tx, sg.Id):
		problem = "group id already exists"
	case hasSGNameEntry(tx, sg.Name):
		problem = "group name already exists"
	}
	if problem != "" {
		return verror.New(verror.ErrBadArg, ctx, problem)
	}

	// Write the name entry first, then the data entry.
	if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
		return err
	}
	return setSGDataEntry(ctx, tx, sg.Id, sg)
}
// getSyncGroupId retrieves the SyncGroup ID given its name. It reads the
// name entry from st via getSGNameEntry and returns that function's result
// unchanged, including its error when the lookup fails.
func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
return getSGNameEntry(ctx, st, name)
}
// getSyncGroupName looks up the SyncGroup with the given ID in st and returns
// its name. If the SyncGroup cannot be fetched, the empty string is returned
// together with the error from getSyncGroupById.
func getSyncGroupName(ctx *context.T, st store.StoreReader, gid GroupId) (string, error) {
	var name string
	group, err := getSyncGroupById(ctx, st, gid)
	if err == nil {
		name = group.Name
	}
	return name, err
}
// getSyncGroupById retrieves the SyncGroup given its ID. It reads the data
// entry from st via getSGDataEntry and returns that function's result
// unchanged, including its error when the lookup fails.
func getSyncGroupById(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
return getSGDataEntry(ctx, st, gid)
}
// getSyncGroupByName fetches a SyncGroup by name in two steps: it resolves
// the name to a group ID, then loads the data entry for that ID. A failure in
// the first step yields a nil SyncGroup and that step's error.
func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*SyncGroup, error) {
	id, lookupErr := getSyncGroupId(ctx, st, name)
	if lookupErr != nil {
		return nil, lookupErr
	}
	group, err := getSyncGroupById(ctx, st, id)
	return group, err
}
// delSyncGroupById removes the SyncGroup with the given ID: first its name
// entry, then its data entry. The SyncGroup is read back first because the
// name entry is keyed by the group's name, which the caller does not supply.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// The first error encountered is returned and nothing further is attempted.
func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
	_ = tx.(store.Transaction)

	group, err := getSyncGroupById(ctx, tx, gid)
	if err != nil {
		return err
	}
	if nameErr := delSGNameEntry(ctx, tx, group.Name); nameErr != nil {
		return nameErr
	}
	return delSGDataEntry(ctx, tx, group.Id)
}
// delSyncGroupByName removes the SyncGroup with the given name by resolving
// the name to a group ID and handing that ID to delSyncGroupById.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// If the name cannot be resolved, that error is returned and nothing is
// deleted.
func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
	_ = tx.(store.Transaction)

	id, lookupErr := getSyncGroupId(ctx, tx, name)
	if lookupErr != nil {
		return lookupErr
	}
	return delSyncGroupById(ctx, tx, id)
}
// refreshMembersIfExpired updates the aggregate view of SyncGroup members
// across databases if the view has expired (or has never been built).
//
// When a refresh is due, every database store visited by
// forEachDatabaseStore is scanned through a snapshot for SyncGroup data
// entries, and each joiner of each SyncGroup is recorded under its name with
// its per-SyncGroup info. The result replaces view.members and the expiration
// is pushed memberViewTTL into the future.
//
// Entries that fail to decode are logged and skipped. A scan that ends with a
// stream error is logged as well; the members gathered up to that point are
// still used, so the view may be incomplete until the next refresh.
//
// TODO(rdaoud): track dirty apps/dbs since the last refresh and incrementally
// update the membership view for them instead of always scanning all of them.
func (s *syncService) refreshMembersIfExpired(ctx *context.T) {
	view := s.allMembers
	if view == nil {
		// The empty expiration time in Go is before "now" and treated as
		// expired below, so a brand-new view is populated right away.
		view = &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
		s.allMembers = view
	}

	if time.Now().Before(view.expiration) {
		return
	}

	// Create a new aggregate view of SyncGroup members across all app databases.
	newMembers := make(map[string]*memberInfo)
	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")

	s.forEachDatabaseStore(ctx, func(st store.Store) bool {
		// For each database, fetch its SyncGroup data entries by scanning
		// their prefix range. Use a database snapshot for the scan; the defer
		// runs when this per-database callback returns.
		sn := st.NewSnapshot()
		defer sn.Close()

		stream := sn.Scan(scanStart, scanLimit)
		for stream.Advance() {
			var sg SyncGroup
			if err := vom.Decode(stream.Value(nil), &sg); err != nil {
				vlog.Errorf("invalid SyncGroup value for key %s: %v", string(stream.Key(nil)), err)
				continue
			}

			// Add all members of this SyncGroup to the membership view.
			// A member's info is different across SyncGroups, so gather all
			// of them.
			for member, info := range sg.Joiners {
				mi, ok := newMembers[member]
				if !ok {
					mi = &memberInfo{gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo)}
					newMembers[member] = mi
				}
				mi.gid2info[sg.Id] = info
			}
		}

		// Advance() returning false can mean either end-of-range or failure;
		// surface the failure instead of silently using a truncated scan.
		if err := stream.Err(); err != nil {
			vlog.Errorf("scan of SyncGroup data entries failed: %v", err)
		}

		// NOTE(review): false presumably tells forEachDatabaseStore to keep
		// iterating over the remaining databases -- confirm against its
		// definition.
		return false
	})

	view.members = newMembers
	view.expiration = time.Now().Add(memberViewTTL)
}
// getMembers returns a map from each known SyncGroup member name to the
// number of SyncGroups that member has joined. The aggregate member view is
// refreshed first if it has expired, and the returned map is a fresh copy
// that the caller may modify.
func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
	s.refreshMembersIfExpired(ctx)

	current := s.allMembers.members
	counts := make(map[string]uint32)
	for name, mi := range current {
		counts[name] = uint32(len(mi.gid2info))
	}
	return counts
}
// Low-level utility functions to access DB entries without tracking their
// relationships.
// Use the functions above to manipulate SyncGroups.
// sgDataKeyScanPrefix returns the prefix used to scan SyncGroup data entries.
// It joins the same leading key parts (util.SyncPrefix, "sg", "d") that
// sgDataKey uses before appending the group ID.
func sgDataKeyScanPrefix() string {
return util.JoinKeyParts(util.SyncPrefix, "sg", "d")
}
// sgDataKey builds the store key of the data entry for the SyncGroup with the
// given ID: the parts util.SyncPrefix, "sg", "d" and the ID formatted in
// decimal, joined by util.JoinKeyParts.
func sgDataKey(gid GroupId) string {
	idPart := fmt.Sprintf("%d", gid)
	return util.JoinKeyParts(util.SyncPrefix, "sg", "d", idPart)
}
// sgNameKey returns the key used to access the SyncGroup name entry. The key
// is built by joining util.SyncPrefix, "sg", "n" and the group name.
// NOTE(review): the name is used verbatim as a key part; whether names that
// contain the key-part separator are rejected elsewhere is not visible here.
func sgNameKey(name string) string {
return util.JoinKeyParts(util.SyncPrefix, "sg", "n", name)
}
// hasSGDataEntry reports whether a SyncGroup data entry for gid can be read
// from st. Any failure of util.GetObject, whatever its cause, is reported as
// "does not exist".
func hasSGDataEntry(st store.StoreReader, gid GroupId) bool {
	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
	var discarded SyncGroup
	return util.GetObject(st, sgDataKey(gid), &discarded) == nil
}
// hasSGNameEntry reports whether a SyncGroup name entry for name can be read
// from st. Any failure of util.GetObject, whatever its cause, is reported as
// "does not exist".
func hasSGNameEntry(st store.StoreReader, name string) bool {
	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
	var discarded GroupId
	return util.GetObject(st, sgNameKey(name), &discarded) == nil
}
// setSGDataEntry writes the SyncGroup sg under the data key derived from gid.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// A failed write is reported as a verror.ErrInternal wrapping the underlying
// error.
func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
	_ = tx.(store.Transaction)

	putErr := util.PutObject(tx, sgDataKey(gid), sg)
	if putErr == nil {
		return nil
	}
	return verror.New(verror.ErrInternal, ctx, putErr)
}
// setSGNameEntry writes the group ID gid under the name key derived from
// name, recording the name-to-ID mapping.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// A failed write is reported as a verror.ErrInternal wrapping the underlying
// error.
func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid GroupId) error {
	_ = tx.(store.Transaction)

	putErr := util.PutObject(tx, sgNameKey(name), gid)
	if putErr == nil {
		return nil
	}
	return verror.New(verror.ErrInternal, ctx, putErr)
}
// getSGDataEntry reads the SyncGroup stored under the data key derived from
// gid. On failure it returns nil and a verror.ErrInternal wrapping the
// underlying error (every failure is mapped to ErrInternal).
func getSGDataEntry(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
	group := new(SyncGroup)
	key := sgDataKey(gid)
	if err := util.GetObject(st, key, group); err != nil {
		return nil, verror.New(verror.ErrInternal, ctx, err)
	}
	return group, nil
}
// getSGNameEntry reads the group ID stored under the name key derived from
// name. On failure it returns a verror.ErrInternal wrapping the underlying
// error (every failure is mapped to ErrInternal); the ID returned alongside
// an error should not be relied upon.
func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
	var id GroupId
	key := sgNameKey(name)
	getErr := util.GetObject(st, key, &id)
	if getErr != nil {
		return id, verror.New(verror.ErrInternal, ctx, getErr)
	}
	return id, nil
}
// delSGDataEntry removes the entry stored under the data key derived from
// gid.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// A failed delete is reported as a verror.ErrInternal wrapping the underlying
// error.
func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
	_ = tx.(store.Transaction)

	key := []byte(sgDataKey(gid))
	delErr := tx.Delete(key)
	if delErr == nil {
		return nil
	}
	return verror.New(verror.ErrInternal, ctx, delErr)
}
// delSGNameEntry removes the name-to-ID mapping stored under the name key
// derived from name.
//
// tx must be a store.Transaction; the type assertion below panics otherwise.
// A failed delete is reported as a verror.ErrInternal wrapping the underlying
// error.
func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
	_ = tx.(store.Transaction)

	key := []byte(sgNameKey(name))
	delErr := tx.Delete(key)
	if delErr == nil {
		return nil
	}
	return verror.New(verror.ErrInternal, ctx, delErr)
}
////////////////////////////////////////////////////////////
// SyncGroup methods between Client and Syncbase.
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.

// CreateSyncGroup is a placeholder for the Syncbase-to-Syncbase SyncGroup
// creation RPC. It is not implemented yet: call is ignored and the method
// always returns the error produced by verror.NewErrNotImplemented.
func (s *syncService) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
return verror.NewErrNotImplemented(ctx)
}
// JoinSyncGroup is a placeholder for the Syncbase-to-Syncbase SyncGroup join
// RPC. It is not implemented yet: call is ignored and the method always
// returns the error produced by verror.NewErrNotImplemented.
func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
return verror.NewErrNotImplemented(ctx)
}