Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package vsync |
| 6 | |
| 7 | import ( |
| 8 | "container/heap" |
| 9 | "sort" |
| 10 | "strings" |
| 11 | |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 12 | wire "v.io/syncbase/v23/services/syncbase/nosql" |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 13 | "v.io/syncbase/x/ref/services/syncbase/server/interfaces" |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 14 | "v.io/syncbase/x/ref/services/syncbase/server/watchable" |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 15 | "v.io/syncbase/x/ref/services/syncbase/store" |
| 16 | "v.io/v23/context" |
| 17 | "v.io/v23/verror" |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 18 | "v.io/x/lib/vlog" |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 19 | ) |
| 20 | |
| 21 | // GetDeltas implements the responder side of the GetDeltas RPC. |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 22 | func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error { |
| 23 | vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator) |
| 24 | defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator) |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 25 | |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 26 | recvr := call.RecvStream() |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 27 | for recvr.Advance() { |
| 28 | req := recvr.Value() |
| 29 | // Ignoring errors since if one Database fails for any reason, |
| 30 | // it is fine to continue to the next one. In fact, sometimes |
| 31 | // the failure might be genuine. For example, the responder is |
| 32 | // no longer part of the requested SyncGroups, or the app/db is |
| 33 | // locally deleted, or a permission change has denied access. |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 34 | rSt := newResponderState(ctx, call, s, req, initiator) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 35 | rSt.sendDeltasPerDatabase(ctx) |
| 36 | } |
| 37 | |
| 38 | // TODO(hpucha): Is there a need to call finish or some such? |
| 39 | return recvr.Err() |
| 40 | } |
| 41 | |
| 42 | // responderState is state accumulated per Database by the responder during an |
| 43 | // initiation round. |
| 44 | type responderState struct { |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 45 | req interfaces.DeltaReq |
| 46 | call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC. |
| 47 | initiator string |
| 48 | errState error // Captures the error from the first two phases of the responder. |
| 49 | sync *syncService |
| 50 | st store.Store // Store handle to the Database. |
| 51 | diff genRangeVector |
| 52 | outVec interfaces.GenVector |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 53 | } |
| 54 | |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 55 | func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState { |
| 56 | rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator} |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 57 | return rSt |
| 58 | } |
| 59 | |
| 60 | // sendDeltasPerDatabase sends to an initiator all the missing generations |
| 61 | // corresponding to the prefixes requested for this Database, and a genvector |
| 62 | // summarizing the knowledge transferred from the responder to the |
| 63 | // initiator. This happens in three phases: |
| 64 | // |
| 65 | // In the first phase, the initiator is checked against the SyncGroup ACLs of |
| 66 | // all the SyncGroups it is requesting, and only those prefixes that belong to |
| 67 | // allowed SyncGroups are carried forward. |
| 68 | // |
| 69 | // In the second phase, for a given set of nested prefixes from the initiator, |
| 70 | // the shortest prefix in that set is extracted. The initiator's prefix |
| 71 | // genvector for this shortest prefix represents the lower bound on its |
| 72 | // knowledge for the entire set of nested prefixes. This prefix genvector |
| 73 | // (representing the lower bound) is diffed with all the responder prefix |
| 74 | // genvectors corresponding to same or deeper prefixes compared to the initiator |
| 75 | // prefix. This diff produces a bound on the missing knowledge. For example, say |
| 76 | // the initiator is interested in prefixes {foo, foobar}, where each prefix is |
| 77 | // associated with a prefix genvector. Since the initiator strictly has as much |
| 78 | // or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s |
| 79 | // prefix genvector is chosen as the lower bound for the initiator's |
| 80 | // knowledge. Similarly, say the responder has knowledge on prefixes {f, |
| 81 | // foobarX, foobarY, bar}. The responder diffs the prefix genvectors for |
| 82 | // prefixes f, foobarX and foobarY with the initiator's prefix genvector to |
| 83 | // compute a bound on missing generations (all responder's prefixes that match |
| 84 | // "foo". Note that since the responder doesn't have a prefix genvector at |
| 85 | // "foo", its knowledge at "f" is applicable to "foo"). |
| 86 | // |
| 87 | // Since the second phase outputs an aggressive calculation of missing |
| 88 | // generations containing more generation entries than strictly needed by the |
| 89 | // initiator, in the third phase, each missing generation is sent to the |
| 90 | // initiator only if the initiator is eligible for it and is not aware of |
| 91 | // it. The generations are sent to the initiator in the same order as the |
| 92 | // responder learned them so that the initiator can reconstruct the DAG for the |
| 93 | // objects by learning older nodes first. |
| 94 | func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error { |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 95 | // TODO(rdaoud): for such vlog.VI() calls where the function name is |
| 96 | // embedded, consider using a helper function to auto-fill it instead |
| 97 | // (see http://goo.gl/mEa4L0) but only incur that overhead when the |
| 98 | // logging level specified is enabled. |
| 99 | vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v", |
| 100 | rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec) |
| 101 | |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 102 | // Phase 1 of sendDeltas: Authorize the initiator and respond to the |
| 103 | // caller only for the SyncGroups that allow access. |
| 104 | rSt.authorizeAndFilterSyncGroups(ctx) |
| 105 | |
| 106 | // Phase 2 of sendDeltas: diff contains the bound on the |
| 107 | // generations missing from the initiator per device. |
| 108 | rSt.computeDeltaBound(ctx) |
| 109 | |
| 110 | // Phase 3 of sendDeltas: Process the diff, filtering out records that |
| 111 | // are not needed, and send the remainder on the wire ordered. |
| 112 | return rSt.filterAndSendDeltas(ctx) |
| 113 | } |
| 114 | |
| 115 | // authorizeAndFilterSyncGroups authorizes the initiator against the requested |
| 116 | // SyncGroups and filters the initiator's prefixes to only include those from |
| 117 | // allowed SyncGroups (phase 1 of sendDeltas). |
| 118 | func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) { |
| 119 | rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName) |
| 120 | if rSt.errState != nil { |
| 121 | return |
| 122 | } |
| 123 | |
| 124 | allowedPfxs := make(map[string]struct{}) |
| 125 | for sgid := range rSt.req.SgIds { |
| 126 | // Check permissions for the SyncGroup. |
| 127 | var sg *interfaces.SyncGroup |
| 128 | sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid) |
| 129 | if rSt.errState != nil { |
| 130 | return |
| 131 | } |
| 132 | rSt.errState = authorize(ctx, rSt.call.Security(), sg) |
| 133 | if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID { |
| 134 | continue |
| 135 | } else if rSt.errState != nil { |
| 136 | return |
| 137 | } |
| 138 | |
| 139 | for _, p := range sg.Spec.Prefixes { |
| 140 | allowedPfxs[p] = struct{}{} |
| 141 | } |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 142 | |
| 143 | // Add the initiator to the SyncGroup membership if not already |
| 144 | // in it. It is a temporary solution until SyncGroup metadata |
| 145 | // is synchronized peer to peer. |
| 146 | // TODO(rdaoud): remove this when SyncGroups are synced. |
| 147 | rSt.addInitiatorToSyncGroup(ctx, sgid) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 148 | } |
| 149 | |
| 150 | // Filter the initiator's prefixes to what is allowed. |
| 151 | for pfx := range rSt.req.InitVec { |
| 152 | if _, ok := allowedPfxs[pfx]; ok { |
| 153 | continue |
| 154 | } |
| 155 | allowed := false |
| 156 | for p := range allowedPfxs { |
| 157 | if strings.HasPrefix(pfx, p) { |
| 158 | allowed = true |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | if !allowed { |
| 163 | delete(rSt.req.InitVec, pfx) |
| 164 | } |
| 165 | } |
| 166 | return |
| 167 | } |
| 168 | |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 169 | // addInitiatorToSyncGroup adds the request initiator to the membership of the |
| 170 | // given SyncGroup if the initiator is not already a member. It is a temporary |
| 171 | // solution until SyncGroup metadata starts being synchronized, at which time |
| 172 | // peers will learn of new members through mutations of the SyncGroup metadata |
| 173 | // by the SyncGroup administrators. |
| 174 | // Note: the joiner metadata is fake because the responder does not have it. |
| 175 | func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) { |
| 176 | if rSt.initiator == "" { |
| 177 | return |
| 178 | } |
| 179 | |
Sergey Rogulenko | cf49555 | 2015-08-04 18:00:21 -0700 | [diff] [blame] | 180 | err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error { |
Raja Daoud | ccfd6c1 | 2015-08-03 18:46:28 -0700 | [diff] [blame] | 181 | sg, err := getSyncGroupById(ctx, tx, gid) |
| 182 | if err != nil { |
| 183 | return err |
| 184 | } |
| 185 | |
| 186 | // If the initiator is already a member of the SyncGroup abort |
| 187 | // the transaction with a special error code. |
| 188 | if _, ok := sg.Joiners[rSt.initiator]; ok { |
| 189 | return verror.New(verror.ErrExist, ctx, "member already in SyncGroup") |
| 190 | } |
| 191 | |
| 192 | vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid) |
| 193 | sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1} |
| 194 | return setSGDataEntry(ctx, tx, gid, sg) |
| 195 | }) |
| 196 | |
| 197 | if err != nil && verror.ErrorID(err) != verror.ErrExist.ID { |
| 198 | vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err) |
| 199 | } |
| 200 | } |
| 201 | |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 202 | // computeDeltaBound computes the bound on missing generations across all |
| 203 | // requested prefixes (phase 2 of sendDeltas). |
| 204 | func (rSt *responderState) computeDeltaBound(ctx *context.T) { |
| 205 | // Check error from phase 1. |
| 206 | if rSt.errState != nil { |
| 207 | return |
| 208 | } |
| 209 | |
| 210 | if len(rSt.req.InitVec) == 0 { |
| 211 | rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector") |
| 212 | return |
| 213 | } |
| 214 | |
| 215 | var respVec interfaces.GenVector |
| 216 | var respGen uint64 |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 217 | respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 218 | if rSt.errState != nil { |
| 219 | return |
| 220 | } |
| 221 | respPfxs := extractAndSortPrefixes(respVec) |
| 222 | initPfxs := extractAndSortPrefixes(rSt.req.InitVec) |
| 223 | |
| 224 | rSt.outVec = make(interfaces.GenVector) |
| 225 | rSt.diff = make(genRangeVector) |
| 226 | pfx := initPfxs[0] |
| 227 | |
| 228 | for _, p := range initPfxs { |
| 229 | if strings.HasPrefix(p, pfx) && p != pfx { |
| 230 | continue |
| 231 | } |
| 232 | |
| 233 | // Process this prefix as this is the start of a new set of |
| 234 | // nested prefixes. |
| 235 | pfx = p |
| 236 | |
| 237 | // Lower bound on initiator's knowledge for this prefix set. |
| 238 | initpgv := rSt.req.InitVec[pfx] |
| 239 | |
| 240 | // Find the relevant responder prefixes and add the corresponding knowledge. |
| 241 | var respgv interfaces.PrefixGenVector |
| 242 | var rpStart string |
| 243 | for _, rp := range respPfxs { |
| 244 | if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) { |
| 245 | // No relationship with pfx. |
| 246 | continue |
| 247 | } |
| 248 | |
| 249 | if strings.HasPrefix(pfx, rp) { |
| 250 | // If rp is a prefix of pfx, remember it because |
| 251 | // it may be a potential starting point for the |
| 252 | // responder's knowledge. The actual starting |
| 253 | // point is the deepest prefix where rp is a |
| 254 | // prefix of pfx. |
| 255 | // |
| 256 | // Say the initiator is looking for "foo", and |
| 257 | // the responder has knowledge for "f" and "fo", |
| 258 | // the responder's starting point will be the |
| 259 | // prefix genvector for "fo". Similarly, if the |
| 260 | // responder has knowledge for "foo", the |
| 261 | // starting point will be the prefix genvector |
| 262 | // for "foo". |
| 263 | rpStart = rp |
| 264 | } else { |
| 265 | // If pfx is a prefix of rp, this knowledge must |
| 266 | // be definitely sent to the initiator. Diff the |
| 267 | // prefix genvectors to adjust the delta bound and |
| 268 | // include in outVec. |
| 269 | respgv = respVec[rp] |
| 270 | rSt.diffPrefixGenVectors(respgv, initpgv) |
| 271 | rSt.outVec[rp] = respgv |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | // Deal with the starting point. |
| 276 | if rpStart == "" { |
| 277 | // No matching prefixes for pfx were found. |
| 278 | respgv = make(interfaces.PrefixGenVector) |
| 279 | respgv[rSt.sync.id] = respGen |
| 280 | } else { |
| 281 | respgv = respVec[rpStart] |
| 282 | } |
| 283 | rSt.diffPrefixGenVectors(respgv, initpgv) |
| 284 | rSt.outVec[pfx] = respgv |
| 285 | } |
| 286 | |
Raja Daoud | 4171c9c | 2015-07-14 20:07:44 -0700 | [diff] [blame] | 287 | vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v", |
| 288 | rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 289 | return |
| 290 | } |
| 291 | |
| 292 | // filterAndSendDeltas filters the computed delta to remove records already |
| 293 | // known by the initiator, and sends the resulting records to the initiator |
| 294 | // (phase 3 of sendDeltas). |
| 295 | func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error { |
| 296 | // Always send a start and finish response so that the initiator can |
| 297 | // move on to the next Database. |
| 298 | // |
| 299 | // TODO(hpucha): Although ok for now to call SendStream once per |
| 300 | // Database, would like to make this implementation agnostic. |
| 301 | sender := rSt.call.SendStream() |
| 302 | sender.Send(interfaces.DeltaRespStart{true}) |
| 303 | defer sender.Send(interfaces.DeltaRespFinish{true}) |
| 304 | |
| 305 | // Check error from phase 2. |
| 306 | if rSt.errState != nil { |
| 307 | return rSt.errState |
| 308 | } |
| 309 | |
| 310 | // First two phases were successful. So now on to phase 3. We now visit |
| 311 | // every log record in the generation range as obtained from phase 1 in |
| 312 | // their log order. We use a heap to incrementally sort the log records |
| 313 | // as per their position in the log. |
| 314 | // |
| 315 | // Init the min heap, one entry per device in the diff. |
| 316 | mh := make(minHeap, 0, len(rSt.diff)) |
| 317 | for dev, r := range rSt.diff { |
| 318 | r.cur = r.min |
| 319 | rec, err := getNextLogRec(ctx, rSt.st, dev, r) |
| 320 | if err != nil { |
| 321 | return err |
| 322 | } |
| 323 | if rec != nil { |
| 324 | mh = append(mh, rec) |
| 325 | } else { |
| 326 | delete(rSt.diff, dev) |
| 327 | } |
| 328 | } |
| 329 | heap.Init(&mh) |
| 330 | |
| 331 | // Process the log records in order. |
| 332 | initPfxs := extractAndSortPrefixes(rSt.req.InitVec) |
| 333 | for mh.Len() > 0 { |
| 334 | rec := heap.Pop(&mh).(*localLogRec) |
| 335 | |
| 336 | if !filterLogRec(rec, rSt.req.InitVec, initPfxs) { |
| 337 | // Send on the wire. |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 338 | wireRec, err := makeWireLogRec(ctx, rSt.st, rec) |
| 339 | if err != nil { |
| 340 | return err |
| 341 | } |
| 342 | sender.Send(interfaces.DeltaRespRec{*wireRec}) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 343 | } |
| 344 | |
| 345 | // Add a new record from the same device if not done. |
| 346 | dev := rec.Metadata.Id |
| 347 | rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev]) |
| 348 | if err != nil { |
| 349 | return err |
| 350 | } |
| 351 | if rec != nil { |
| 352 | heap.Push(&mh, rec) |
| 353 | } else { |
| 354 | delete(rSt.diff, dev) |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | sender.Send(interfaces.DeltaRespRespVec{rSt.outVec}) |
| 359 | return nil |
| 360 | } |
| 361 | |
| 362 | // genRange represents a range of generations (min and max inclusive). |
| 363 | type genRange struct { |
| 364 | min uint64 |
| 365 | max uint64 |
| 366 | cur uint64 |
| 367 | } |
| 368 | |
| 369 | type genRangeVector map[uint64]*genRange |
| 370 | |
| 371 | // diffPrefixGenVectors diffs two generation vectors, belonging to the responder |
| 372 | // and the initiator, and updates the range of generations per device known to |
| 373 | // the responder but not known to the initiator. "gens" (generation range) is |
| 374 | // passed in as an input argument so that it can be incrementally updated as the |
| 375 | // range of missing generations grows when different responder prefix genvectors |
| 376 | // are used to compute the diff. |
| 377 | // |
| 378 | // For example: Generation vector for responder is say RVec = {A:10, B:5, C:1}, |
| 379 | // Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these |
| 380 | // two vectors returns: {A:[6-10], C:[1-1]}. |
| 381 | // |
| 382 | // TODO(hpucha): Add reclaimVec for GCing. |
| 383 | func (rSt *responderState) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector) { |
| 384 | // Compute missing generations for devices that are in both initiator's and responder's vectors. |
| 385 | for devid, gen := range initPVec { |
| 386 | rgen, ok := respPVec[devid] |
| 387 | if ok { |
| 388 | updateDevRange(devid, rgen, gen, rSt.diff) |
| 389 | } |
| 390 | } |
| 391 | |
| 392 | // Compute missing generations for devices not in initiator's vector but in responder's vector. |
| 393 | for devid, rgen := range respPVec { |
| 394 | if _, ok := initPVec[devid]; !ok { |
| 395 | updateDevRange(devid, rgen, 0, rSt.diff) |
| 396 | } |
| 397 | } |
| 398 | } |
| 399 | |
| 400 | func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) { |
| 401 | if gen < rgen { |
| 402 | // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive. |
| 403 | if r, ok := gens[devid]; !ok { |
| 404 | gens[devid] = &genRange{min: gen + 1, max: rgen} |
| 405 | } else { |
| 406 | if gen+1 < r.min { |
| 407 | r.min = gen + 1 |
| 408 | } |
| 409 | if rgen > r.max { |
| 410 | r.max = rgen |
| 411 | } |
| 412 | } |
| 413 | } |
| 414 | } |
| 415 | |
| 416 | func extractAndSortPrefixes(vec interfaces.GenVector) []string { |
| 417 | pfxs := make([]string, len(vec)) |
| 418 | i := 0 |
| 419 | for p := range vec { |
| 420 | pfxs[i] = p |
| 421 | i++ |
| 422 | } |
| 423 | sort.Strings(pfxs) |
| 424 | return pfxs |
| 425 | } |
| 426 | |
| 427 | // TODO(hpucha): This can be optimized using a scan instead of "gets" in a for |
| 428 | // loop. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 429 | func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) { |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 430 | for i := r.cur; i <= r.max; i++ { |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 431 | rec, err := getLogRec(ctx, st, dev, i) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 432 | if err == nil { |
| 433 | r.cur = i + 1 |
| 434 | return rec, nil |
| 435 | } |
| 436 | if verror.ErrorID(err) != verror.ErrNoExist.ID { |
| 437 | return nil, err |
| 438 | } |
| 439 | } |
| 440 | return nil, nil |
| 441 | } |
| 442 | |
| 443 | // Note: initPfxs is sorted. |
| 444 | func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool { |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 445 | // The key starts with one of the store's reserved prefixes for managed |
| 446 | // namespaces (e.g. $row, $perms). Remove that prefix before comparing |
| 447 | // it with the SyncGroup prefixes which are defined by the application. |
Himabindu Pucha | 665f14c | 2015-08-13 13:42:01 -0700 | [diff] [blame] | 448 | key := extractAppKey(rec.Metadata.ObjId) |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 449 | |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 450 | filter := true |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 451 | var maxGen uint64 |
| 452 | for _, p := range initPfxs { |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 453 | if strings.HasPrefix(key, p) { |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 454 | // Do not filter. Initiator is interested in this |
| 455 | // prefix. |
| 456 | filter = false |
| 457 | |
| 458 | // Track if the initiator knows of this record. |
| 459 | gen := initVec[p][rec.Metadata.Id] |
| 460 | if maxGen < gen { |
| 461 | maxGen = gen |
| 462 | } |
| 463 | } |
| 464 | } |
| 465 | |
| 466 | // Filter this record if the initiator already has it. |
| 467 | if maxGen >= rec.Metadata.Gen { |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 468 | filter = true |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 469 | } |
| 470 | |
| 471 | return filter |
| 472 | } |
| 473 | |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 474 | // makeWireLogRec creates a sync log record to send on the wire from a given |
| 475 | // local sync record. |
| 476 | func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 477 | // Get the object value at the required version. |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 478 | key, version := rec.Metadata.ObjId, rec.Metadata.CurVers |
Himabindu Pucha | feb9936 | 2015-07-24 13:29:04 -0700 | [diff] [blame] | 479 | var value []byte |
| 480 | if !rec.Metadata.Delete { |
| 481 | var err error |
| 482 | value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version)) |
| 483 | if err != nil { |
| 484 | return nil, err |
| 485 | } |
Raja Daoud | 7cb7179 | 2015-07-08 12:00:33 -0700 | [diff] [blame] | 486 | } |
| 487 | |
| 488 | wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value} |
| 489 | return wireRec, nil |
| 490 | } |
| 491 | |
Himabindu Pucha | d964ef0 | 2015-06-30 01:10:47 -0700 | [diff] [blame] | 492 | // A minHeap implements heap.Interface and holds local log records. |
| 493 | type minHeap []*localLogRec |
| 494 | |
| 495 | func (mh minHeap) Len() int { return len(mh) } |
| 496 | |
| 497 | func (mh minHeap) Less(i, j int) bool { |
| 498 | return mh[i].Pos < mh[j].Pos |
| 499 | } |
| 500 | |
| 501 | func (mh minHeap) Swap(i, j int) { |
| 502 | mh[i], mh[j] = mh[j], mh[i] |
| 503 | } |
| 504 | |
| 505 | func (mh *minHeap) Push(x interface{}) { |
| 506 | item := x.(*localLogRec) |
| 507 | *mh = append(*mh, item) |
| 508 | } |
| 509 | |
| 510 | func (mh *minHeap) Pop() interface{} { |
| 511 | old := *mh |
| 512 | n := len(old) |
| 513 | item := old[n-1] |
| 514 | *mh = old[0 : n-1] |
| 515 | return item |
| 516 | } |