| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // Package vsync provides sync functionality for Syncbase. Sync |
| // service serves incoming GetDeltas requests and contacts other peers |
| // to get deltas from them. When it receives a GetDeltas request, the |
| // incoming generation vector is diffed with the local generation |
| // vector, and missing generations are sent back. When it receives log |
| // records in response to a GetDeltas request, it replays those log |
| // records to get in sync with the sender. |
| import ( |
| "container/list" |
| "fmt" |
| "math/rand" |
| "path" |
| "sync" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/discovery" |
| "v.io/v23/naming" |
| "v.io/v23/rpc" |
| "v.io/v23/security/access" |
| wire "v.io/v23/services/syncbase" |
| "v.io/v23/syncbase" |
| "v.io/v23/verror" |
| "v.io/x/lib/vlog" |
| idiscovery "v.io/x/ref/lib/discovery" |
| "v.io/x/ref/services/syncbase/common" |
| blob "v.io/x/ref/services/syncbase/localblobstore" |
| fsblob "v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore" |
| "v.io/x/ref/services/syncbase/server/interfaces" |
| "v.io/x/ref/services/syncbase/store" |
| "v.io/x/ref/services/syncbase/vclock" |
| ) |
| |
| // syncService contains the metadata for the sync module. |
| type syncService struct { |
| // TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 |
| // bits. Another alternative is to derive this from the blessing of |
| // Syncbase. Syncbase can append a uuid to the blessing it is given upon |
| // launch and use its hash as id. Note we cannot use public key since we |
| // want to support key rollover. |
| id uint64 // globally unique id for this instance of Syncbase. |
| name string // name derived from the global id. |
| sv interfaces.Service |
| |
| // Root context. Used for example to create a context for advertising |
| // over neighborhood. |
| ctx *context.T |
| |
| // Random number generator for this Sync service. |
| rng *rand.Rand |
| rngLock sync.Mutex |
| |
| // State to coordinate shutdown of spawned goroutines. |
| pending sync.WaitGroup |
| closed chan struct{} |
| |
| // TODO(hpucha): Other global names to advertise to enable Syncbase |
| // discovery. For example, every Syncbase must be reachable under |
| // <mttable>/<syncbaseid> for p2p sync. This is the name advertised |
| // during syncgroup join. In addition, a Syncbase might also be |
| // accepting "publish syncgroup requests", and might use a more |
| // human-readable name such as <mttable>/<idp>/<sgserver>. All these |
| // names must be advertised in the appropriate mount tables. |
| |
| // In-memory sync membership info aggregated across databases. |
| allMembers *memberView |
| allMembersLock sync.RWMutex |
| |
| // In-memory maps of neighborhood information found via the discovery |
| // service. The IDs map gathers all the neighborhood info using the |
| // instance IDs as keys. The sync peers map is a secondary index to |
| // access the info using the Syncbase names as keys. The syncgroups |
| // map is a secondary index to access the info using the syncgroup Gids |
| // as keys of the outer-map, and instance IDs as keys of the inner-map. |
| discovery discovery.T |
| discoveryIds map[string]*discovery.Advertisement |
| discoveryPeers map[string]*discovery.Advertisement |
| discoverySyncgroups map[interfaces.GroupId]map[string]*discovery.Advertisement |
| discoveryLock sync.RWMutex |
| |
| nameLock sync.Mutex // lock needed to serialize adding and removing of Syncbase names. |
| |
| // Handle to the server for adding other names in the future. |
| svr rpc.Server |
| |
| // Cancel functions for contexts derived from the root context when |
| // advertising over neighborhood. This is needed to stop advertising. |
| cancelAdvSyncbase context.CancelFunc // cancels Syncbase advertising. |
| advSyncgroups map[interfaces.GroupId]syncAdvertisementState // describes syncgroup advertising. |
| advLock sync.Mutex |
| |
| // Whether to enable neighborhood advertising. |
| publishInNh bool |
| |
| // In-memory sync state per Database. This state is populated at |
| // startup, and periodically persisted by the initiator. |
| syncState map[wire.Id]*dbSyncStateInMem |
| syncStateLock sync.Mutex // lock to protect access to the sync state. |
| |
| // In-memory queue of syncgroups to be published. It is reconstructed |
| // at startup from syncgroup info so it does not need to be persisted. |
| sgPublishQueue *list.List |
| sgPublishQueueLock sync.Mutex |
| |
| // In-memory tracking of batches during their construction. |
| // The sync Initiator and Watcher build batches incrementally here |
| // and then persist them in DAG batch entries. The mutex guards |
| // access to the batch set. |
| batchesLock sync.Mutex |
| batches batchSet |
| |
| // Metadata related to blob handling. |
| bst blob.BlobStore // local blob store associated with this Syncbase. |
| |
| // Syncbase vclock related variables. |
| vclock *vclock.VClock |
| |
| // Peer manager for managing peers to sync with. |
| pm peerManager |
| } |
| |
| // syncDatabase contains the metadata for syncing a database. This struct is |
| // used as a receiver to hand off the app-initiated syncgroup calls that arrive |
| // against a database to the sync module. |
| type syncDatabase struct { |
| db interfaces.Database |
| sync interfaces.SyncServerMethods |
| } |
| |
| // syncAdvertisementState contains information about the most recent |
| // advertisement for each advertised syncgroup. |
| type syncAdvertisementState struct { |
| cancel context.CancelFunc // cancels advertising. |
| specVersion string // version of most recently advertised spec. |
| adId discovery.AdId |
| } |
| |
| var ( |
| ifName = interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name |
| _ interfaces.SyncServerMethods = (*syncService)(nil) |
| ) |
| |
| // New creates a new sync module. |
| // |
| // Concurrency: sync initializes 3 goroutines at startup: a "syncer", a |
| // "neighborhood scanner", and a "peer manager". In addition, one "watcher" |
| // thread is started for each database to watch its store for changes to its |
| // objects. The "syncer" thread is responsible for periodically contacting peers |
| // to fetch changes from them. The "neighborhood scanner" thread continuously |
| // scans the neighborhood to learn of other Syncbases and syncgroups in its |
| // neighborhood. The "peer manager" thread continuously maintains viable peers |
| // that the syncer can pick from. In addition, the sync module responds to |
| // incoming RPCs from remote sync modules and local clients. |
| func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) { |
| discovery, err := syncbase.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{})) |
| if err != nil { |
| return nil, err |
| } |
| s := &syncService{ |
| sv: sv, |
| batches: make(batchSet), |
| sgPublishQueue: list.New(), |
| vclock: cl, |
| ctx: ctx, |
| rng: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), |
| discovery: discovery, |
| publishInNh: publishInNh, |
| advSyncgroups: make(map[interfaces.GroupId]syncAdvertisementState), |
| } |
| |
| data := &SyncData{} |
| if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error { |
| if err := store.Get(ctx, sv.St(), s.stKey(), data); err != nil { |
| if verror.ErrorID(err) != verror.ErrNoExist.ID { |
| return err |
| } |
| // First invocation of vsync.New(). |
| // TODO(sadovsky): Maybe move guid generation and storage to serviceData. |
| data.Id = s.rand64() |
| return store.Put(ctx, tx, s.stKey(), data) |
| } |
| return nil |
| }); err != nil { |
| return nil, err |
| } |
| |
| // data.Id is now guaranteed to be initialized. |
| s.id = data.Id |
| s.name = syncbaseIdToName(s.id) |
| |
| vlog.VI(1).Infof("sync: New: Syncbase ID is %x", s.id) |
| |
| // Initialize in-memory state for the sync module before starting any threads. |
| if err := s.initSync(ctx); err != nil { |
| return nil, verror.New(verror.ErrInternal, ctx, err) |
| } |
| |
| // Open a blob store. |
| s.bst, err = fsblob.Create(ctx, blobStEngine, path.Join(blobRootDir, "blobs")) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Channel to propagate close event to all threads. |
| s.closed = make(chan struct{}) |
| s.pending.Add(3) |
| |
| // Initialize a peer manager with the peer selection policy. |
| s.pm = newPeerManager(ctx, s, selectRandom) |
| |
| // Start the peer manager thread to maintain peers viable for syncing. |
| go s.pm.managePeers(ctx) |
| |
| // Start initiator thread to periodically get deltas from peers. The |
| // initiator threads consults the peer manager to pick peers to sync |
| // with. So we initialize the peer manager before starting the syncer. |
| go s.syncer(ctx) |
| |
| // Start the discovery service thread to listen to neighborhood updates. |
| go s.discoverNeighborhood(ctx) |
| |
| return s, nil |
| } |
| |
| // Close waits for spawned sync threads to shut down, and closes the local blob |
| // store handle. |
| func Close(ss interfaces.SyncServerMethods) { |
| vlog.VI(2).Infof("sync: Close: begin") |
| defer vlog.VI(2).Infof("sync: Close: end") |
| |
| s := ss.(*syncService) |
| close(s.closed) |
| s.pending.Wait() |
| s.bst.Close() |
| } |
| |
| func NewSyncDatabase(db interfaces.Database) *syncDatabase { |
| return &syncDatabase{db: db, sync: db.Service().Sync()} |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////////////////// |
| // Neighborhood based discovery of syncgroups and Syncbases. |
| |
| // discoverNeighborhood listens to updates from the discovery service to learn |
| // about sync peers and syncgroups (if they have admins in the neighborhood) as |
| // they enter and leave the neighborhood. |
| func (s *syncService) discoverNeighborhood(ctx *context.T) { |
| defer s.pending.Done() |
| |
| query := `v.InterfaceName="` + ifName + `"` |
| ch, err := s.discovery.Scan(ctx, query) |
| if err != nil { |
| vlog.Errorf("sync: discoverNeighborhood: cannot start discovery service: %v", err) |
| return |
| } |
| |
| for !s.isClosed() { |
| select { |
| case update, ok := <-ch: |
| if !ok || s.isClosed() { |
| break |
| } |
| |
| vlog.VI(3).Infof("discoverNeighborhood: got an update, %+v\n", update) |
| |
| if update.IsLost() { |
| s.updateDiscoveryInfo(update.Id().String(), nil) |
| } else { |
| ad := update.Advertisement() |
| s.updateDiscoveryInfo(update.Id().String(), &ad) |
| } |
| |
| case <-s.closed: |
| break |
| } |
| } |
| |
| vlog.VI(1).Info("sync: discoverNeighborhood: channel closed, stop listening and exit") |
| } |
| |
| // updateDiscoveryInfo adds or removes information about a sync peer or a |
| // syncgroup found in the neighborhood through the discovery service. If the |
| // service entry is nil the record is removed from its discovery map. The peer |
| // and syncgroup information is stored in the service attributes. |
| func (s *syncService) updateDiscoveryInfo(id string, ad *discovery.Advertisement) { |
| s.discoveryLock.Lock() |
| defer s.discoveryLock.Unlock() |
| |
| vlog.VI(3).Info("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups) |
| |
| // The first time around initialize all discovery maps. |
| if s.discoveryIds == nil { |
| s.discoveryIds = make(map[string]*discovery.Advertisement) |
| s.discoveryPeers = make(map[string]*discovery.Advertisement) |
| s.discoverySyncgroups = make(map[interfaces.GroupId]map[string]*discovery.Advertisement) |
| } |
| |
| // Determine the service entry type (sync peer or syncgroup) and its |
| // value either from the given service info or previously stored entry. |
| // Note: each entry only contains a single attribute. |
| var attrs discovery.Attributes |
| if ad != nil { |
| attrs = ad.Attributes |
| } else if serv := s.discoveryIds[id]; serv != nil { |
| attrs = serv.Attributes |
| } |
| |
| // handle peers |
| if peer, ok := attrs[wire.DiscoveryAttrPeer]; ok { |
| // The attribute value is the Syncbase peer name. |
| if ad != nil { |
| s.discoveryPeers[peer] = ad |
| } else { |
| delete(s.discoveryPeers, peer) |
| } |
| } |
| |
| // handle syngroups |
| if sgName, ok := attrs[wire.DiscoveryAttrSyncgroupName]; ok { |
| // The attribute value is the syncgroup name. |
| dbId := wire.Id{Name: attrs[wire.DiscoveryAttrDatabaseName], Blessing: attrs[wire.DiscoveryAttrDatabaseBlessing]} |
| sgId := wire.Id{Name: sgName, Blessing: attrs[wire.DiscoveryAttrSyncgroupBlessing]} |
| gid := SgIdToGid(dbId, sgId) |
| admins := s.discoverySyncgroups[gid] |
| if ad != nil { |
| if admins == nil { |
| admins = make(map[string]*discovery.Advertisement) |
| s.discoverySyncgroups[gid] = admins |
| vlog.VI(3).Infof("added advertisement %+v, %p as dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid) |
| |
| } |
| admins[id] = ad |
| } else if admins != nil { |
| delete(admins, id) |
| if len(admins) == 0 { |
| delete(s.discoverySyncgroups, gid) |
| vlog.VI(3).Infof("deleted advertisement %+v, %p from dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid) |
| } |
| } |
| } |
| |
| // Add or remove the service entry from the main IDs map. |
| if ad != nil { |
| s.discoveryIds[id] = ad |
| } else { |
| delete(s.discoveryIds, id) |
| } |
| } |
| |
| // filterDiscoveryPeers returns only those peers discovered via neighborhood |
| // that are also found in sgMembers (passed as input argument). |
| func (s *syncService) filterDiscoveryPeers(sgMembers map[string]uint32) map[string]*discovery.Advertisement { |
| s.discoveryLock.Lock() |
| defer s.discoveryLock.Unlock() |
| |
| if s.discoveryPeers == nil { |
| return nil |
| } |
| |
| sgNeighbors := make(map[string]*discovery.Advertisement) |
| |
| for peer, svc := range s.discoveryPeers { |
| if _, ok := sgMembers[peer]; ok { |
| sgNeighbors[peer] = svc |
| } |
| } |
| |
| return sgNeighbors |
| } |
| |
| // filterSyncgroupAdmins returns syncgroup admins for the specified syncgroup |
| // found in the neighborhood via the discovery service. |
| func (s *syncService) filterSyncgroupAdmins(dbId, sgId wire.Id) []*discovery.Advertisement { |
| s.discoveryLock.Lock() |
| defer s.discoveryLock.Unlock() |
| |
| if s.discoverySyncgroups == nil { |
| return nil |
| } |
| |
| sgInfo := s.discoverySyncgroups[SgIdToGid(dbId, sgId)] |
| if sgInfo == nil { |
| return nil |
| } |
| |
| admins := make([]*discovery.Advertisement, 0, len(sgInfo)) |
| for _, svc := range sgInfo { |
| admins = append(admins, svc) |
| } |
| |
| return admins |
| } |
| |
| // AddNames publishes all the names for this Syncbase instance gathered from all |
| // the syncgroups it is currently participating in. This is needed when |
| // syncbased is restarted so that it can republish itself at the names being |
| // used in the syncgroups. |
| func AddNames(ctx *context.T, ss interfaces.SyncServerMethods, svr rpc.Server) error { |
| vlog.VI(2).Infof("sync: AddNames: begin") |
| defer vlog.VI(2).Infof("sync: AddNames: end") |
| |
| s := ss.(*syncService) |
| s.nameLock.Lock() |
| defer s.nameLock.Unlock() |
| |
| s.svr = svr |
| |
| info := s.copyMemberInfo(ctx, s.name) |
| if info == nil || len(info.mtTables) == 0 { |
| vlog.VI(2).Infof("sync: AddNames: end returning no names") |
| return nil |
| } |
| |
| for mt := range info.mtTables { |
| name := naming.Join(mt, s.name) |
| // Note that AddName will retry the publishing if not |
| // successful. So if a node is offline, it will publish the name |
| // when possible. |
| if err := svr.AddName(name); err != nil { |
| vlog.VI(2).Infof("sync: AddNames: end returning AddName err %v", err) |
| return err |
| } |
| } |
| if err := s.advertiseSyncbaseInNeighborhood(); err != nil { |
| vlog.VI(2).Infof("sync: AddNames: end returning advertiseSyncbaseInNeighborhood err %v", err) |
| return err |
| } |
| |
| // Advertise syncgroups. |
| for dbId, dbInfo := range info.db2sg { |
| st, err := s.getDbStore(ctx, nil, dbId) |
| if err != nil { |
| return err |
| } |
| for gid := range dbInfo { |
| sg, err := getSyncgroupByGid(ctx, st, gid) |
| if err != nil { |
| return err |
| } |
| if err := s.advertiseSyncgroupInNeighborhood(sg); err != nil { |
| vlog.VI(2).Infof("sync: AddNames: end returning advertiseSyncgroupInNeighborhood err %v", err) |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // advertiseSyncbaseInNeighborhood checks if the Syncbase service is already |
| // being advertised over the neighborhood. If not, it begins advertising. The |
| // caller of the function is holding nameLock. |
| func (s *syncService) advertiseSyncbaseInNeighborhood() error { |
| if !s.publishInNh { |
| return nil |
| } |
| |
| vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: begin") |
| defer vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: end") |
| |
| s.advLock.Lock() |
| defer s.advLock.Unlock() |
| // Syncbase is already being advertised. |
| if s.cancelAdvSyncbase != nil { |
| return nil |
| } |
| |
| sbService := discovery.Advertisement{ |
| InterfaceName: ifName, |
| Attributes: discovery.Attributes{ |
| wire.DiscoveryAttrPeer: s.name, |
| }, |
| } |
| |
| ctx, stop := context.WithCancel(s.ctx) |
| |
| // Note that duplicate calls to advertise will return an error. |
| ch, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, nil) |
| |
| if err == nil { |
| vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: successful") |
| s.cancelAdvSyncbase = func() { |
| stop() |
| <-ch |
| } |
| return nil |
| } |
| stop() |
| return err |
| } |
| |
| // advertiseSyncgroupInNeighborhood checks if this Syncbase is an admin of the |
| // syncgroup, and if so advertises the syncgroup over neighborhood. If the |
| // Syncbase loses admin access, any previous syncgroup advertisements are |
| // cancelled. The caller of the function is holding nameLock. |
| func (s *syncService) advertiseSyncgroupInNeighborhood(sg *interfaces.Syncgroup) error { |
| if !s.publishInNh { |
| return nil |
| } |
| vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: begin") |
| defer vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: end") |
| |
| s.advLock.Lock() |
| defer s.advLock.Unlock() |
| |
| // Refresh the sg spec before advertising. This prevents trampling |
| // when concurrent spec updates apply transactions in one ordering, |
| // but advertise in another. |
| gid := SgIdToGid(sg.DbId, sg.Id) |
| st, err := s.getDbStore(s.ctx, nil, sg.DbId) |
| if err != nil { |
| return err |
| } |
| sg, err = getSyncgroupByGid(s.ctx, st, gid) |
| if err != nil { |
| return err |
| } |
| |
| state, advertising := s.advSyncgroups[gid] |
| if advertising { |
| // The spec hasn't changed since the last advertisement. |
| if sg.SpecVersion == state.specVersion { |
| return nil |
| } |
| state.cancel() |
| delete(s.advSyncgroups, gid) |
| } |
| |
| // We only advertise if potential members could join by contacting us. |
| // For that reason we only advertise if we are an admin. |
| if !syncgroupAdmin(s.ctx, sg.Spec.Perms) { |
| return nil |
| } |
| |
| sbService := discovery.Advertisement{ |
| InterfaceName: ifName, |
| Attributes: discovery.Attributes{ |
| wire.DiscoveryAttrDatabaseName: sg.DbId.Name, |
| wire.DiscoveryAttrDatabaseBlessing: sg.DbId.Blessing, |
| wire.DiscoveryAttrSyncgroupName: sg.Id.Name, |
| wire.DiscoveryAttrSyncgroupBlessing: sg.Id.Blessing, |
| }, |
| } |
| if advertising { |
| sbService.Id = state.adId |
| } |
| ctx, stop := context.WithCancel(s.ctx) |
| |
| vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: advertising %v", sbService) |
| |
| // TODO(mattr): Unfortunately, discovery visibility isn't as powerful |
| // as an ACL. There's no way to represent the NotIn list. For now |
| // if you match the In list you can see the advertisement, though you |
| // might not be able to join. |
| visibility := sg.Spec.Perms[string(access.Read)].In |
| ch, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, visibility) |
| if err == nil { |
| vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: successful") |
| cancel := func() { |
| stop() |
| <-ch |
| } |
| s.advSyncgroups[gid] = syncAdvertisementState{cancel: cancel, specVersion: sg.SpecVersion, adId: sbService.Id} |
| return nil |
| } |
| stop() |
| return err |
| } |
| |
| ////////////////////////////// |
| // Helpers. |
| |
| // isClosed returns true if the sync service channel is closed indicating that |
| // the service is shutting down. |
| func (s *syncService) isClosed() bool { |
| select { |
| case <-s.closed: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // rand64 generates an unsigned 64-bit pseudo-random number. |
| func (s *syncService) rand64() uint64 { |
| s.rngLock.Lock() |
| defer s.rngLock.Unlock() |
| return (uint64(s.rng.Int63()) << 1) | uint64(s.rng.Int63n(2)) |
| } |
| |
| // randIntn mimics rand.Intn (generates a non-negative pseudo-random number in [0,n)). |
| func (s *syncService) randIntn(n int) int { |
| s.rngLock.Lock() |
| defer s.rngLock.Unlock() |
| return s.rng.Intn(n) |
| } |
| |
| func syncbaseIdToName(id uint64) string { |
| return fmt.Sprintf("%x", id) |
| } |
| |
| func (s *syncService) stKey() string { |
| return common.SyncPrefix |
| } |