veyron/runtimes/google/vsync: Remove TODOContexts from vsync.
TODOContexts were placeholders until I found the proper place
to put real contexts. Contexts should correspond to a single
logical operation.
Change-Id: I7938408f993ad48c64c967cd8924213d8c90af87
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 1257d5a..67422d9 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -10,6 +10,7 @@
"veyron/services/store/raw"
+ "veyron2/context"
"veyron2/naming"
"veyron2/rt"
"veyron2/storage"
@@ -199,6 +200,8 @@
// getDeltasFromPeer contacts the specified endpoint to obtain deltas wrt its current generation vector.
func (i *syncInitiator) getDeltasFromPeer(dID, ep string, local GenVector) {
+ ctx := rt.R().NewContext()
+
vlog.VI(1).Infof("GetDeltasFromPeer:: From server %s with DeviceID %s at %v", ep, dID, time.Now().UTC())
// Construct a new stub that binds to peer endpoint.
@@ -211,7 +214,7 @@
vlog.VI(1).Infof("GetDeltasFromPeer:: Sending local information: %v", local)
// Issue a GetDeltas() rpc.
- stream, err := c.GetDeltas(rt.R().TODOContext(), local, i.syncd.id)
+ stream, err := c.GetDeltas(ctx, local, i.syncd.id)
if err != nil {
vlog.Errorf("GetDeltasFromPeer:: error getting deltas: err %v", err)
return
@@ -227,7 +230,7 @@
vlog.Fatalf("GetDeltasFromPeer:: finish failed with err %v", err)
}
- if err := i.processUpdatedObjects(local, minGens, remote, DeviceID(dID)); err != nil {
+ if err := i.processUpdatedObjects(ctx, local, minGens, remote, DeviceID(dID)); err != nil {
vlog.Fatalf("GetDeltasFromPeer:: error processing objects: err %v", err)
}
@@ -346,7 +349,7 @@
// this case, we wait to get the latest versions of objects from the
// store, and recheck if the object has any conflicts and repeat the
// above steps, until put to store succeeds.
-func (i *syncInitiator) processUpdatedObjects(local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) processUpdatedObjects(ctx context.T, local, minGens, remote GenVector, dID DeviceID) error {
for {
if err := i.detectConflicts(); err != nil {
return err
@@ -357,7 +360,7 @@
return err
}
- err = i.updateStoreAndSync(m, local, minGens, remote, dID)
+ err = i.updateStoreAndSync(ctx, m, local, minGens, remote, dID)
if err == nil {
break
}
@@ -508,7 +511,7 @@
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) updateStoreAndSync(ctx context.T, m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
@@ -517,7 +520,7 @@
// to prevent a race with watcher. The next iteration will
// clean up this coordination.
if store := i.syncd.store; store != nil && len(m) > 0 {
- stream, err := store.PutMutations(rt.R().TODOContext())
+ stream, err := store.PutMutations(ctx)
if err != nil {
vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
return err
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index e44105c..a0801a7 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -12,6 +12,7 @@
"veyron/services/store/raw"
"veyron2"
+ "veyron2/context"
"veyron2/ipc"
"veyron2/query"
"veyron2/rt"
@@ -66,8 +67,9 @@
}
// Get a Watch stream, process it, repeat till end-of-life.
+ ctx := rt.R().NewContext()
for {
- stream := w.getWatchStream()
+ stream := w.getWatchStream(ctx)
if stream == nil {
return // Syncd is exiting.
}
@@ -95,14 +97,14 @@
// getWatchStream() returns a Watch API stream and handles retries if the Watch() call fails.
// If the stream is nil, it means Syncd is exiting cleanly and the caller should terminate.
-func (w *syncWatcher) getWatchStream() watch.WatcherWatchStream {
+func (w *syncWatcher) getWatchStream(ctx context.T) watch.WatcherWatchStream {
for {
req := watch.Request{Query: query.Query{}}
if resmark := w.syncd.devtab.head.Resmark; resmark != nil {
req.ResumeMarker = resmark
}
- stream, err := w.syncd.store.Watch(rt.R().TODOContext(), req, veyron2.CallTimeout(ipc.NoTimeout))
+ stream, err := w.syncd.store.Watch(ctx, req, veyron2.CallTimeout(ipc.NoTimeout))
if err == nil {
return stream
}