blob: ac25c2929ff255917c34cf7374204cb930dbe357 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Package vsync provides veyron sync daemon utility functions. Sync
// daemon serves incoming GetDeltas requests and contacts other peers
// to get deltas from them. When it receives a GetDeltas request, the
// incoming generation vector is diffed with the local generation
// vector, and missing generations are sent back. When it receives
// log records in response to a GetDeltas request, it replays those
// log records to get in sync with the sender.
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/stats"
)
// rng is the process-wide pseudo-random number generator used to mint
// new version numbers (see NewVersion). It is seeded once in init().
var (
	rng *rand.Rand
)
// Names of Sync stats entries.
// These keys are registered with the stats package (see newSyncdCore
// and Close) and identify per-daemon counters under the "vsync/" prefix.
const (
	statsKvdbPath       = "vsync/kvdb-path"
	statsDevId          = "vsync/dev-id"
	statsNumDagObj      = "vsync/num-dag-objects"
	statsNumDagNode     = "vsync/num-dag-nodes"
	statsNumDagTx       = "vsync/num-dag-tx"
	statsNumDagPrivNode = "vsync/num-dag-privnodes"
	statsNumSyncGroup   = "vsync/num-syncgroups"
	statsNumMember      = "vsync/num-members"
)
// syncd contains the metadata for the sync daemon.
type syncd struct {
	// Pointers to metadata structures.
	log    *iLog
	devtab *devTable
	dag    *dag
	sgtab  *syncGroupTable

	// Filesystem path to store metadata.
	kvdbPath string

	// Local device id.
	id DeviceId

	// Handle to the server to publish names to mount tables during syncgroup create/join.
	server rpc.Server

	// Map to track the set of names already published for this sync service.
	names map[string]struct{}

	// Relative name of the sync service to be advertised to the syncgroup server.
	// This is built off the local device id.
	relName string

	// Authorizer for sync service.
	auth security.Authorizer

	// RWlock to concurrently access log and device table data structures.
	// Writers are the watcher/initiator/gc goroutines; readers are the
	// incoming RPC responders (see the NewSyncd doc comment).
	lock sync.RWMutex

	// Mutex to serialize syncgroup operations and an ongoing
	// initiation round. If both "lock" and "sgOp" need to be
	// acquired, sgOp must be acquired first.
	sgOp sync.Mutex

	// Channel used by Sync responders to notify the SyncGroup refresher
	// to fetch from SyncGroupServer updated info for some SyncGroups.
	// sgRefresh chan *refreshRequest

	// State to coordinate shutting down all spawned goroutines:
	// "closed" is closed in Close() to broadcast shutdown, and
	// "pending" is waited on until all goroutines have exited.
	pending sync.WaitGroup
	closed  chan struct{}

	// Handlers for goroutine procedures.
	// hdlGC *syncGC
	hdlWatcher   *syncWatcher
	hdlInitiator *syncInitiator

	// The context used for background activities.
	ctx *context.T
}
// String returns the object identifier as a plain string.
func (o ObjId) String() string {
	s := string(o)
	return s
}
// IsValid reports whether o holds a real object identifier, i.e.
// whether it differs from the reserved NoObjId sentinel.
func (o ObjId) IsValid() bool {
	if o == NoObjId {
		return false
	}
	return true
}
// init seeds the package RNG from the wall clock so that version
// numbers differ across process restarts.
func init() {
	seed := time.Now().UTC().UnixNano()
	rng = rand.New(rand.NewSource(seed))
}
// NewVersion returns a fresh random Version, redrawing until the value
// differs from the reserved NoVersion sentinel.
func NewVersion() Version {
	v := Version(rng.Int63())
	for v == NoVersion {
		v = Version(rng.Int63())
	}
	return v
}
// NewSyncd creates a new syncd instance.
//
// Concurrency model: syncd spawns three goroutines at startup. The
// "watcher" observes the store for changes to its objects; the
// "initiator" periodically picks a peer from the neighborhood and
// fetches changes from it; the "gc" periodically prunes log records
// and dag state that are no longer needed. All three mutate the data
// structures and therefore serialize on a write lock of s.lock, while
// each incoming RPC takes a read lock on s.lock. Thus at any instant
// either exactly one of the watcher, initiator, or gc threads is
// active, or any number of responders serve incoming requests
// concurrently. Fairness relies on sync.RWMutex: its spec says writers
// cannot be starved by readers, but it does not guarantee FIFO
// ordering, which we may have to revisit in the future.
func NewSyncd(devid string, syncTick time.Duration, server rpc.Server, ctx *context.T) *syncd {
	daemon := newSyncdCore(devid, syncTick, server, ctx)
	return daemon
}
// newSyncdCore is the internal function that creates the Syncd
// structure and initializes its threads (goroutines). It takes a
// Veyron Store parameter to separate the core of Syncd setup from the
// external dependency on Veyron Store.
//
// NOTE(review): the setup below is currently disabled (see the TODO)
// and this constructor always returns nil, so NewSyncd yields a nil
// *syncd until the body is re-enabled.
func newSyncdCore(devid string, syncTick time.Duration,
	server rpc.Server, ctx *context.T) *syncd {
	// TODO(kash): Get this to compile
	return nil
	// s := &syncd{ctx: ctx}
	// storePath := "/tmp"
	// // If no authorizer is specified, allow access to all Principals.
	// if s.auth = vflag.NewAuthorizerOrDie(); s.auth == nil {
	// s.auth = allowEveryone{}
	// }
	// // Bootstrap my own DeviceId.
	// s.id = DeviceId(devid)
	// // Cache the server to publish names if required.
	// s.server = server
	// s.relName = devIDToName(s.id)
	// s.names = make(map[string]struct{})
	// var err error
	// // Log init.
	// if s.log, err = openILog(storePath+"/vsync_ilog.db", s); err != nil {
	// vlog.Fatalf("newSyncd: openILog failed: err %v", err)
	// }
	// // DevTable init.
	// if s.devtab, err = openDevTable(storePath+"/vsync_dtab.db", s); err != nil {
	// vlog.Fatalf("newSyncd: openDevTable failed: err %v", err)
	// }
	// // Dag init.
	// if s.dag, err = openDAG(storePath + "/vsync_dag.db"); err != nil {
	// vlog.Fatalf("newSyncd: OpenDag failed: err %v", err)
	// }
	// // SyncGroupTable init.
	// if s.sgtab, err = openSyncGroupTable(storePath + "/vsync_sgtab.db"); err != nil {
	// vlog.Fatalf("newSyncd: openSyncGroupTable failed: err %v", err)
	// }
	// // Channel for responders to notify the SyncGroup refresher.
	// // Give the channel some buffering to ease a burst of responders
	// // while the refresher is still finishing a previous request.
	// // s.sgRefresh = make(chan *refreshRequest, 4096)
	// // Channel to propagate close event to all threads.
	// s.closed = make(chan struct{})
	// s.pending.Add(2)
	// // Get deltas every peerSyncInterval.
	// s.hdlInitiator = newInitiator(s, syncTick)
	// go s.hdlInitiator.contactPeers()
	// // // Garbage collect every garbageCollectInterval.
	// // s.hdlGC = newGC(s)
	// // go s.hdlGC.garbageCollect()
	// // Start a watcher thread that will get updates from local store.
	// s.hdlWatcher = newWatcher(s)
	// go s.hdlWatcher.watchStore()
	// // Start a thread to refresh SyncGroup info from SyncGroupServer.
	// // go s.syncGroupRefresher()
	// // Server stats.
	// stats.NewString(statsKvdbPath).Set(s.kvdbPath)
	// stats.NewString(statsDevId).Set(string(s.id))
	// return s
}
// devIDToName converts a device ID to its relative name by joining the
// fixed "global/vsync" prefix with the ID.
func devIDToName(id DeviceId) string {
	// naming.Join is variadic; pass the components directly instead of
	// building and spreading a throwaway slice.
	return naming.Join("global", "vsync", string(id))
}
// nameToDevId extracts the DeviceId from a name: the final
// slash-separated component (or the whole name if it contains no "/").
func (s *syncd) nameToDevId(name string) DeviceId {
	// strings.LastIndex returns -1 when there is no "/", so the +1
	// yields the whole string in that case — the same result as
	// splitting and taking the last element, without allocating the
	// intermediate slice.
	return DeviceId(name[strings.LastIndex(name, "/")+1:])
}
// Close cleans up syncd state: it broadcasts shutdown to all spawned
// goroutines, waits for them to exit, and unregisters the stats
// entries this daemon published.
func (s *syncd) Close() {
	close(s.closed) // signal every goroutine watching the channel
	s.pending.Wait()
	for _, key := range []string{statsKvdbPath, statsDevId} {
		stats.Delete(key)
	}
	// TODO(hpucha): close without flushing.
}
// isSyncClosing reports whether Close() has been called, i.e. whether
// the "closed" channel has been closed.
func (s *syncd) isSyncClosing() bool {
	closing := false
	// Non-blocking probe: a receive on a closed channel succeeds
	// immediately; otherwise fall through to the default case.
	select {
	case <-s.closed:
		closing = true
	default:
	}
	return closing
}
// GetDeltas responds to the incoming request from a client by sending missing generations to the client.
//
// For each SyncRoot in "in", the client's generation vector is diffed
// against the local one (under a read lock, inside prepareGensToReply)
// and the log records of every missing generation are streamed back in
// sequence order. The returned map carries the local generation vector
// per SyncRoot so the client can advance its own state.
func (s *syncd) GetDeltas(call SyncGetDeltasServerCall, in map[ObjId]GenVector,
	syncGroups map[ObjId]GroupIdSet, clientID DeviceId) (map[ObjId]GenVector, error) {
	vlog.VI(1).Infof("GetDeltas:: Received vector %v from client %s", in, clientID)

	// Handle misconfiguration: the client cannot have the same ID as this node.
	if clientID == s.id {
		vlog.VI(1).Infof("GetDeltas:: impostor alert: client ID %s is the same as mine %s", clientID, s.id)
		return nil, fmt.Errorf("impostor: cannot be %s, for this node is %s", clientID, s.id)
	}

	out := make(map[ObjId]GenVector)
	for sr, gvIn := range in {
		gvOut, gens, gensInfo, err := s.prepareGensToReply(call, sr, syncGroups[sr], gvIn)
		if err != nil {
			// A failure on one SyncRoot does not abort the reply; the
			// remaining SyncRoots are still served.
			vlog.Errorf("GetDeltas:: prepareGensToReply for sr %s failed with err %v",
				sr.String(), err)
			continue
		}

		for pos, v := range gens {
			gen := gensInfo[pos]
			var count uint64
			// Stream every log record of this generation in SeqNum order.
			for i := SeqNum(0); i <= gen.MaxSeqNum; i++ {
				count++
				rec, err := s.getLogRec(v.devID, v.srID, v.genID, i)
				if err != nil {
					// A record missing while its generation metadata
					// exists means local state is corrupt; Fatalf
					// terminates the process.
					vlog.Fatalf("GetDeltas:: Couldn't get log record %s %s %d %d, err %v",
						v.devID, v.srID.String(), v.genID, i, err)
				}
				vlog.VI(1).Infof("Sending log record %v", rec)
				if err := call.SendStream().Send(*rec); err != nil {
					vlog.Errorf("GetDeltas:: Couldn't send stream err: %v", err)
					return nil, err
				}
			}
			// Cross-check the generation metadata against the number of
			// records actually streamed.
			if count != gen.Count {
				vlog.Fatalf("GetDeltas:: GenMetadata has incorrect log records for generation %s %s %d %v",
					v.devID, v.srID.String(), v.genID, gen)
			}
		}
		out[sr] = gvOut
	}

	// Notify the SyncGroup Refresher about the SyncGroup IDs associated
	// with the SyncRoots in the incoming request from the client.
	// Note: the initial slice capacity is only a lower-bound.
	// Note: by using the SyncRoots from the incoming "in" map and
	// all SyncGroup IDs for each SyncRoot, the refresh is eager because
	// it does not filter out SyncRoots and SyncGroup IDs by the Permissions.
	// sgIDs := make([]syncgroup.Id, 0, len(in))
	// for sr := range in {
	// for _, id := range syncGroups[sr] {
	// sgIDs = append(sgIDs, id)
	// }
	// }
	// s.notifySyncGroupRefresher(devIDToName(clientID), sgIDs)
	return out, nil
}
// // updateDeviceInfo updates the remote device's information based on
// // the incoming GetDeltas request.
// func (s *syncd) updateDeviceInfo(ClientID DeviceId, In GenVector) error {
// s.lock.Lock()
// defer s.lock.Unlock()
// // Note that the incoming client generation vector cannot be
// // used for garbage collection. We can only garbage collect
// // based on the generations we receive from other
// // devices. Receiving a set of generations assures that all
// // updates branching from those generations are also received
// // and hence generations present on all devices can be
// // GC'ed. This function sends generations to other devices and
// // hence does not use the generation vector for GC.
// //
// // TODO(hpucha): Cache the client's incoming generation vector
// // to assist in tracking missing generations and hence next
// // peer to contact.
// if !s.devtab.hasDevInfo(ClientID) {
// if err := s.devtab.addDevice(ClientID); err != nil {
// return err
// }
// }
// return nil
// }
// prepareGensToReply verifies if the requestor has access to the
// requested syncroot. If allowed, it processes the incoming
// generation vector and returns the metadata of all the missing
// generations between the incoming and the local generation vector
// for that syncroot.
//
// It returns the local generation vector for the syncroot, the ordered
// list of missing generations, and one metadata entry per missing
// generation (parallel to the order slice).
func (s *syncd) prepareGensToReply(call rpc.ServerCall, srid ObjId, sgVec GroupIdSet, In GenVector) (GenVector, []*genOrder, []*genMetadata, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	// Respond to the caller if any of the syncgroups
	// corresponding to the syncroot allow access.
	// NOTE(review): the Permissions check below is commented out, so
	// allowedErr is always nil and every caller is currently allowed.
	var allowedErr error
	// for _, sg := range sgVec {
	// // Check permissions for the syncgroup.
	// sgData, err := s.sgtab.getSyncGroupByID(sg)
	// if err != nil {
	// return GenVector{}, nil, nil, err
	// }
	// if vlog.V(2) {
	// b, _ := ctx.RemoteBlessings().ForContext(ctx)
	// vlog.Infof("prepareGensToReply:: matching acl %v, remote names %v", sgData.SrvInfo.Config.Permissions, b)
	// }
	// // TODO(ashankar): Consider changing TaggedPermissionsAuthorizer so that it doesn't return an error -
	// // instead of an error, it just locks down access completely. The error condition is really
	// // really rare and doing so will make for shorter code?
	// auth, err := access.TaggedPermissionsAuthorizer(sgData.SrvInfo.Config.Permissions, access.TypicalTagType())
	// if err != nil {
	// vlog.Errorf("unable to create authorizer: %v", err)
	// allowedErr = fmt.Errorf("server error: authorization policy misconfigured")
	// } else if allowedErr = auth.Authorize(ctx); allowedErr == nil {
	// break
	// }
	// }
	if allowedErr != nil {
		return GenVector{}, nil, nil, allowedErr
	}

	// // TODO(hpucha): Need only for GC.
	// if err := s.updateDeviceInfo(ClientID, gvIn); err != nil {
	// vlog.Fatalf("GetDeltas:: updateDeviceInfo failed with err %v", err)
	// }

	// Get local generation vector.
	out, err := s.devtab.getGenVec(s.id, srid)
	if err != nil {
		return GenVector{}, nil, nil, err
	}

	// Diff the two generation vectors.
	gens, err := s.devtab.diffGenVectors(out, In, srid)
	if err != nil {
		return GenVector{}, nil, nil, err
	}

	// Get the metadata for all the generations in the reply.
	gensInfo := make([]*genMetadata, len(gens))
	for pos, v := range gens {
		gen, err := s.log.getGenMetadata(v.devID, v.srID, v.genID)
		if err != nil || gen.Count <= 0 {
			// NOTE(review): when gen.Count <= 0 with err == nil this
			// returns a nil error alongside empty results — confirm
			// whether a non-nil error was intended here.
			return GenVector{}, nil, nil, err
		}
		gensInfo[pos] = gen
	}

	return out, gens, gensInfo, nil
}
// getLogRec fetches, under a read lock, the log record identified by
// (device, syncroot, generation, sequence number).
func (s *syncd) getLogRec(dev DeviceId, srid ObjId, gen GenId, lsn SeqNum) (*LogRec, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()
	rec, err := s.log.getLogRec(dev, srid, gen, lsn)
	return rec, err
}
// // GetSyncGroupStats gives the client high-level information on all SyncGroups.
// // The streamed information is returned sorted by SyncGroup name.
// func (s *syncd) GetSyncGroupStats(ctx SyncGetSyncGroupStatsContext) error {
// // Get a snapshot of the SyncGroup names.
// // The names are a copy, OK to unlock here.
// s.lock.RLock()
// names, err := s.sgtab.getAllSyncGroupNames()
// s.lock.RUnlock()
// if err != nil {
// vlog.Errorf("GetSyncGroupStats:: cannot get SyncGroup names: %v", err)
// return err
// }
// if len(names) == 0 {
// return nil
// }
// // Send the SyncGroup stats in the stream in sorted order.
// // Skip SyncGroups that got deleted in the meanwhile.
// sort.Strings(names)
// for _, name := range names {
// // The SyncGroup object is a copy, OK to unlock here.
// s.lock.RLock()
// sg, err := s.sgtab.getSyncGroupByName(name)
// s.lock.RUnlock()
// if err != nil {
// continue // skip deleted SyncGroup
// }
// stats := SyncGroupStats{
// Name: name,
// SGObjId: sg.SrvInfo.SGObjId,
// Path: sg.LocalPath,
// RootObjId: ObjId(sg.SrvInfo.RootObjId),
// NumJoiners: uint32(len(sg.SrvInfo.Joiners)),
// }
// if err = ctx.SendStream().Send(stats); err != nil {
// vlog.Errorf("GetSyncGroupStats:: cannot send stream data: %v", err)
// return err
// }
// }
// return nil
// }
// // GetSyncGroupMembers gives the client information on SyncGroup members.
// // If SyncGroup names are specified, only their member information is sent.
// // Otherwise information on members from all SyncGroups is sent.
// func (s *syncd) GetSyncGroupMembers(ctx SyncGetSyncGroupMembersContext, sgNames []string) error {
// // If SyncGroup names are given, fetch their IDs.
// // Also get a snapshot of the SyncGroup members.
// wantedGroupIds := make(map[syncgroup.Id]bool)
// s.lock.RLock()
// for _, name := range sgNames {
// sgid, err := s.sgtab.getSyncGroupID(name)
// if err != nil {
// s.lock.RUnlock()
// vlog.Errorf("GetSyncGroupMembers:: invalid SyncGroup %s: %v", name, err)
// return err
// }
// wantedGroupIds[sgid] = true
// }
// memberMap, err := s.sgtab.getMembers() // The map is a copy, OK to unlock here.
// s.lock.RUnlock()
// if err != nil {
// vlog.Errorf("GetSyncGroupMembers:: cannot get members: %v", err)
// return err
// }
// if len(memberMap) == 0 {
// return nil
// }
// // Send the member information in the stream in sorted order.
// // Skip members that left in the meanwhile.
// // If SyncGroup names are given, only return members in these groups.
// members := make([]string, 0, len(memberMap))
// for member := range memberMap {
// members = append(members, member)
// }
// sort.Strings(members)
// for _, member := range members {
// // The member info points to in-memory data; cannot unlock
// // while accessing that info.
// s.lock.RLock()
// info, err := s.sgtab.getMemberInfo(member)
// if err != nil {
// s.lock.RUnlock()
// continue // skip member no longer there
// }
// replyData := make([]SyncGroupMember, 0, len(info.gids))
// for sgid, meta := range info.gids {
// if len(wantedGroupIds) > 0 && !wantedGroupIds[sgid] {
// continue // skip unwanted SyncGroups
// }
// data := SyncGroupMember{
// Name: member,
// SGObjId: sgid,
// Metadata: meta.metaData,
// }
// replyData = append(replyData, data)
// }
// s.lock.RUnlock()
// for _, data := range replyData {
// if err = ctx.SendStream().Send(data); err != nil {
// vlog.Errorf("GetSyncGroupMembers:: cannot send stream data: %v", err)
// return err
// }
// }
// }
// return nil
// }
// Dump writes to the Sync log internal information used for debugging.
// A read lock is held for the whole dump so the device table, log, and
// DAG snapshots are mutually consistent.
func (s *syncd) Dump(call rpc.ServerCall) error {
	s.lock.RLock()
	defer s.lock.RUnlock()

	vlog.VI(1).Infof("Dump: ==== BEGIN ====")
	s.devtab.dump()
	// s.sgtab.dump()
	s.log.dump()
	s.dag.dump()
	vlog.VI(1).Infof("Dump: ==== END ====")
	return nil
}
// GetDeviceStats gives the client information on devices that have synchronized
// with the server.
//
// Devices whose info disappears between the ID snapshot and the
// per-device lookup are silently skipped rather than reported as
// errors.
func (s *syncd) GetDeviceStats(call SyncGetDeviceStatsServerCall) error {
	// Get a snapshot of the device IDs.
	// The device IDs are copies, OK to unlock here.
	s.lock.RLock()
	myID := s.id
	devIDs, err := s.devtab.getDeviceIds()
	s.lock.RUnlock()
	if err != nil {
		vlog.Errorf("GetDeviceStats:: cannot get device IDs: %v", err)
		return err
	}

	// Send the device stats in the stream.
	for _, id := range devIDs {
		// The device info is a copy, OK to unlock here.
		// The lock is re-taken per device so the stream Send below
		// happens without holding the lock.
		s.lock.RLock()
		info, err := s.devtab.getDevInfo(id)
		s.lock.RUnlock()
		if err != nil {
			continue // skip unknown devices
		}

		stats := DeviceStats{
			DevId:      id,
			LastSync:   info.Ts.Unix(),
			GenVectors: info.Vectors,
			IsSelf:     id == myID,
		}
		if err = call.SendStream().Send(stats); err != nil {
			vlog.Errorf("GetDeviceStats:: cannot send stream data: %v", err)
			return err
		}
	}
	return nil
}
// GetObjectHistory gives the client the mutation history of a store object.
// It also returns the object version that is at the "head" of the graph (DAG).
func (s *syncd) GetObjectHistory(call SyncGetObjectHistoryServerCall, oid ObjId) (Version, error) {
	// Get the object version numbers in reverse chronological order,
	// starting from the "head" version back through its ancestors.
	// Also get the DAG parents of each object version.
	var versions []Version
	parents := make(map[Version][]Version)

	s.lock.RLock()
	head, err := s.dag.getHead(oid)
	if err != nil {
		// For now ignore objects that do not yet have a "head" version.
		// They are either not part of any SyncGroup, or this Sync server
		// has not finished catching up (Sync or Watch operations).
		vlog.Errorf("GetObjectHistory:: unknown object %v: %v", oid, err)
		s.lock.RUnlock()
		return NoVersion, err
	}
	// Walk the DAG from the head through all ancestors, recording each
	// version and its parent set.
	// NOTE(review): the return value of ancestorIter is discarded —
	// confirm an iteration error cannot occur here or need not be
	// surfaced to the caller.
	s.dag.ancestorIter(oid, []Version{head},
		func(oid ObjId, v Version, node *dagNode) error {
			versions = append(versions, v)
			parents[v] = node.Parents
			return nil
		})
	s.lock.RUnlock()

	// Stream the object log records in the given order.
	for _, v := range versions {
		// The log record is a copy, OK to unlock here.
		s.lock.RLock()
		rec, err := s.hdlInitiator.getLogRec(oid, v)
		s.lock.RUnlock()
		if err != nil {
			vlog.Errorf("GetObjectHistory:: cannot get log record for %v:%v: %v", oid, v, err)
			return NoVersion, err
		}

		// To give the client a full DAG, send the parents as seen in
		// the DAG node instead of the ones in the log record.
		// Note: DAG nodes contain the full sets of parents whereas
		// log records may have subsets due to the different types of
		// records: NodeRec-Parents + LinkRec-Parent == DAG-Parents.
		rec.Parents = parents[v]
		if err = call.SendStream().Send(*rec); err != nil {
			vlog.Errorf("GetObjectHistory:: cannot send stream data: %v", err)
			return NoVersion, err
		}
	}
	return head, nil
}
// allowEveryone implements the authorization policy of ... allowing every principal.
type allowEveryone struct{}

// Authorize implements security.Authorizer; it always returns nil,
// granting access to every caller.
func (allowEveryone) Authorize(security.Call) error {
	return nil
}