blob: 101d08c353f2486f98172dfe860271b9f5ef88c4 [file] [log] [blame]
Himabindu Pucha2752a7e2015-06-12 14:07:07 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package vsync
6
7// New log records are created when objects in the local store are created,
8// updated or deleted. Local log records are also replayed to keep the
9// per-object dags consistent with the local store state. Sync module assigns
10// each log record created within a Database a unique sequence number, called
11// the generation number. Locally on each device, the position of each log
12// record is also recorded relative to other local and remote log records.
13//
14// When a device receives a request to send log records, it first computes the
15// missing generations between itself and the incoming request on a per-prefix
16// basis. It then sends all the log records belonging to the missing generations
17// in the order they occur locally (using the local log position). A device that
18// receives log records over the network replays all the records received from
19// another device in a single batch. Each replayed log record adds a new version
20// to the dag of the object contained in the log record. At the end of replaying
21// all the log records, conflict detection and resolution is carried out for all
22// the objects learned during this iteration. Conflict detection and resolution
23// is carried out after a batch of log records are replayed, instead of
24// incrementally after each record is replayed, to avoid repeating conflict
25// resolution already performed by other devices.
26//
27// Sync module tracks the current generation number and the current local log
28// position for each Database. In addition, it also tracks the current
29// generation vector for a Database. Log records are indexed such that they can
30// be selectively retrieved from the store for any missing generation from any
31// device.
32
33import (
Himabindu Pucha53497022015-06-19 11:57:13 -070034 "container/heap"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070035 "fmt"
Himabindu Pucha53497022015-06-19 11:57:13 -070036 "sort"
Himabindu Pucha03ef3932015-06-26 17:56:09 -070037 "strconv"
Himabindu Pucha53497022015-06-19 11:57:13 -070038 "strings"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070039
40 "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
41 "v.io/syncbase/x/ref/services/syncbase/server/util"
42 "v.io/syncbase/x/ref/services/syncbase/store"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070043 "v.io/v23/context"
Himabindu Pucha53497022015-06-19 11:57:13 -070044 "v.io/v23/rpc"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070045 "v.io/v23/verror"
46)
47
48// dbSyncStateInMem represents the in-memory sync state of a Database.
49type dbSyncStateInMem struct {
Himabindu Pucha03ef3932015-06-26 17:56:09 -070050 gen uint64
51 pos uint64
52
53 ckPtGen uint64
54 genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070055}
56
57// initSync initializes the sync module during startup. It scans all the
Raja Daoudcb50b5d2015-06-26 18:37:24 -070058// databases across all apps to initialize the following:
59// a) in-memory sync state of a Database consisting of the current generation
60// number, log position and generation vector.
61// b) watcher map of prefixes currently being synced.
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070062//
63// TODO(hpucha): This is incomplete. Flesh this out further.
64func (s *syncService) initSync(ctx *context.T) error {
65 s.syncStateLock.Lock()
66 defer s.syncStateLock.Unlock()
67
68 var errFinal error
69 s.syncState = make(map[string]*dbSyncStateInMem)
70
Raja Daoudcb50b5d2015-06-26 18:37:24 -070071 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
72 // Scan the SyncGroups, skipping those not yet being watched.
73 forEachSyncGroup(st, func(sg *interfaces.SyncGroup) bool {
74 // TODO(rdaoud): only use SyncGroups that have been
75 // marked as "watchable" by the sync watcher thread.
76 // This is to handle the case of a SyncGroup being
77 // created but Syncbase restarting before the watcher
78 // processed the SyncGroupOp entry in the watch queue.
79 // It should not be syncing that SyncGroup's data after
80 // restart, but wait until the watcher processes the
81 // entry as would have happened without a restart.
82 for _, prefix := range sg.Spec.Prefixes {
83 incrWatchPrefix(appName, dbName, prefix)
84 }
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070085 return false
Raja Daoudcb50b5d2015-06-26 18:37:24 -070086 })
87
88 if false {
89 // Fetch the sync state.
90 ds, err := getDbSyncState(ctx, st)
91 if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
92 errFinal = err
93 return false
94 }
95 var scanStart, scanLimit []byte
96 // Figure out what to scan among local log records.
97 if verror.ErrorID(err) == verror.ErrNoExist.ID {
98 scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
99 } else {
100 scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
101 }
102 var maxpos uint64
103 var dbName string
104 // Scan local log records to find the most recent one.
105 st.Scan(scanStart, scanLimit)
106 // Scan remote log records using the persisted GenVector.
107 s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700108 }
109
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700110 return false
111 })
112
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700113 return errFinal
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700114}
115
116// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
117// positions in a Database's log. Used when local updates result in log
118// entries.
119func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) {
120 return s.reserveGenAndPosInternal(appName, dbName, count, count)
121}
122
123// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
124// when remote log records are received.
125func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 {
126 _, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count)
127 return pos
128}
129
130func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) {
131 s.syncStateLock.Lock()
132 defer s.syncStateLock.Unlock()
133
Raja Daoud28d3b602015-06-17 20:02:20 -0700134 name := appDbName(appName, dbName)
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700135 ds, ok := s.syncState[name]
136 if !ok {
137 ds = &dbSyncStateInMem{gen: 1}
138 s.syncState[name] = ds
139 }
140
141 gen := ds.gen
142 pos := ds.pos
143
144 ds.gen += genCount
145 ds.pos += posCount
146
147 return gen, pos
148}
149
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700150// checkPtLocalGen freezes the local generation number for the responder's use.
151func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
152 s.syncStateLock.Lock()
153 defer s.syncStateLock.Unlock()
154
155 name := appDbName(appName, dbName)
156 ds, ok := s.syncState[name]
157 if !ok {
158 return verror.New(verror.ErrInternal, ctx, "db state not found", name)
159 }
160
161 ds.ckPtGen = ds.gen
162 return nil
163}
164
165// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
166func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
167 s.syncStateLock.Lock()
168 defer s.syncStateLock.Unlock()
169
170 name := appDbName(appName, dbName)
171 ds, ok := s.syncState[name]
172 if !ok {
173 return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
174 }
175
176 dsCopy := &dbSyncStateInMem{
177 gen: ds.gen,
178 pos: ds.pos,
179 ckPtGen: ds.ckPtGen,
180 }
181
182 // Make a copy of the genvec.
183 dsCopy.genvec = copyGenVec(ds.genvec)
184
185 return dsCopy, nil
186}
187
Himabindu Pucha53497022015-06-19 11:57:13 -0700188// getDbGenInfo returns a copy of the current generation information of the Database.
189func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
190 s.syncStateLock.Lock()
191 defer s.syncStateLock.Unlock()
192
193 name := appDbName(appName, dbName)
194 ds, ok := s.syncState[name]
195 if !ok {
196 return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
197 }
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700198
Himabindu Pucha53497022015-06-19 11:57:13 -0700199 // Make a copy of the genvec.
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700200 genvec := copyGenVec(ds.genvec)
201
202 // Add local generation information to the genvec.
203 for _, gv := range genvec {
204 gv[s.id] = ds.ckPtGen
Himabindu Pucha53497022015-06-19 11:57:13 -0700205 }
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700206
207 return genvec, ds.ckPtGen, nil
208}
209
210// putDbGenInfoRemote puts the current remote generation information of the Database.
211func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
212 s.syncStateLock.Lock()
213 defer s.syncStateLock.Unlock()
214
215 name := appDbName(appName, dbName)
216 ds, ok := s.syncState[name]
217 if !ok {
218 return verror.New(verror.ErrInternal, ctx, "db state not found", name)
219 }
220
221 // Make a copy of the genvec.
222 ds.genvec = copyGenVec(genvec)
223
224 return nil
Himabindu Pucha53497022015-06-19 11:57:13 -0700225}
226
Raja Daoud28d3b602015-06-17 20:02:20 -0700227// appDbName combines the app and db names to return a globally unique name for
228// a Database. This relies on the fact that the app name is globally unique and
229// the db name is unique within the scope of the app.
230func appDbName(appName, dbName string) string {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700231 return util.JoinKeyParts(appName, dbName)
232}
233
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700234// splitAppDbName is the inverse of appDbName and returns app and db name from a
235// globally unique name for a Database.
236func splitAppDbName(ctx *context.T, name string) (string, string, error) {
237 parts := util.SplitKeyParts(name)
238 if len(parts) != 2 {
239 return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name)
240 }
241 return parts[0], parts[1], nil
242}
243
244func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
245 genvec := make(interfaces.GenVector)
246 for p, inpgv := range in {
247 pgv := make(interfaces.PrefixGenVector)
248 for id, gen := range inpgv {
249 pgv[id] = gen
250 }
251 genvec[p] = pgv
252 }
253 return genvec
254}
255
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700256////////////////////////////////////////////////////////////
257// Low-level utility functions to access sync state.
258
259// dbSyncStateKey returns the key used to access the sync state of a Database.
260func dbSyncStateKey() string {
261 return util.JoinKeyParts(util.SyncPrefix, "dbss")
262}
263
264// putDbSyncState persists the sync state object for a given Database.
265func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
266 _ = tx.(store.Transaction)
267 if err := util.PutObject(tx, dbSyncStateKey(), ds); err != nil {
268 return verror.New(verror.ErrInternal, ctx, err)
269 }
270 return nil
271}
272
273// getDbSyncState retrieves the sync state object for a given Database.
274func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
275 var ds dbSyncState
276 if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
Himabindu Pucha53497022015-06-19 11:57:13 -0700277 return nil, translateError(ctx, err, dbSyncStateKey())
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700278 }
279 return &ds, nil
280}
281
282////////////////////////////////////////////////////////////
283// Low-level utility functions to access log records.
284
285// logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
286func logRecsPerDeviceScanPrefix(id uint64) string {
287 return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id))
288}
289
290// logRecKey returns the key used to access a specific log record.
291func logRecKey(id, gen uint64) string {
292 return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
293}
294
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700295// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
296func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
297 parts := util.SplitKeyParts(key)
298 verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
299 if len(parts) != 4 {
300 return 0, 0, verr
301 }
302 id, err := strconv.ParseUint(parts[2], 10, 64)
303 if err != nil {
304 return 0, 0, verr
305 }
306 gen, err := strconv.ParseUint(parts[3], 10, 64)
307 if err != nil {
308 return 0, 0, verr
309 }
310 return id, gen, nil
311}
312
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700313// hasLogRec returns true if the log record for (devid, gen) exists.
314func hasLogRec(st store.StoreReader, id, gen uint64) bool {
315 // TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
316 var rec localLogRec
317 if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
318 return false
319 }
320 return true
321}
322
323// putLogRec stores the log record.
324func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
325 _ = tx.(store.Transaction)
326 if err := util.PutObject(tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec); err != nil {
327 return verror.New(verror.ErrInternal, ctx, err)
328 }
329 return nil
330}
331
332// getLogRec retrieves the log record for a given (devid, gen).
333func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
334 var rec localLogRec
335 if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
Himabindu Pucha53497022015-06-19 11:57:13 -0700336 return nil, translateError(ctx, err, logRecKey(id, gen))
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700337 }
338 return &rec, nil
339}
340
341// delLogRec deletes the log record for a given (devid, gen).
342func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
343 _ = tx.(store.Transaction)
344
345 if err := tx.Delete([]byte(logRecKey(id, gen))); err != nil {
346 return verror.New(verror.ErrInternal, ctx, err)
347 }
348 return nil
349}
Himabindu Pucha53497022015-06-19 11:57:13 -0700350
351////////////////////////////////////////////////////////////
352// Genvector-related utilities.
353
354// sendDeltasPerDatabase sends to an initiator all the missing generations
355// corresponding to the prefixes requested for this Database, and a genvector
356// summarizing the knowledge transferred from the responder to the
357// initiator. This happens in two phases:
358//
359// In the first phase, for a given set of nested prefixes from the initiator,
360// the shortest prefix in that set is extracted. The initiator's prefix
361// genvector for this shortest prefix represents the lower bound on its
362// knowledge for the entire set of nested prefixes. This prefix genvector
363// (representing the lower bound) is diffed with all the responder prefix
364// genvectors corresponding to same or deeper prefixes compared to the initiator
365// prefix. This diff produces a bound on the missing knowledge. For example, say
366// the initiator is interested in prefixes {foo, foobar}, where each prefix is
367// associated with a prefix genvector. Since the initiator strictly has as much
368// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
369// prefix genvector is chosen as the lower bound for the initiator's
370// knowledge. Similarly, say the responder has knowledge on prefixes {f,
371// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
372// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
373// compute a bound on missing generations (all responder's prefixes that match
374// "foo". Note that since the responder doesn't have a prefix genvector at
375// "foo", its knowledge at "f" is applicable to "foo").
376//
377// Since the first phase outputs an aggressive calculation of missing
378// generations containing more generation entries than strictly needed by the
379// initiator, in the second phase, each missing generation is sent to the
380// initiator only if the initiator is eligible for it and is not aware of
381// it. The generations are sent to the initiator in the same order as the
382// responder learned them so that the initiator can reconstruct the DAG for the
383// objects by learning older nodes first.
384func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
385 // Phase 1 of sendDeltas. diff contains the bound on the generations
386 // missing from the initiator per device.
387 diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
388 if err != nil {
389 return nil, err
390 }
391
392 // Phase 2 of sendDeltas: Process the diff, filtering out records that
393 // are not needed, and send the remainder on the wire ordered.
394 st, err := s.getDbStore(ctx, call, appName, dbName)
395 if err != nil {
396 return nil, err
397 }
398
399 // We now visit every log record in the generation range as obtained
400 // from phase 1 in their log order. We use a heap to incrementally sort
401 // the log records as per their position in the log.
402 //
403 // Init the min heap, one entry per device in the diff.
404 mh := make(minHeap, 0, len(diff))
405 for dev, r := range diff {
406 r.cur = r.min
407 rec, err := getNextLogRec(ctx, st, dev, r)
408 if err != nil {
409 return nil, err
410 }
411 if rec != nil {
412 mh = append(mh, rec)
413 } else {
414 delete(diff, dev)
415 }
416 }
417 heap.Init(&mh)
418
419 // Process the log records in order.
420 initPfxs := extractAndSortPrefixes(initVec)
421
422 for mh.Len() > 0 {
423 rec := heap.Pop(&mh).(*localLogRec)
424
425 if !filterLogRec(rec, initVec, initPfxs) {
426 // Send on the wire.
427 wireRec := interfaces.LogRec{Metadata: rec.Metadata}
428 // TODO(hpucha): Hash out this fake stream stuff when
429 // defining the RPC and the rest of the responder.
430 stream.Send(wireRec)
431 }
432
433 // Add a new record from the same device if not done.
434 dev := rec.Metadata.Id
435 rec, err := getNextLogRec(ctx, st, dev, diff[dev])
436 if err != nil {
437 return nil, err
438 }
439 if rec != nil {
440 heap.Push(&mh, rec)
441 } else {
442 delete(diff, dev)
443 }
444 }
445
446 return outVec, nil
447}
448
449// computeDeltaBound computes the bound on missing generations across all
450// requested prefixes (phase 1 of sendDeltas).
451func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
452 respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
453 if err != nil {
454 return nil, nil, err
455 }
456 respPfxs := extractAndSortPrefixes(respVec)
457 initPfxs := extractAndSortPrefixes(initVec)
458 if len(initPfxs) == 0 {
459 return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
460 }
461
462 outVec := make(interfaces.GenVector)
463 diff := make(genRangeVector)
464 pfx := initPfxs[0]
465
466 for _, p := range initPfxs {
467 if strings.HasPrefix(p, pfx) && p != pfx {
468 continue
469 }
470
471 // Process this prefix as this is the start of a new set of
472 // nested prefixes.
473 pfx = p
474
475 // Lower bound on initiator's knowledge for this prefix set.
476 initpgv := initVec[pfx]
477
478 // Find the relevant responder prefixes and add the corresponding knowledge.
479 var respgv interfaces.PrefixGenVector
480 var rpStart string
481 for _, rp := range respPfxs {
482 if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
483 // No relationship with pfx.
484 continue
485 }
486
487 if strings.HasPrefix(pfx, rp) {
488 // If rp is a prefix of pfx, remember it because
489 // it may be a potential starting point for the
490 // responder's knowledge. The actual starting
491 // point is the deepest prefix where rp is a
492 // prefix of pfx.
493 //
494 // Say the initiator is looking for "foo", and
495 // the responder has knowledge for "f" and "fo",
496 // the responder's starting point will be the
497 // prefix genvector for "fo". Similarly, if the
498 // responder has knowledge for "foo", the
499 // starting point will be the prefix genvector
500 // for "foo".
501 rpStart = rp
502 } else {
503 // If pfx is a prefix of rp, this knowledge must
504 // be definitely sent to the initiator. Diff the
505 // prefix genvectors to adjust the delta bound and
506 // include in outVec.
507 respgv = respVec[rp]
Himabindu Pucha53497022015-06-19 11:57:13 -0700508 s.diffPrefixGenVectors(respgv, initpgv, diff)
509 outVec[rp] = respgv
510 }
511 }
512
513 // Deal with the starting point.
514 if rpStart == "" {
515 // No matching prefixes for pfx were found.
516 respgv = make(interfaces.PrefixGenVector)
Himabindu Pucha03ef3932015-06-26 17:56:09 -0700517 respgv[s.id] = respGen
Himabindu Pucha53497022015-06-19 11:57:13 -0700518 } else {
519 respgv = respVec[rpStart]
520 }
Himabindu Pucha53497022015-06-19 11:57:13 -0700521 s.diffPrefixGenVectors(respgv, initpgv, diff)
522 outVec[pfx] = respgv
523 }
524
525 return diff, outVec, nil
526}
527
// genRange is an inclusive range [min, max] of one device's generations. It
// also carries a cursor used by getNextLogRec while walking the range.
type genRange struct {
	min uint64 // first generation in the range
	max uint64 // last generation in the range
	cur uint64 // next generation getNextLogRec will examine
}
534
535type genRangeVector map[uint64]*genRange
536
537// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
538// and the initiator, and updates the range of generations per device known to
539// the responder but not known to the initiator. "gens" (generation range) is
540// passed in as an input argument so that it can be incrementally updated as the
541// range of missing generations grows when different responder prefix genvectors
542// are used to compute the diff.
543//
544// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
545// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
546// two vectors returns: {A:[6-10], C:[1-1]}.
547//
548// TODO(hpucha): Add reclaimVec for GCing.
549func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) {
550 // Compute missing generations for devices that are in both initiator's and responder's vectors.
551 for devid, gen := range initPVec {
552 rgen, ok := respPVec[devid]
553 // Skip since responder doesn't know of this device.
554 if ok {
555 updateDevRange(devid, rgen, gen, gens)
556 }
557 }
558
559 // Compute missing generations for devices not in initiator's vector but in responder's vector.
560 for devid, rgen := range respPVec {
561 if _, ok := initPVec[devid]; !ok {
562 updateDevRange(devid, rgen, 0, gens)
563 }
564 }
565}
566
567func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
568 if gen < rgen {
569 // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
570 if r, ok := gens[devid]; !ok {
571 gens[devid] = &genRange{min: gen + 1, max: rgen}
572 } else {
573 if gen+1 < r.min {
574 r.min = gen + 1
575 }
576 if rgen > r.max {
577 r.max = rgen
578 }
579 }
580 }
581}
582
583func extractAndSortPrefixes(vec interfaces.GenVector) []string {
584 pfxs := make([]string, len(vec))
585 i := 0
586 for p := range vec {
587 pfxs[i] = p
588 i++
589 }
590 sort.Strings(pfxs)
591 return pfxs
592}
593
594// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
595// loop.
596func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
597 for i := r.cur; i <= r.max; i++ {
598 rec, err := getLogRec(ctx, sn, dev, i)
599 if err == nil {
600 r.cur = i + 1
601 return rec, nil
602 }
603 if verror.ErrorID(err) != verror.ErrNoExist.ID {
604 return nil, err
605 }
606 }
607 return nil, nil
608}
609
610// Note: initPfxs is sorted.
611func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
612 filter := true
613
614 var maxGen uint64
615 for _, p := range initPfxs {
616 if strings.HasPrefix(rec.Metadata.ObjId, p) {
617 // Do not filter. Initiator is interested in this
618 // prefix.
619 filter = false
620
621 // Track if the initiator knows of this record.
622 gen := initVec[p][rec.Metadata.Id]
623 if maxGen < gen {
624 maxGen = gen
625 }
626 }
627 }
628
629 // Filter this record if the initiator already has it.
630 if maxGen >= rec.Metadata.Gen {
631 return true
632 }
633
634 return filter
635}
636
637// A minHeap implements heap.Interface and holds local log records.
638type minHeap []*localLogRec
639
640func (mh minHeap) Len() int { return len(mh) }
641
642func (mh minHeap) Less(i, j int) bool {
643 return mh[i].Pos < mh[j].Pos
644}
645
646func (mh minHeap) Swap(i, j int) {
647 mh[i], mh[j] = mh[j], mh[i]
648}
649
650func (mh *minHeap) Push(x interface{}) {
651 item := x.(*localLogRec)
652 *mh = append(*mh, item)
653}
654
655func (mh *minHeap) Pop() interface{} {
656 old := *mh
657 n := len(old)
658 item := old[n-1]
659 *mh = old[0 : n-1]
660 return item
661}
662
663type logRecStream interface {
664 Send(interfaces.LogRec)
665}