| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // Package vsync provides sync functionality for Syncbase. Sync |
| // service serves incoming GetDeltas requests and contacts other peers |
| // to get deltas from them. When it receives a GetDeltas request, the |
| // incoming generation vector is diffed with the local generation |
| // vector, and missing generations are sent back. When it receives log |
| // records in response to a GetDeltas request, it replays those log |
| // records to get in sync with the sender. |
| import ( |
| "fmt" |
| "math/rand" |
| "sync" |
| "time" |
| |
| "v.io/syncbase/x/ref/services/syncbase/server/interfaces" |
| "v.io/syncbase/x/ref/services/syncbase/server/util" |
| "v.io/syncbase/x/ref/services/syncbase/store" |
| "v.io/v23/context" |
| "v.io/v23/rpc" |
| "v.io/v23/verror" |
| ) |
| |
| // syncService contains the metadata for the sync module. |
| type syncService struct { |
| // TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits. |
| id uint64 // globally unique id for this instance of Syncbase. |
| name string // name derived from the global id. |
| sv interfaces.Service |
| server rpc.Server |
| |
| // High-level lock to serialize the watcher and the initiator. This lock is |
| // needed to handle the following cases: (a) When the initiator is |
| // cutting a local generation, it waits for the watcher to commit the |
| // latest local changes before including them in the checkpoint. (b) |
| // When the initiator is receiving updates, it reads the latest head of |
| // an object as per the DAG state in order to construct the in-memory |
| // graft map used for conflict detection. At the same time, if a watcher |
| // is processing local updates, it may move the object head. Hence the |
| // initiator and watcher contend on the DAG head of an object. Instead |
| // of retrying a transaction which causes the entire delta to be |
| // replayed, we use pessimistic locking to serialize the initiator and |
| // the watcher. |
| // |
| // TODO(hpucha): This is a temporary hack. |
| thLock sync.RWMutex |
| |
| // State to coordinate shutdown of spawned goroutines. |
| pending sync.WaitGroup |
| closed chan struct{} |
| |
| // TODO(hpucha): Other global names to advertise to enable Syncbase |
| // discovery. For example, every Syncbase must be reachable under |
| // <mttable>/<syncbaseid> for p2p sync. This is the name advertised |
| // during SyncGroup join. In addition, a Syncbase might also be |
| // accepting "publish SyncGroup requests", and might use a more |
| // human-readable name such as <mttable>/<idp>/<sgserver>. All these |
| // names must be advertised in the appropriate mount tables. |
| |
| // In-memory sync membership info aggregated across databases. |
| allMembers *memberView |
| |
| // In-memory sync state per Database. This state is populated at |
| // startup, and periodically persisted by the initiator. |
| syncState map[string]*dbSyncStateInMem |
| syncStateLock sync.Mutex // lock to protect access to the sync state. |
| |
| // In-memory tracking of batches during their construction. |
| // The sync Initiator and Watcher build batches incrementally here |
| // and then persist them in DAG batch entries. The mutex guards |
| // access to the batch set. |
| batchesLock sync.Mutex |
| batches batchSet |
| } |
| |
| // syncDatabase contains the metadata for syncing a database. This struct is |
| // used as a receiver to hand off the app-initiated SyncGroup calls that arrive |
| // against a nosql.Database to the sync module. |
| type syncDatabase struct { |
| db interfaces.Database |
| } |
| |
| var ( |
| rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) |
| rngLock sync.Mutex |
| _ interfaces.SyncServerMethods = (*syncService)(nil) |
| ) |
| |
| // rand64 generates an unsigned 64-bit pseudo-random number. |
| func rand64() uint64 { |
| rngLock.Lock() |
| defer rngLock.Unlock() |
| return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2)) |
| } |
| |
| // randIntn mimics rand.Intn (generates a non-negative pseudo-random number in [0,n)). |
| func randIntn(n int) int { |
| rngLock.Lock() |
| defer rngLock.Unlock() |
| return rng.Intn(n) |
| } |
| |
| // New creates a new sync module. |
| // |
| // Concurrency: sync initializes two goroutines at startup: a "watcher" and an |
| // "initiator". The "watcher" thread is responsible for watching the store for |
| // changes to its objects. The "initiator" thread is responsible for |
| // periodically contacting peers to fetch changes from them. In addition, the |
| // sync module responds to incoming RPCs from remote sync modules. |
| func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, server rpc.Server) (*syncService, error) { |
| s := &syncService{ |
| sv: sv, |
| server: server, |
| batches: make(batchSet), |
| } |
| |
| data := &syncData{} |
| if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error { |
| if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil { |
| if verror.ErrorID(err) != verror.ErrNoExist.ID { |
| return err |
| } |
| // First invocation of vsync.New(). |
| // TODO(sadovsky): Maybe move guid generation and storage to serviceData. |
| data.Id = rand64() |
| return util.Put(ctx, tx, s.stKey(), data) |
| } |
| return nil |
| }); err != nil { |
| return nil, err |
| } |
| |
| // data.Id is now guaranteed to be initialized. |
| s.id = data.Id |
| s.name = fmt.Sprintf("%x", s.id) |
| |
| // Initialize in-memory state for the sync module before starting any threads. |
| if err := s.initSync(ctx); err != nil { |
| return nil, verror.New(verror.ErrInternal, ctx, err) |
| } |
| |
| // Channel to propagate close event to all threads. |
| s.closed = make(chan struct{}) |
| s.pending.Add(2) |
| |
| // Start watcher thread to watch for updates to local store. |
| go s.watchStore(ctx) |
| |
| // Start initiator thread to periodically get deltas from peers. |
| go s.syncer(ctx) |
| |
| return s, nil |
| } |
| |
| // Close cleans up sync state. |
| // TODO(hpucha): Hook it up to server shutdown of syncbased. |
| func (s *syncService) Close() { |
| close(s.closed) |
| s.pending.Wait() |
| } |
| |
| func NewSyncDatabase(db interfaces.Database) *syncDatabase { |
| return &syncDatabase{db: db} |
| } |
| |
| func (s *syncService) stKey() string { |
| return util.SyncPrefix |
| } |