| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
// Veyron Sync hook into the Store Watch API. When applications update objects
// in the local Veyron Store, Sync learns of these changes via the Watch API
// stream of object mutations. In turn, this Sync watcher thread updates the
// DAG and ILog records to track each object's change history.
| |
| import ( |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/x/lib/vlog" |
| ) |
| |
| var ( |
| // watchRetryDelay is how long the watcher waits before calling the Watch() RPC again |
| // after the previous call fails. |
| watchRetryDelay = 2 * time.Second |
| // streamRetryDelay is how long the watcher waits before creating a new Watch stream |
| // after the previous stream ends. |
| streamRetryDelay = 1 * time.Second |
| ) |
| |
| // syncWatcher contains the metadata for the Sync Watcher thread. |
| type syncWatcher struct { |
| // syncd is a pointer to the Syncd instance owning this Watcher. |
| syncd *syncd |
| } |
| |
// newWatcher creates a new Sync Watcher attached to the given Syncd instance.
| func newWatcher(syncd *syncd) *syncWatcher { |
| return &syncWatcher{syncd: syncd} |
| } |
| |
| func (w *syncWatcher) watchStreamCanceler(cancel context.CancelFunc, done <-chan struct{}) { |
| select { |
| case <-w.syncd.closed: |
| vlog.VI(1).Info("watchStreamCanceler: Syncd channel closed, cancel stream and exit") |
| cancel() |
| case <-done: |
| vlog.VI(1).Info("watchStreamCanceler: done, exit") |
| } |
| } |
| |
| // watchStore consumes change records obtained by watching the store |
| // for updates and applies them to the sync DBs. |
| func (w *syncWatcher) watchStore() { |
| defer w.syncd.pending.Done() |
| |
| // // If no Veyron store is configured, there is nothing to watch. |
| // if w.syncd.store == nil { |
| // vlog.VI(1).Info("watchStore: Veyron store not configured; skipping the watcher") |
| // return |
| // } |
| |
| // // Get a Watch stream, process it, repeat till end-of-life. |
| // for { |
| // ctx, _ := vtrace.SetNewTrace(w.syncd.ctx) |
| // ctx, cancel := context.WithCancel(ctx) |
| // stream := w.getWatchStream(ctx) |
| // if stream == nil { |
| // return // Syncd is exiting. |
| // } |
| |
| // // Spawn a goroutine to detect the Syncd "closed" channel and cancel the RPC stream |
| // // to unblock the watcher. The "done" channel lets the watcher terminate the goroutine. |
| // go w.watchStreamCanceler(cancel, ctx.Done()) |
| |
| // // Process the stream of Watch updates until it closes (similar to "tail -f"). |
| // w.processWatchStream(stream) |
| |
| // if w.syncd.isSyncClosing() { |
| // return // Syncd is exiting. |
| // } |
| |
| // stream.Finish() |
| // cancel() |
| |
// // TODO(rdaoud): we need a rate-limiter here in case the stream closes too quickly.
// // If the stream stays up long enough, no need to sleep before getting a new one
// // (see the retryDelayFor sketch after this function).
| // time.Sleep(streamRetryDelay) |
| // } |
| } |
| |
// // getWatchStream returns a Watch API stream and handles retries if the Watch() call fails.
// // A nil return means Syncd is exiting cleanly and the caller should terminate.
| // func (w *syncWatcher) getWatchStream(ctx *context.T) watch.GlobWatcherWatchGlobCall { |
| // for { |
| // w.syncd.lock.RLock() |
| // resmark := w.syncd.devtab.head.Resmark |
| // w.syncd.lock.RUnlock() |
| |
| // req := raw.Request{} |
| // if resmark != nil { |
| // req.ResumeMarker = resmark |
| // } |
| |
| // stream, err := w.syncd.store.Watch(ctx, req) |
| // if err == nil { |
| // return stream |
| // } |
| |
| // if w.syncd.isSyncClosing() { |
| // vlog.VI(1).Info("getWatchStream: exiting, Syncd closed its channel: ", err) |
| // return nil |
| // } |
| |
| // vlog.VI(1).Info("getWatchStream: call to Watch() failed, retrying a bit later: ", err) |
| // time.Sleep(watchRetryDelay) |
| // } |
| // } |
| |
// // processWatchStream reads the stream of Watch updates and applies the object mutations.
// // Ideally this call does not return, as the stream should be unending (like "tail -f").
// // If the stream closes, it distinguishes end-of-stream from Syncd canceling the stream
// // to trigger a clean exit.
| // func (w *syncWatcher) processWatchStream(call watch.GlobWatcherWatchGlobCall) { |
// var txChanges []*types.Change
| // var syncTime int64 |
| |
| // stream := call.RecvStream() |
| // for stream.Advance() { |
| // change := stream.Value() |
| |
| // // Timestamp of the start of a batch of changes arriving at the Sync server. |
| // if txChanges == nil { |
| // syncTime = time.Now().UnixNano() |
| // } |
| // txChanges = append(txChanges, &change) |
| |
| // // Process the transaction when its last change record is received. |
| // if !change.Continued { |
| // if err := w.processTransaction(txChanges, syncTime); err != nil { |
// // TODO(rdaoud): don't crash; instead add retry policies to attempt some degree of
// // self-healing from data corruption where feasible, otherwise quarantine this device
// // from the cluster and stop Syncd to avoid propagating data corruption.
| // vlog.Fatal("processWatchStream:", err) |
| // } |
| // txChanges = nil |
| // } |
| // } |
| |
| // err := stream.Err() |
| // if err == nil { |
| // err = io.EOF |
| // } |
| |
| // if w.syncd.isSyncClosing() { |
| // vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err) |
| // } else { |
| // vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err) |
| // } |
| // } |
| |
| // // matchSyncRoot returns the SyncRoot (SyncGroup root ObjId) that matches the given object path. |
// // It traverses the object path, checking whether any of the IDs along the way are SyncRoots.
// // If no match is found, it returns the invalid ID (zero).
// // Note: the path IDs given by Watch are ordered: object, parent, grandparent, ..., root.
| // // As a result, this function returns the narrowest matching SyncGroup root ObjId. |
| // func matchSyncRoot(objPathIDs []ObjId, sgObjIds map[ObjId]struct{}) ObjId { |
| // for _, oid := range objPathIDs { |
| // if _, ok := sgObjIds[oid]; ok { |
| // return oid |
| // } |
| // } |
| // return ObjId{} |
| // } |
| |
| // // processTransaction applies a batch of changes (object mutations) that form a single transaction |
| // // received from the Watch API. The function grabs the write-lock to access the Log and DAG DBs. |
| // func (w *syncWatcher) processTransaction(txBatch []*types.Change, syncTime int64) error { |
| // w.syncd.lock.Lock() |
| // defer w.syncd.lock.Unlock() |
| |
| // count := uint32(len(txBatch)) |
| // if count == 0 { |
| // return nil |
| // } |
| |
// // Get the current set of SyncGroup root ObjIds and use it to determine, for each
// // object, the SyncGroup it belongs to, if any.
| // sgRootObjIds, err := w.syncd.sgtab.getAllSyncRoots() |
| // if err != nil { |
| // return err |
| // } |
| |
| // // If the batch has more than one mutation, start a DAG transaction for it. |
| // txID := NoTxId |
| // if count > 1 { |
| // txID = w.syncd.dag.addNodeTxStart(txID) |
| // if txID == NoTxId { |
| // return fmt.Errorf("failed to generate transaction ID") |
| // } |
| // } |
| |
| // vlog.VI(1).Infof("processTransaction: ready to process a batch of %d changes for transaction %v", count, txID) |
| |
| // for _, ch := range txBatch { |
| // rc, ok := ch.Value.(*raw.RawChange) |
| // if !ok { |
| // return fmt.Errorf("invalid change value, not a raw change: %#v", ch) |
| // } |
| |
| // mu := &rc.Mutation |
| // isDeleted := ch.State == types.DoesNotExist |
| |
| // // Determine the SyncGroup root ObjId that matches this object, if any. |
| // rootObjId := matchSyncRoot(rc.PathIDs, sgRootObjIds) |
| |
| // if rootObjId.IsValid() { |
| // // The object matches a SyncGroup: process its watch record to track it in the Sync logs. |
| // // All LogValues belonging to the same transaction get the same timestamp. |
| // val := &LogValue{ |
| // Mutation: *mu, |
| // SyncTime: syncTime, |
| // Delete: isDeleted, |
| // TxId: txID, |
| // TxCount: count, |
| // } |
| // vlog.VI(2).Infof("processTransaction: processing record %v, Tx %v", val, txID) |
| |
| // err = w.syncd.log.processWatchRecord(mu.Id, mu.Version, mu.PriorVersion, val, rootObjId) |
| // } else { |
| // // The object does not match any SyncGroup: stash it in the "private" table to save its info |
| // // in case it becomes shared in the future, at which time it would be added to the Sync logs. |
| // if isDeleted { |
| // err = w.syncd.dag.delPrivNode(mu.Id) |
| // } else { |
| // priv := &privNode{Mutation: mu, PathIDs: rc.PathIDs, SyncTime: syncTime, TxId: txID, TxCount: count} |
| // err = w.syncd.dag.setPrivNode(mu.Id, priv) |
| // } |
| // } |
| |
| // if err != nil { |
| // return fmt.Errorf("cannot process mutation: %#v, mu %#v: %s", ch, mu, err) |
| // } |
| // } |
| |
| // // End the DAG transaction if any. |
| // if txID != NoTxId { |
| // // Note: if there are no shared objects in the transaction, the addNodeTxEnd() call |
// // does not persist the Tx state; it only cleans up the in-memory scaffolding.
// // This is the desired behavior: Sync only needs to track the state of transactions
| // // that have at least one shared object. |
| // if err := w.syncd.dag.addNodeTxEnd(txID, count); err != nil { |
| // return err |
| // } |
| // } |
| |
| // // Update the device table with the new resume marker of the last record. |
| // w.syncd.devtab.head.Resmark = txBatch[count-1].ResumeMarker |
| // return nil |
| // } |