blob: f9799ae0a911268acdd9030c81bbd12cf9e48a92 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Veyron Sync hook to the Store Watch API. When applications update objects
// in the local Veyron Store, Sync learns about these via the Watch API stream
// of object mutations. In turn, this Sync watcher thread updates the DAG and
// ILog records to track the object change histories.
import (
var (
// watchRetryDelay is how long the watcher waits before calling the Watch() RPC again
// after the previous call fails.
watchRetryDelay = 2 * time.Second
// streamRetryDelay is how long the watcher waits before creating a new Watch stream
// after the previous stream ends.
streamRetryDelay = 1 * time.Second
// syncWatcher contains the metadata for the Sync Watcher thread.
type syncWatcher struct {
// syncd is a pointer to the Syncd instance owning this Watcher.
syncd *syncd
// newWatcher creates a new Sync Watcher instance attached to the given Syncd instance.
func newWatcher(syncd *syncd) *syncWatcher {
return &syncWatcher{syncd: syncd}
func (w *syncWatcher) watchStreamCanceler(cancel context.CancelFunc, done <-chan struct{}) {
select {
case <-w.syncd.closed:
vlog.VI(1).Info("watchStreamCanceler: Syncd channel closed, cancel stream and exit")
case <-done:
vlog.VI(1).Info("watchStreamCanceler: done, exit")
// watchStore consumes change records obtained by watching the store
// for updates and applies them to the sync DBs.
func (w *syncWatcher) watchStore() {
defer w.syncd.pending.Done()
// // If no Veyron store is configured, there is nothing to watch.
// if == nil {
// vlog.VI(1).Info("watchStore: Veyron store not configured; skipping the watcher")
// return
// }
// // Get a Watch stream, process it, repeat till end-of-life.
// for {
// ctx, _ := vtrace.SetNewTrace(w.syncd.ctx)
// ctx, cancel := context.WithCancel(ctx)
// stream := w.getWatchStream(ctx)
// if stream == nil {
// return // Syncd is exiting.
// }
// // Spawn a goroutine to detect the Syncd "closed" channel and cancel the RPC stream
// // to unblock the watcher. The "done" channel lets the watcher terminate the goroutine.
// go w.watchStreamCanceler(cancel, ctx.Done())
// // Process the stream of Watch updates until it closes (similar to "tail -f").
// w.processWatchStream(stream)
// if w.syncd.isSyncClosing() {
// return // Syncd is exiting.
// }
// stream.Finish()
// cancel()
// // TODO(rdaoud): we need a rate-limiter here in case the stream closes too quickly.
// // If the stream stays up long enough, no need to sleep before getting a new one.
// time.Sleep(streamRetryDelay)
// }
// // getWatchStream() returns a Watch API stream and handles retries if the Watch() call fails.
// // If the stream is nil, it means Syncd is exiting cleanly and the caller should terminate.
// func (w *syncWatcher) getWatchStream(ctx *context.T) watch.GlobWatcherWatchGlobCall {
// for {
// w.syncd.lock.RLock()
// resmark := w.syncd.devtab.head.Resmark
// w.syncd.lock.RUnlock()
// req := raw.Request{}
// if resmark != nil {
// req.ResumeMarker = resmark
// }
// stream, err :=, req)
// if err == nil {
// return stream
// }
// if w.syncd.isSyncClosing() {
// vlog.VI(1).Info("getWatchStream: exiting, Syncd closed its channel: ", err)
// return nil
// }
// vlog.VI(1).Info("getWatchStream: call to Watch() failed, retrying a bit later: ", err)
// time.Sleep(watchRetryDelay)
// }
// }
// // processWatchStream reads the stream of Watch updates and applies the object mutations.
// // Ideally this call does not return as the stream should be un-ending (like "tail -f").
// // If the stream is closed, distinguish between the cases of end-of-stream vs Syncd canceling
// // the stream to trigger a clean exit.
// func (w *syncWatcher) processWatchStream(call watch.GlobWatcherWatchGlobCall) {
// var txChanges []*types.Change = nil
// var syncTime int64
// stream := call.RecvStream()
// for stream.Advance() {
// change := stream.Value()
// // Timestamp of the start of a batch of changes arriving at the Sync server.
// if txChanges == nil {
// syncTime = time.Now().UnixNano()
// }
// txChanges = append(txChanges, &change)
// // Process the transaction when its last change record is received.
// if !change.Continued {
// if err := w.processTransaction(txChanges, syncTime); err != nil {
// // TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
// // self-healing from a data corruption where feasible, otherwise quarantine this device
// // from the cluster and stop Syncd to avoid propagating data corruptions.
// vlog.Fatal("processWatchStream:", err)
// }
// txChanges = nil
// }
// }
// err := stream.Err()
// if err == nil {
// err = io.EOF
// }
// if w.syncd.isSyncClosing() {
// vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
// } else {
// vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
// }
// }
// // matchSyncRoot returns the SyncRoot (SyncGroup root ObjId) that matches the given object path.
// // It traverses the object path looking if any of the IDs along the way are SyncRoots.
// // If no match is found, it returns the invalid ID (zero).
// // Note: the path IDs given by Watch are ordered: object, parent, grand-parent, etc.., root.
// // As a result, this function returns the narrowest matching SyncGroup root ObjId.
// func matchSyncRoot(objPathIDs []ObjId, sgObjIds map[ObjId]struct{}) ObjId {
// for _, oid := range objPathIDs {
// if _, ok := sgObjIds[oid]; ok {
// return oid
// }
// }
// return ObjId{}
// }
// // processTransaction applies a batch of changes (object mutations) that form a single transaction
// // received from the Watch API. The function grabs the write-lock to access the Log and DAG DBs.
// func (w *syncWatcher) processTransaction(txBatch []*types.Change, syncTime int64) error {
// w.syncd.lock.Lock()
// defer w.syncd.lock.Unlock()
// count := uint32(len(txBatch))
// if count == 0 {
// return nil
// }
// // Get the current set of SyncGroup Root ObjIds and use it to determine for each object
// // the SyncGroup it belongs to, if any.
// sgRootObjIds, err := w.syncd.sgtab.getAllSyncRoots()
// if err != nil {
// return err
// }
// // If the batch has more than one mutation, start a DAG transaction for it.
// txID := NoTxId
// if count > 1 {
// txID = w.syncd.dag.addNodeTxStart(txID)
// if txID == NoTxId {
// return fmt.Errorf("failed to generate transaction ID")
// }
// }
// vlog.VI(1).Infof("processTransaction: ready to process a batch of %d changes for transaction %v", count, txID)
// for _, ch := range txBatch {
// rc, ok := ch.Value.(*raw.RawChange)
// if !ok {
// return fmt.Errorf("invalid change value, not a raw change: %#v", ch)
// }
// mu := &rc.Mutation
// isDeleted := ch.State == types.DoesNotExist
// // Determine the SyncGroup root ObjId that matches this object, if any.
// rootObjId := matchSyncRoot(rc.PathIDs, sgRootObjIds)
// if rootObjId.IsValid() {
// // The object matches a SyncGroup: process its watch record to track it in the Sync logs.
// // All LogValues belonging to the same transaction get the same timestamp.
// val := &LogValue{
// Mutation: *mu,
// SyncTime: syncTime,
// Delete: isDeleted,
// TxId: txID,
// TxCount: count,
// }
// vlog.VI(2).Infof("processTransaction: processing record %v, Tx %v", val, txID)
// err = w.syncd.log.processWatchRecord(mu.Id, mu.Version, mu.PriorVersion, val, rootObjId)
// } else {
// // The object does not match any SyncGroup: stash it in the "private" table to save its info
// // in case it becomes shared in the future, at which time it would be added to the Sync logs.
// if isDeleted {
// err = w.syncd.dag.delPrivNode(mu.Id)
// } else {
// priv := &privNode{Mutation: mu, PathIDs: rc.PathIDs, SyncTime: syncTime, TxId: txID, TxCount: count}
// err = w.syncd.dag.setPrivNode(mu.Id, priv)
// }
// }
// if err != nil {
// return fmt.Errorf("cannot process mutation: %#v, mu %#v: %s", ch, mu, err)
// }
// }
// // End the DAG transaction if any.
// if txID != NoTxId {
// // Note: if there are no shared objects in the transaction, the addNodeTxEnd() call
// // does not persist the Tx state, it only cleans up the in-memory scaffolding.
// // This is the desired behavior, Sync only needs to track the state of transactions
// // that have at least one shared object.
// if err := w.syncd.dag.addNodeTxEnd(txID, count); err != nil {
// return err
// }
// }
// // Update the device table with the new resume marker of the last record.
// w.syncd.devtab.head.Resmark = txBatch[count-1].ResumeMarker
// return nil
// }