blob: 41818c155247d683ecb590b1d8fe572a7227a0bb [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// The SyncGroup Table stores the group information in a K/V DB. It also
// maintains an index to provide access by SyncGroup ID or name.
//
// The SyncGroup info is fetched from the SyncGroup server by the create or
// join operations, and is regularly updated after that.
//
// The DB contains two tables persisted to disk (data, names) and one
// in-memory (ephemeral) map (members):
// * data: one entry per SyncGroup ID containing the SyncGroup data
// * names: one entry per SyncGroup name pointing to its SyncGroup ID
// * members: an inverted index of SyncGroup members to SyncGroup IDs
// built from the list of SyncGroup joiners
import (
"errors"
"fmt"
"path"
"strconv"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/stats"
)
var (
errBadSGTable = errors.New("invalid SyncGroup Table")
)
type syncGroupTable struct {
fname string // file pathname
store *kvdb // underlying K/V store
sgData *kvtable // pointer to "data" table in the kvdb
sgNames *kvtable // pointer to "names" table in the kvdb
members map[string]*memberInfo // in-memory tracking of SyncGroup member info
// SyncGroup Table stats
numSGs *stats.Integer // number of SyncGroups
numMembers *stats.Integer // number of Sync members
}
type syncGroupData struct {
SrvInfo SyncGroupInfo // SyncGroup info from SyncGroupServer
LocalPath string // local path of the SyncGroup in the Store
}
type memberInfo struct {
gids map[GroupId]*memberMetaData // map of SyncGroup IDs joined and their metadata
}
type memberMetaData struct {
metaData JoinerMetaData // joiner metadata at the SyncGroup server
}
type sgSet map[GroupId]struct{} // a set of SyncGroups
// strToGroupId converts a SyncGroup ID in string format to an GroupId.
func strToGroupId(str string) (GroupId, error) {
id, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return NoGroupId, err
}
return GroupId(id), nil
}
// openSyncGroupTable opens or creates a syncGroupTable for the given filename.
func openSyncGroupTable(filename string) (*syncGroupTable, error) {
// Open the file and create it if it does not exist.
// Also initialize the store and its tables.
db, tbls, err := kvdbOpen(filename, []string{"data", "names"})
if err != nil {
return nil, err
}
s := &syncGroupTable{
fname: filename,
store: db,
sgData: tbls[0],
sgNames: tbls[1],
members: make(map[string]*memberInfo),
numSGs: stats.NewInteger(statsNumSyncGroup),
numMembers: stats.NewInteger(statsNumMember),
}
// Reconstruct the in-memory tracking maps by iterating over the SyncGroups.
// This is needed when an existing SyncGroup Table file is re-opened.
s.sgData.keyIter(func(gidStr string) {
// Get the SyncGroup data given the group ID in string format (as the data table key).
gid, err := strToGroupId(gidStr)
if err != nil {
return
}
data, err := s.getSyncGroupByID(gid)
if err != nil {
return
}
s.numSGs.Incr(1)
// Add all SyncGroup members to the members inverted index.
s.addAllMembers(data)
})
return s, nil
}
// close closes the syncGroupTable and invalidates its structure.
func (s *syncGroupTable) close() {
if s.store != nil {
s.store.close() // this also closes the tables
stats.Delete(statsNumSyncGroup)
stats.Delete(statsNumMember)
}
*s = syncGroupTable{} // zero out the structure
}
// flush flushes the syncGroupTable store to disk.
func (s *syncGroupTable) flush() {
if s.store != nil {
s.store.flush()
}
}
// addSyncGroup adds a new SyncGroup given its information.
func (s *syncGroupTable) addSyncGroup(sgData *syncGroupData) error {
if s.store == nil {
return errBadSGTable
}
if sgData == nil {
return errors.New("group information not specified")
}
gid, name := sgData.SrvInfo.Id, path.Join(sgData.SrvInfo.ServerName, sgData.SrvInfo.GroupName)
if name == "" {
return errors.New("group name not specified")
}
if sgData.LocalPath == "" {
return errors.New("group local path not specified")
}
if len(sgData.SrvInfo.Joiners) == 0 {
return errors.New("group has no joiners")
}
if s.hasSGDataEntry(gid) {
return fmt.Errorf("group %d already exists", gid)
}
if s.hasSGNameEntry(name) {
return fmt.Errorf("group name %s already exists", name)
}
// Add the group name and data entries.
if err := s.setSGNameEntry(name, gid); err != nil {
return err
}
if err := s.setSGDataEntry(gid, sgData); err != nil {
s.delSGNameEntry(name)
return err
}
s.numSGs.Incr(1)
s.addAllMembers(sgData)
return nil
}
// getSyncGroupID retrieves the SyncGroup ID given its name.
func (s *syncGroupTable) getSyncGroupID(name string) (GroupId, error) {
return s.getSGNameEntry(name)
}
// getSyncGroupName retrieves the SyncGroup name given its ID.
func (s *syncGroupTable) getSyncGroupName(gid GroupId) (string, error) {
data, err := s.getSyncGroupByID(gid)
if err != nil {
return "", err
}
return path.Join(data.SrvInfo.ServerName, data.SrvInfo.GroupName), nil
}
// getSyncGroupByID retrieves the SyncGroup given its ID.
func (s *syncGroupTable) getSyncGroupByID(gid GroupId) (*syncGroupData, error) {
return s.getSGDataEntry(gid)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
func (s *syncGroupTable) getSyncGroupByName(name string) (*syncGroupData, error) {
gid, err := s.getSyncGroupID(name)
if err != nil {
return nil, err
}
return s.getSyncGroupByID(gid)
}
// updateSyncGroup updates the SyncGroup data.
func (s *syncGroupTable) updateSyncGroup(data *syncGroupData) error {
if s.store == nil {
return errBadSGTable
}
if data == nil {
return errors.New("SyncGroup data not specified")
}
if data.SrvInfo.GroupName == "" {
return errors.New("group name not specified")
}
if len(data.SrvInfo.Joiners) == 0 {
return errors.New("group has no joiners")
}
fullGroupName := path.Join(data.SrvInfo.ServerName, data.SrvInfo.GroupName)
oldData, err := s.getSyncGroupByName(fullGroupName)
if err != nil {
return err
}
if data.SrvInfo.Id != oldData.SrvInfo.Id {
return fmt.Errorf("cannot change ID of SyncGroup name %s", fullGroupName)
}
if data.LocalPath == "" {
data.LocalPath = oldData.LocalPath
} else if data.LocalPath != oldData.LocalPath {
return fmt.Errorf("cannot change local path of SyncGroup name %s", fullGroupName)
}
// Get the old set of SyncGroup joiners and diff it with the new set.
// Add all the current members because this inserts the new members and
// updates the metadata of the existing ones (addMember() is like a "put").
// Delete the members that are no longer part of the SyncGroup.
gid := oldData.SrvInfo.Id
newJoiners, oldJoiners := data.SrvInfo.Joiners, oldData.SrvInfo.Joiners
for member, memberData := range newJoiners {
s.addMember(member, gid, memberData)
}
for member := range oldJoiners {
if _, ok := newJoiners[member]; !ok {
s.delMember(member, gid)
}
}
return s.setSGDataEntry(gid, data)
}
// delSyncGroupByID deletes the SyncGroup given its ID.
func (s *syncGroupTable) delSyncGroupByID(gid GroupId) error {
data, err := s.getSyncGroupByID(gid)
if err != nil {
return err
}
if err = s.delSGNameEntry(path.Join(data.SrvInfo.ServerName, data.SrvInfo.GroupName)); err != nil {
return err
}
s.numSGs.Incr(-1)
s.delAllMembers(data)
return s.delSGDataEntry(gid)
}
// delSyncGroupByName deletes the SyncGroup given its name.
func (s *syncGroupTable) delSyncGroupByName(name string) error {
gid, err := s.getSyncGroupID(name)
if err != nil {
return err
}
return s.delSyncGroupByID(gid)
}
// getAllSyncGroupNames returns the names of all SyncGroups.
func (s *syncGroupTable) getAllSyncGroupNames() ([]string, error) {
if s.store == nil {
return nil, errBadSGTable
}
names := make([]string, 0)
err := s.sgNames.keyIter(func(name string) {
names = append(names, name)
})
if err != nil {
return nil, err
}
return names, nil
}
// getMembers returns all SyncGroup members and the count of SyncGroups each one joined.
func (s *syncGroupTable) getMembers() (map[string]uint32, error) {
if s.store == nil {
return nil, errBadSGTable
}
members := make(map[string]uint32)
for member, info := range s.members {
members[member] = uint32(len(info.gids))
}
return members, nil
}
// getMemberInfo returns SyncGroup information for a given member.
func (s *syncGroupTable) getMemberInfo(member string) (*memberInfo, error) {
if s.store == nil {
return nil, errBadSGTable
}
info, ok := s.members[member]
if !ok {
return nil, fmt.Errorf("unknown member: %s", member)
}
return info, nil
}
// addMember inserts or updates a (member, group ID) entry in the in-memory
// structure that indexes SyncGroup memberships based on member names and stores
// in it the member's joiner metadata.
func (s *syncGroupTable) addMember(member string, gid GroupId, metadata JoinerMetaData) {
if s.store == nil {
return
}
info, ok := s.members[member]
if !ok {
info = &memberInfo{gids: make(map[GroupId]*memberMetaData)}
s.members[member] = info
s.numMembers.Incr(1)
}
info.gids[gid] = &memberMetaData{metaData: metadata}
}
// delMember removes a (member, group ID) entry from the in-memory structure
// that indexes SyncGroup memberships based on member names.
func (s *syncGroupTable) delMember(member string, gid GroupId) {
if s.store == nil {
return
}
info, ok := s.members[member]
if !ok {
return
}
delete(info.gids, gid)
if len(info.gids) == 0 {
delete(s.members, member)
s.numMembers.Incr(-1)
}
}
// addAllMembers inserts all members of a SyncGroup in the in-memory structure
// that indexes SyncGroup memberships based on member names.
func (s *syncGroupTable) addAllMembers(data *syncGroupData) {
if s.store == nil || data == nil {
return
}
gid := data.SrvInfo.Id
for member, memberData := range data.SrvInfo.Joiners {
s.addMember(member, gid, memberData)
}
}
// delAllMembers removes all members of a SyncGroup from the in-memory structure
// that indexes SyncGroup memberships based on member names.
func (s *syncGroupTable) delAllMembers(data *syncGroupData) {
if s.store == nil || data == nil {
return
}
gid := data.SrvInfo.Id
for member := range data.SrvInfo.Joiners {
s.delMember(member, gid)
}
}
// Low-level functions to access the tables in the K/V DB.
// They directly access the table entries without tracking their relationships.
// sgDataKey returns the key used to access the SyncGroup data in the DB.
func sgDataKey(gid GroupId) string {
return fmt.Sprintf("%d", gid)
}
// hasSGDataEntry returns true if the SyncGroup data entry exists in the DB.
func (s *syncGroupTable) hasSGDataEntry(gid GroupId) bool {
if s.store == nil {
return false
}
key := sgDataKey(gid)
return s.sgData.hasKey(key)
}
// setSGDataEntry stores the SyncGroup data in the DB.
func (s *syncGroupTable) setSGDataEntry(gid GroupId, data *syncGroupData) error {
if s.store == nil {
return errBadSGTable
}
key := sgDataKey(gid)
return s.sgData.set(key, data)
}
// getSGDataEntry retrieves from the DB the SyncGroup data for a given group ID.
func (s *syncGroupTable) getSGDataEntry(gid GroupId) (*syncGroupData, error) {
if s.store == nil {
return nil, errBadSGTable
}
var data syncGroupData
key := sgDataKey(gid)
if err := s.sgData.get(key, &data); err != nil {
return nil, err
}
return &data, nil
}
// delSGDataEntry deletes the SyncGroup data from the DB.
func (s *syncGroupTable) delSGDataEntry(gid GroupId) error {
if s.store == nil {
return errBadSGTable
}
key := sgDataKey(gid)
return s.sgData.del(key)
}
// sgNameKey returns the key used to access the SyncGroup name in the DB.
func sgNameKey(name string) string {
return name
}
// hasSGNameEntry returns true if the SyncGroup name entry exists in the DB.
func (s *syncGroupTable) hasSGNameEntry(name string) bool {
if s.store == nil {
return false
}
key := sgNameKey(name)
return s.sgNames.hasKey(key)
}
// setSGNameEntry stores the SyncGroup name to ID mapping in the DB.
func (s *syncGroupTable) setSGNameEntry(name string, gid GroupId) error {
if s.store == nil {
return errBadSGTable
}
key := sgNameKey(name)
return s.sgNames.set(key, gid)
}
// getSGNameEntry retrieves the SyncGroup name to ID mapping from the DB.
func (s *syncGroupTable) getSGNameEntry(name string) (GroupId, error) {
var gid GroupId
if s.store == nil {
return gid, errBadSGTable
}
key := sgNameKey(name)
err := s.sgNames.get(key, &gid)
return gid, err
}
// delSGNameEntry deletes the SyncGroup name to ID mapping from the DB.
func (s *syncGroupTable) delSGNameEntry(name string) error {
if s.store == nil {
return errBadSGTable
}
key := sgNameKey(name)
return s.sgNames.del(key)
}
// dump writes to the log file information on all SyncGroups.
func (s *syncGroupTable) dump() {
if s.store == nil {
return
}
s.sgData.keyIter(func(gidStr string) {
// Get the SyncGroup data given the group ID in string format (as the data table key).
gid, err := strToGroupId(gidStr)
if err != nil {
return
}
data, err := s.getSyncGroupByID(gid)
if err != nil {
return
}
members := make([]string, 0, len(data.SrvInfo.Joiners))
for joiner := range data.SrvInfo.Joiners {
members = append(members, joiner)
}
vlog.VI(1).Infof("DUMP: SyncGroup %s: id %v, path %s, members: %s",
path.Join(data.SrvInfo.ServerName, data.SrvInfo.GroupName),
gid, data.LocalPath, members)
})
}