blob: a9a30f40790af12c52852442ab36f63b6c7f947b [file] [log] [blame]
Himabindu Pucha38ddeb32014-05-12 10:57:22 -07001package vsync
2
3import (
4 "errors"
5 "fmt"
6 "io"
7 "math/rand"
8 "strings"
9 "time"
10
Tilak Sharma2af70022014-05-14 11:29:27 -070011 "veyron/services/store/raw"
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070012
13 "veyron2/naming"
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -070014 "veyron2/rt"
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070015 "veyron2/storage"
16 "veyron2/vlog"
17)
18
19// Policies to pick a peer to sync with.
20const (
21 // Picks a peer at random from the available set.
22 selectRandom = iota
23
24 // TODO(hpucha): implement other policies.
25 // Picks a peer with most differing generations.
26 selectMostDiff
27
28 // Picks a peer that was synced with the furthest in the past.
29 selectOldest
30)
31
32// Policies for conflict resolution.
33const (
Raja Daoudc861ab42014-05-20 14:32:54 -070034 // Resolves conflicts by picking the mutation with the most recent timestamp.
35 useTime = iota
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070036
37 // TODO(hpucha): implement other policies.
38 // Resolves conflicts by using the app conflict resolver callbacks via store.
39 useCallback
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070040)
41
42var (
43 // peerSyncInterval is the duration between two consecutive
44 // sync events. In every sync event, the initiator contacts
45 // one of its peers to obtain any pending updates.
Raja Daoudc861ab42014-05-20 14:32:54 -070046 peerSyncInterval = 100 * time.Millisecond
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070047
48 // peerSelectionPolicy is the policy used to select a peer when
49 // the initiator gets a chance to sync.
50 peerSelectionPolicy = selectRandom
51
Raja Daoudc861ab42014-05-20 14:32:54 -070052 // conflictResolutionPolicy is the policy used to resolve conflicts.
53 conflictResolutionPolicy = useTime
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070054
55 errNoUsefulPeer = errors.New("no useful peer to contact")
56)
57
58// syncInitiator contains the metadata and state for the initiator thread.
59type syncInitiator struct {
60 syncd *syncd
61
62 // State to contact peers periodically and get deltas.
63 // TODO(hpucha): This is an initial version with command line arguments.
64 // Next steps are to tie this up into mount table and auto-discover neighbors.
65 neighbors []string
66 neighborIDs []string
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070067
68 updObjects map[storage.ID]*objConflictState
69}
70
71// objConflictState contains the conflict state for objects that are
72// updated during an initiator run.
73type objConflictState struct {
74 isConflict bool
75 newHead storage.Version
76 oldHead storage.Version
77 ancestor storage.Version
78 resolvVal *LogValue
79}
80
81// newInitiator creates a new initiator instance attached to the given syncd instance.
Raja Daoudc861ab42014-05-20 14:32:54 -070082func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string, syncTick time.Duration) *syncInitiator {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -070083 i := &syncInitiator{syncd: syncd,
84 updObjects: make(map[storage.ID]*objConflictState),
85 }
86
87 // Bootstrap my peer list.
88 if peerEndpoints != "" {
89 i.neighbors = strings.Split(peerEndpoints, ",")
90 i.neighborIDs = strings.Split(peerDeviceIDs, ",")
91 }
92 if len(i.neighbors) != len(i.neighborIDs) {
93 vlog.Fatalf("newInitiator: Mismatch between number of endpoints and IDs")
94 }
95
Raja Daoudc861ab42014-05-20 14:32:54 -070096 // Override the default peerSyncInterval value if syncTick is specified.
97 if syncTick > 0 {
98 peerSyncInterval = syncTick
99 }
100
Himabindu Puchaed4f2342014-05-15 15:18:24 -0700101 vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
102 vlog.VI(1).Infof("newInitiator: Peer endpoints: %v", i.neighbors)
103 vlog.VI(1).Infof("newInitiator: Peer IDs: %v", i.neighborIDs)
Raja Daoudc861ab42014-05-20 14:32:54 -0700104 vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700105
106 return i
107}
108
109// contactPeers wakes up every peerSyncInterval to contact peers and get deltas from them.
110func (i *syncInitiator) contactPeers() {
111 ticker := time.NewTicker(peerSyncInterval)
112 for {
113 select {
114 case <-i.syncd.closed:
115 ticker.Stop()
116 i.syncd.pending.Done()
117 return
118 case <-ticker.C:
119 }
120
121 id, ep, err := i.pickPeer()
122 if err != nil {
123 continue
124 }
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700125
126 // Freeze the most recent batch of local changes
127 // before fetching remote changes from a peer.
128 //
129 // We only allow an initiator to create new local
130 // generations (not responders/watcher) in order to
131 // maintain a static baseline for the duration of a
132 // sync. This addresses the following race condition:
133 // If we allow responders to create new local
134 // generations while the initiator is in progress,
135 // they may beat the initiator and send these new
136 // generations to remote devices. These remote
137 // devices in turn can send these generations back to
138 // the initiator in progress which was started with
139 // older generation information.
140 local, err := i.updateLocalGeneration()
141 if err != nil {
142 vlog.Fatalf("contactPeers:: error updating local generation: err %v", err)
143 }
144
145 i.getDeltasFromPeer(id, ep, local)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700146 }
147}
148
149// pickPeer picks a sync endpoint in the neighborhood to sync with.
150func (i *syncInitiator) pickPeer() (string, string, error) {
151 switch peerSelectionPolicy {
152 case selectRandom:
153 // Pick a neighbor at random.
154 if i.neighbors == nil {
155 return "", "", errNoUsefulPeer
156 }
157 ind := rand.Intn(len(i.neighbors))
158 return i.neighborIDs[ind], i.neighbors[ind], nil
159 default:
160 return "", "", fmt.Errorf("unknown peer selection policy")
161 }
162}
163
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700164// updateLocalGeneration creates a new local generation if needed and
165// returns the newest local generation vector.
166func (i *syncInitiator) updateLocalGeneration() (GenVector, error) {
167 // TODO(hpucha): Eliminate reaching into syncd's lock.
168 i.syncd.lock.Lock()
169 defer i.syncd.lock.Unlock()
170
171 // Create a new local generation if there are any local updates.
172 gen, err := i.syncd.log.createLocalGeneration()
173 if err == errNoUpdates {
174 vlog.VI(1).Infof("createLocalGeneration:: No new updates. Local at %d", gen)
175 return i.syncd.devtab.getGenVec(i.syncd.id)
176 }
177 if err != nil {
178 return GenVector{}, err
179 }
180
181 // Update local generation vector in devTable.
182 if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
183 return GenVector{}, err
184 }
185 return i.syncd.devtab.getGenVec(i.syncd.id)
186}
187
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700188// getDeltasFromPeer contacts the specified endpoint to obtain deltas wrt its current generation vector.
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700189func (i *syncInitiator) getDeltasFromPeer(dID, ep string, local GenVector) {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700190 vlog.VI(1).Infof("GetDeltasFromPeer:: From server %s with DeviceID %s at %v", ep, dID, time.Now().UTC())
191
192 // Construct a new stub that binds to peer endpoint.
193 c, err := BindSync(naming.JoinAddressName(ep, "sync"))
194 if err != nil {
Himabindu Puchaed4f2342014-05-15 15:18:24 -0700195 vlog.Errorf("GetDeltasFromPeer:: error binding to server: err %v", err)
196 return
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700197 }
198
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700199 vlog.VI(1).Infof("GetDeltasFromPeer:: Sending local information: %v", local)
200
201 // Issue a GetDeltas() rpc.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700202 stream, err := c.GetDeltas(rt.R().TODOContext(), local, i.syncd.id)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700203 if err != nil {
Himabindu Puchaed4f2342014-05-15 15:18:24 -0700204 vlog.Errorf("GetDeltasFromPeer:: error getting deltas: err %v", err)
205 return
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700206 }
207
208 minGens, err := i.processLogStream(stream)
209 if err != nil {
210 vlog.Fatalf("GetDeltasFromPeer:: error processing logs: err %v", err)
211 }
212
213 remote, err := stream.Finish()
214 if err != nil {
215 vlog.Fatalf("GetDeltasFromPeer:: finish failed with err %v", err)
216 }
217
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700218 if err := i.processUpdatedObjects(local, minGens, remote, DeviceID(dID)); err != nil {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700219 vlog.Fatalf("GetDeltasFromPeer:: error processing objects: err %v", err)
220 }
221
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700222 vlog.VI(1).Infof("GetDeltasFromPeer:: Local vector %v", local)
223 vlog.VI(1).Infof("GetDeltasFromPeer:: Remote vector %v", remote)
224}
225
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700226// processLogStream replays an entire log stream spanning multiple
227// generations from different devices received from a single GetDeltas
228// call. It does not perform any conflict resolution during replay.
229// This avoids resolving conflicts that have already been resolved by
230// other devices.
231func (i *syncInitiator) processLogStream(stream SyncGetDeltasStream) (GenVector, error) {
232 // Map to track new generations received in the RPC reply.
233 // TODO(hpucha): If needed, this can be optimized under the
234 // assumption that an entire generation is received
235 // sequentially. We can then parse a generation at a time.
236 newGens := make(map[string]*genMetadata)
237 // Array to track order of arrival for the generations.
238 // We need to preserve this order.
239 var orderGens []string
240 // Compute the minimum generation for every device in this set.
241 minGens := GenVector{}
242
243 for {
244 rec, err := stream.Recv()
245 if err == io.EOF {
246 break
247 }
248 if err != nil {
249 return GenVector{}, err
250 }
251
252 if err := i.insertRecInLogAndDag(&rec); err != nil {
253 return GenVector{}, err
254 }
255 // Mark object dirty.
256 i.updObjects[rec.ObjID] = &objConflictState{}
257
258 // Populate the generation metadata.
259 genKey := generationKey(rec.DevID, rec.GNum)
260 if gen, ok := newGens[genKey]; !ok {
261 // New generation in the stream.
262 orderGens = append(orderGens, genKey)
263 newGens[genKey] = &genMetadata{
264 Count: 1,
265 MaxLSN: rec.LSN,
266 }
267 g, ok := minGens[rec.DevID]
268 if !ok || g > rec.GNum {
269 minGens[rec.DevID] = rec.GNum
270 }
271 } else {
272 gen.Count++
273 if rec.LSN > gen.MaxLSN {
274 gen.MaxLSN = rec.LSN
275 }
276 }
277 }
278
279 if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
280 return GenVector{}, err
281 }
282
283 return minGens, nil
284}
285
286// insertLogAndDag adds a new log record to log and dag data structures.
287func (i *syncInitiator) insertRecInLogAndDag(rec *LogRec) error {
288 // TODO(hpucha): Eliminate reaching into syncd's lock.
289 i.syncd.lock.Lock()
290 defer i.syncd.lock.Unlock()
291
292 logKey, err := i.syncd.log.putLogRec(rec)
293 if err != nil {
294 return err
295 }
296 if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
297 return err
298 }
299 return nil
300}
301
302// createGenMetadataBatch inserts a batch of generations into the log.
303func (i *syncInitiator) createGenMetadataBatch(newGens map[string]*genMetadata, orderGens []string) error {
304 // TODO(hpucha): Eliminate reaching into syncd's lock.
305 i.syncd.lock.Lock()
306 defer i.syncd.lock.Unlock()
307
308 for _, key := range orderGens {
309 gen := newGens[key]
310 // Insert the generation metadata.
311 dev, gnum, err := splitGenerationKey(key)
312 if err != nil {
313 return err
314 }
315 if err := i.syncd.log.createRemoteGeneration(dev, gnum, gen); err != nil {
316 return err
317 }
318 }
319
320 return nil
321}
322
323// processUpdatedObjects processes all the updates received by the
324// initiator, one object at a time. For each updated object, we first
325// check if the object has any conflicts. If there is a conflict, we
326// resolve the conflict and generate a new store mutation reflecting
327// the conflict resolution. If there is no conflict, we generate a
328// store mutation to simply update the store to the latest value. We
329// then put all these mutations in the store. If the put succeeds, we
330// update the log and dag state suitably (move the head ptr of the
331// object in the dag to the latest version, and create a new log
332// record reflecting conflict resolution if any). Puts to store can
333// fail since preconditions on the objects may have been violated. In
334// this case, we wait to get the latest versions of objects from the
335// store, and recheck if the object has any conflicts and repeat the
336// above steps, until put to store succeeds.
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700337func (i *syncInitiator) processUpdatedObjects(local, minGens, remote GenVector, dID DeviceID) error {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700338 for {
339 if err := i.detectConflicts(); err != nil {
340 return err
341 }
342
343 m, err := i.resolveConflicts()
344 if err != nil {
345 return err
346 }
347
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700348 err = i.updateStoreAndSync(m, local, minGens, remote, dID)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700349 if err == nil {
350 break
351 }
352
353 vlog.Errorf("PutMutations failed %v. Will retry", err)
354 // TODO(hpucha): Sleeping and retrying is a temporary
355 // solution. Next iteration will have coordination
356 // with watch thread to intelligently retry. Hence
357 // this value is not a config param.
358 time.Sleep(10 * time.Second)
359 }
360
361 // Remove any pending state.
362 i.updObjects = make(map[storage.ID]*objConflictState)
363 i.syncd.dag.clearGraft()
364 return nil
365}
366
367// detectConflicts iterates through all the updated objects to detect
368// conflicts.
369func (i *syncInitiator) detectConflicts() error {
370 // TODO(hpucha): Eliminate reaching into syncd's lock.
371 i.syncd.lock.RLock()
372 defer i.syncd.lock.RUnlock()
373
374 for obj, st := range i.updObjects {
375 // Check if object has a conflict.
376 var err error
377 st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
378 if err != nil {
379 return err
380 }
381 if !st.isConflict {
382 rec, err := i.getLogRec(obj, st.newHead)
383 if err != nil {
384 return err
385 }
386 st.resolvVal = &rec.Value
387 // Sanity check.
388 if st.resolvVal.Mutation.Version != st.newHead {
389 return fmt.Errorf("bad mutation %d %d",
390 st.resolvVal.Mutation.Version, st.newHead)
391 }
392 }
393 }
394 return nil
395}
396
397// getLogRec returns the log record corresponding to a given object and its version.
398func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
399 logKey, err := i.syncd.dag.getLogrec(obj, vers)
400 if err != nil {
401 return nil, err
402 }
403 dev, gen, lsn, err := splitLogRecKey(logKey)
404 if err != nil {
405 return nil, err
406 }
407 rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
408 if err != nil {
409 return nil, err
410 }
411 return rec, nil
412}
413
414// resolveConflicts resolves conflicts for updated objects.
Tilak Sharma2af70022014-05-14 11:29:27 -0700415func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700416 switch conflictResolutionPolicy {
Raja Daoudc861ab42014-05-20 14:32:54 -0700417 case useTime:
418 if err := i.resolveConflictsByTime(); err != nil {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700419 return nil, err
420 }
421 default:
422 return nil, fmt.Errorf("unknown conflict resolution policy")
423 }
424
Tilak Sharma2af70022014-05-14 11:29:27 -0700425 var m []raw.Mutation
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700426 for _, st := range i.updObjects {
427 // Append to mutations.
428 st.resolvVal.Mutation.PriorVersion = st.oldHead
429 m = append(m, st.resolvVal.Mutation)
430 }
431 return m, nil
432}
433
Raja Daoudc861ab42014-05-20 14:32:54 -0700434// resolveConflictsByTime resolves conflicts using the timestamps
435// of the conflicting mutations. It picks a mutation with the larger
436// timestamp, i.e. the most recent update. If the timestamps are equal,
437// it uses the mutation version numbers as a tie-breaker, picking the
438// mutation with the larger version.
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700439//
440// TODO(hpucha): Based on a few more policies, reconsider nesting
441// order of the conflict resolution loop and switch-on-policy.
Raja Daoudc861ab42014-05-20 14:32:54 -0700442func (i *syncInitiator) resolveConflictsByTime() error {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700443 for obj, st := range i.updObjects {
444 if !st.isConflict {
445 continue
446 }
447
448 versions := make([]storage.Version, 3)
449 versions[0] = st.oldHead
450 versions[1] = st.newHead
451 versions[2] = st.ancestor
452
453 lrecs, err := i.getLogRecsBatch(obj, versions)
454 if err != nil {
455 return err
456 }
Raja Daoudc861ab42014-05-20 14:32:54 -0700457
458 res := 0
459 switch {
460 case lrecs[0].Value.SyncTime > lrecs[1].Value.SyncTime:
461 res = 0
462 case lrecs[0].Value.SyncTime < lrecs[1].Value.SyncTime:
463 res = 1
464 case lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version:
465 res = 0
466 case lrecs[0].Value.Mutation.Version < lrecs[1].Value.Mutation.Version:
467 res = 1
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700468 }
Raja Daoudc861ab42014-05-20 14:32:54 -0700469
470 m := lrecs[res].Value.Mutation
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700471 m.Version = storage.NewVersion()
472
473 // TODO(hpucha): handle continue and delete flags.
474 st.resolvVal = &LogValue{Mutation: m}
475 }
476
477 return nil
478}
479
480// getLogRecsBatch gets the log records for an array of versions.
481func (i *syncInitiator) getLogRecsBatch(obj storage.ID, versions []storage.Version) ([]*LogRec, error) {
482 // TODO(hpucha): Eliminate reaching into syncd's lock.
483 i.syncd.lock.RLock()
484 defer i.syncd.lock.RUnlock()
485
486 lrecs := make([]*LogRec, len(versions))
487 var err error
488 for p, v := range versions {
489 lrecs[p], err = i.getLogRec(obj, v)
490 if err != nil {
491 return nil, err
492 }
493 }
494 return lrecs, nil
495}
496
497// updateStoreAndSync updates the store, and if that is successful,
498// updates log and dag data structures.
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700499func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700500 // TODO(hpucha): Eliminate reaching into syncd's lock.
501 i.syncd.lock.Lock()
502 defer i.syncd.lock.Unlock()
503
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700504 // TODO(hpucha): We will hold the lock across PutMutations rpc
505 // to prevent a race with watcher. The next iteration will
506 // clean up this coordination.
Tilak Sharma79cfb412014-05-27 18:34:57 -0700507 if store := i.syncd.store; store != nil && len(m) > 0 {
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700508 stream, err := store.PutMutations(rt.R().TODOContext())
Tilak Sharma2af70022014-05-14 11:29:27 -0700509 if err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700510 vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
Tilak Sharma2af70022014-05-14 11:29:27 -0700511 return err
512 }
513 for i := range m {
514 if err := stream.Send(m[i]); err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700515 vlog.Errorf("updateStoreAndSync:: send err %v", err)
Tilak Sharma2af70022014-05-14 11:29:27 -0700516 return err
517 }
518 }
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700519 if err := stream.CloseSend(); err != nil {
520 vlog.Errorf("updateStoreAndSync:: closesend err %v", err)
521 return err
522 }
Tilak Sharma2af70022014-05-14 11:29:27 -0700523 if err := stream.Finish(); err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700524 vlog.Errorf("updateStoreAndSync:: finish err %v", err)
Himabindu Puchaed4f2342014-05-15 15:18:24 -0700525 return err
526 }
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700527 }
528
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700529 vlog.VI(2).Infof("updateStoreAndSync:: putmutations succeeded")
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700530 if err := i.updateLogAndDag(); err != nil {
531 return err
532 }
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700533
534 if err := i.updateGenVecs(local, minGens, remote, DeviceID(dID)); err != nil {
535 return err
536 }
537
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700538 return nil
539}
540
541// updateLogAndDag updates the log and dag data structures on a successful store put.
542func (i *syncInitiator) updateLogAndDag() error {
543 for obj, st := range i.updObjects {
544 if st.isConflict {
545 // Object had a conflict, which was resolved successfully.
546 // Put is successful, create a log record.
547 parents := []storage.Version{st.newHead, st.oldHead}
548 rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
549 if err != nil {
550 return err
551 }
552
553 logKey, err := i.syncd.log.putLogRec(rec)
554 if err != nil {
555 return err
556 }
557
558 // Put is successful, add a new DAG node.
559 if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
560 return err
561 }
562 }
563
564 // Move the head.
565 if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
566 return err
567 }
568 }
569 return nil
570}
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700571
572// updateGenVecs updates local, reclaim and remote vectors at the end of an initiator cycle.
573func (i *syncInitiator) updateGenVecs(local, minGens, remote GenVector, dID DeviceID) error {
574 // Update the local gen vector and put it in kvdb only if we have new updates.
575 if len(i.updObjects) > 0 {
576 if err := i.syncd.devtab.updateLocalGenVector(local, remote); err != nil {
577 return err
578 }
579
580 if err := i.syncd.devtab.putGenVec(i.syncd.id, local); err != nil {
581 return err
582 }
583
584 if err := i.syncd.devtab.updateReclaimVec(minGens); err != nil {
585 return err
586 }
587 }
588
589 // Cache the remote generation vector for space reclamation.
590 if err := i.syncd.devtab.putGenVec(dID, remote); err != nil {
591 return err
592 }
593 return nil
594}