blob: c417eca3bdf8790f23430f39a8c0b4ed1fc84648 [file] [log] [blame]
Himabindu Puchad964ef02015-06-30 01:10:47 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package vsync
6
7import (
8 "container/heap"
9 "sort"
10 "strings"
11
Raja Daoudccfd6c12015-08-03 18:46:28 -070012 wire "v.io/syncbase/v23/services/syncbase/nosql"
Himabindu Puchad964ef02015-06-30 01:10:47 -070013 "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
Raja Daoud7cb71792015-07-08 12:00:33 -070014 "v.io/syncbase/x/ref/services/syncbase/server/watchable"
Himabindu Puchad964ef02015-06-30 01:10:47 -070015 "v.io/syncbase/x/ref/services/syncbase/store"
16 "v.io/v23/context"
17 "v.io/v23/verror"
Raja Daoud7cb71792015-07-08 12:00:33 -070018 "v.io/x/lib/vlog"
Himabindu Puchad964ef02015-06-30 01:10:47 -070019)
20
21// GetDeltas implements the responder side of the GetDeltas RPC.
Raja Daoudccfd6c12015-08-03 18:46:28 -070022func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
23 vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
24 defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
Raja Daoud4171c9c2015-07-14 20:07:44 -070025
Himabindu Puchad964ef02015-06-30 01:10:47 -070026 recvr := call.RecvStream()
Himabindu Puchad964ef02015-06-30 01:10:47 -070027 for recvr.Advance() {
28 req := recvr.Value()
29 // Ignoring errors since if one Database fails for any reason,
30 // it is fine to continue to the next one. In fact, sometimes
31 // the failure might be genuine. For example, the responder is
32 // no longer part of the requested SyncGroups, or the app/db is
33 // locally deleted, or a permission change has denied access.
Raja Daoudccfd6c12015-08-03 18:46:28 -070034 rSt := newResponderState(ctx, call, s, req, initiator)
Himabindu Puchad964ef02015-06-30 01:10:47 -070035 rSt.sendDeltasPerDatabase(ctx)
36 }
37
38 // TODO(hpucha): Is there a need to call finish or some such?
39 return recvr.Err()
40}
41
42// responderState is state accumulated per Database by the responder during an
43// initiation round.
44type responderState struct {
Raja Daoudccfd6c12015-08-03 18:46:28 -070045 req interfaces.DeltaReq
46 call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
47 initiator string
48 errState error // Captures the error from the first two phases of the responder.
49 sync *syncService
50 st store.Store // Store handle to the Database.
51 diff genRangeVector
52 outVec interfaces.GenVector
Himabindu Puchad964ef02015-06-30 01:10:47 -070053}
54
Raja Daoudccfd6c12015-08-03 18:46:28 -070055func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
56 rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
Himabindu Puchad964ef02015-06-30 01:10:47 -070057 return rSt
58}
59
60// sendDeltasPerDatabase sends to an initiator all the missing generations
61// corresponding to the prefixes requested for this Database, and a genvector
62// summarizing the knowledge transferred from the responder to the
63// initiator. This happens in three phases:
64//
65// In the first phase, the initiator is checked against the SyncGroup ACLs of
66// all the SyncGroups it is requesting, and only those prefixes that belong to
67// allowed SyncGroups are carried forward.
68//
69// In the second phase, for a given set of nested prefixes from the initiator,
70// the shortest prefix in that set is extracted. The initiator's prefix
71// genvector for this shortest prefix represents the lower bound on its
72// knowledge for the entire set of nested prefixes. This prefix genvector
73// (representing the lower bound) is diffed with all the responder prefix
74// genvectors corresponding to same or deeper prefixes compared to the initiator
75// prefix. This diff produces a bound on the missing knowledge. For example, say
76// the initiator is interested in prefixes {foo, foobar}, where each prefix is
77// associated with a prefix genvector. Since the initiator strictly has as much
78// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
79// prefix genvector is chosen as the lower bound for the initiator's
80// knowledge. Similarly, say the responder has knowledge on prefixes {f,
81// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
82// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
83// compute a bound on missing generations (all responder's prefixes that match
84// "foo". Note that since the responder doesn't have a prefix genvector at
85// "foo", its knowledge at "f" is applicable to "foo").
86//
87// Since the second phase outputs an aggressive calculation of missing
88// generations containing more generation entries than strictly needed by the
89// initiator, in the third phase, each missing generation is sent to the
90// initiator only if the initiator is eligible for it and is not aware of
91// it. The generations are sent to the initiator in the same order as the
92// responder learned them so that the initiator can reconstruct the DAG for the
93// objects by learning older nodes first.
94func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error {
Raja Daoud4171c9c2015-07-14 20:07:44 -070095 // TODO(rdaoud): for such vlog.VI() calls where the function name is
96 // embedded, consider using a helper function to auto-fill it instead
97 // (see http://goo.gl/mEa4L0) but only incur that overhead when the
98 // logging level specified is enabled.
99 vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
100 rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec)
101
Himabindu Puchad964ef02015-06-30 01:10:47 -0700102 // Phase 1 of sendDeltas: Authorize the initiator and respond to the
103 // caller only for the SyncGroups that allow access.
104 rSt.authorizeAndFilterSyncGroups(ctx)
105
106 // Phase 2 of sendDeltas: diff contains the bound on the
107 // generations missing from the initiator per device.
108 rSt.computeDeltaBound(ctx)
109
110 // Phase 3 of sendDeltas: Process the diff, filtering out records that
111 // are not needed, and send the remainder on the wire ordered.
112 return rSt.filterAndSendDeltas(ctx)
113}
114
115// authorizeAndFilterSyncGroups authorizes the initiator against the requested
116// SyncGroups and filters the initiator's prefixes to only include those from
117// allowed SyncGroups (phase 1 of sendDeltas).
118func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) {
119 rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName)
120 if rSt.errState != nil {
121 return
122 }
123
124 allowedPfxs := make(map[string]struct{})
125 for sgid := range rSt.req.SgIds {
126 // Check permissions for the SyncGroup.
127 var sg *interfaces.SyncGroup
128 sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid)
129 if rSt.errState != nil {
130 return
131 }
132 rSt.errState = authorize(ctx, rSt.call.Security(), sg)
133 if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID {
134 continue
135 } else if rSt.errState != nil {
136 return
137 }
138
139 for _, p := range sg.Spec.Prefixes {
140 allowedPfxs[p] = struct{}{}
141 }
Raja Daoudccfd6c12015-08-03 18:46:28 -0700142
143 // Add the initiator to the SyncGroup membership if not already
144 // in it. It is a temporary solution until SyncGroup metadata
145 // is synchronized peer to peer.
146 // TODO(rdaoud): remove this when SyncGroups are synced.
147 rSt.addInitiatorToSyncGroup(ctx, sgid)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700148 }
149
150 // Filter the initiator's prefixes to what is allowed.
151 for pfx := range rSt.req.InitVec {
152 if _, ok := allowedPfxs[pfx]; ok {
153 continue
154 }
155 allowed := false
156 for p := range allowedPfxs {
157 if strings.HasPrefix(pfx, p) {
158 allowed = true
159 }
160 }
161
162 if !allowed {
163 delete(rSt.req.InitVec, pfx)
164 }
165 }
166 return
167}
168
Raja Daoudccfd6c12015-08-03 18:46:28 -0700169// addInitiatorToSyncGroup adds the request initiator to the membership of the
170// given SyncGroup if the initiator is not already a member. It is a temporary
171// solution until SyncGroup metadata starts being synchronized, at which time
172// peers will learn of new members through mutations of the SyncGroup metadata
173// by the SyncGroup administrators.
174// Note: the joiner metadata is fake because the responder does not have it.
175func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
176 if rSt.initiator == "" {
177 return
178 }
179
Sergey Rogulenkocf495552015-08-04 18:00:21 -0700180 err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
Raja Daoudccfd6c12015-08-03 18:46:28 -0700181 sg, err := getSyncGroupById(ctx, tx, gid)
182 if err != nil {
183 return err
184 }
185
186 // If the initiator is already a member of the SyncGroup abort
187 // the transaction with a special error code.
188 if _, ok := sg.Joiners[rSt.initiator]; ok {
189 return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
190 }
191
192 vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
193 sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
194 return setSGDataEntry(ctx, tx, gid, sg)
195 })
196
197 if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
198 vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
199 }
200}
201
Himabindu Puchad964ef02015-06-30 01:10:47 -0700202// computeDeltaBound computes the bound on missing generations across all
203// requested prefixes (phase 2 of sendDeltas).
204func (rSt *responderState) computeDeltaBound(ctx *context.T) {
205 // Check error from phase 1.
206 if rSt.errState != nil {
207 return
208 }
209
210 if len(rSt.req.InitVec) == 0 {
211 rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
212 return
213 }
214
215 var respVec interfaces.GenVector
216 var respGen uint64
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700217 respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700218 if rSt.errState != nil {
219 return
220 }
221 respPfxs := extractAndSortPrefixes(respVec)
222 initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
223
224 rSt.outVec = make(interfaces.GenVector)
225 rSt.diff = make(genRangeVector)
226 pfx := initPfxs[0]
227
228 for _, p := range initPfxs {
229 if strings.HasPrefix(p, pfx) && p != pfx {
230 continue
231 }
232
233 // Process this prefix as this is the start of a new set of
234 // nested prefixes.
235 pfx = p
236
237 // Lower bound on initiator's knowledge for this prefix set.
238 initpgv := rSt.req.InitVec[pfx]
239
240 // Find the relevant responder prefixes and add the corresponding knowledge.
241 var respgv interfaces.PrefixGenVector
242 var rpStart string
243 for _, rp := range respPfxs {
244 if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
245 // No relationship with pfx.
246 continue
247 }
248
249 if strings.HasPrefix(pfx, rp) {
250 // If rp is a prefix of pfx, remember it because
251 // it may be a potential starting point for the
252 // responder's knowledge. The actual starting
253 // point is the deepest prefix where rp is a
254 // prefix of pfx.
255 //
256 // Say the initiator is looking for "foo", and
257 // the responder has knowledge for "f" and "fo",
258 // the responder's starting point will be the
259 // prefix genvector for "fo". Similarly, if the
260 // responder has knowledge for "foo", the
261 // starting point will be the prefix genvector
262 // for "foo".
263 rpStart = rp
264 } else {
265 // If pfx is a prefix of rp, this knowledge must
266 // be definitely sent to the initiator. Diff the
267 // prefix genvectors to adjust the delta bound and
268 // include in outVec.
269 respgv = respVec[rp]
270 rSt.diffPrefixGenVectors(respgv, initpgv)
271 rSt.outVec[rp] = respgv
272 }
273 }
274
275 // Deal with the starting point.
276 if rpStart == "" {
277 // No matching prefixes for pfx were found.
278 respgv = make(interfaces.PrefixGenVector)
279 respgv[rSt.sync.id] = respGen
280 } else {
281 respgv = respVec[rpStart]
282 }
283 rSt.diffPrefixGenVectors(respgv, initpgv)
284 rSt.outVec[pfx] = respgv
285 }
286
Raja Daoud4171c9c2015-07-14 20:07:44 -0700287 vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
288 rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700289 return
290}
291
292// filterAndSendDeltas filters the computed delta to remove records already
293// known by the initiator, and sends the resulting records to the initiator
294// (phase 3 of sendDeltas).
295func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error {
296 // Always send a start and finish response so that the initiator can
297 // move on to the next Database.
298 //
299 // TODO(hpucha): Although ok for now to call SendStream once per
300 // Database, would like to make this implementation agnostic.
301 sender := rSt.call.SendStream()
302 sender.Send(interfaces.DeltaRespStart{true})
303 defer sender.Send(interfaces.DeltaRespFinish{true})
304
305 // Check error from phase 2.
306 if rSt.errState != nil {
307 return rSt.errState
308 }
309
310 // First two phases were successful. So now on to phase 3. We now visit
311 // every log record in the generation range as obtained from phase 1 in
312 // their log order. We use a heap to incrementally sort the log records
313 // as per their position in the log.
314 //
315 // Init the min heap, one entry per device in the diff.
316 mh := make(minHeap, 0, len(rSt.diff))
317 for dev, r := range rSt.diff {
318 r.cur = r.min
319 rec, err := getNextLogRec(ctx, rSt.st, dev, r)
320 if err != nil {
321 return err
322 }
323 if rec != nil {
324 mh = append(mh, rec)
325 } else {
326 delete(rSt.diff, dev)
327 }
328 }
329 heap.Init(&mh)
330
331 // Process the log records in order.
332 initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
333 for mh.Len() > 0 {
334 rec := heap.Pop(&mh).(*localLogRec)
335
336 if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
337 // Send on the wire.
Raja Daoud7cb71792015-07-08 12:00:33 -0700338 wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
339 if err != nil {
340 return err
341 }
342 sender.Send(interfaces.DeltaRespRec{*wireRec})
Himabindu Puchad964ef02015-06-30 01:10:47 -0700343 }
344
345 // Add a new record from the same device if not done.
346 dev := rec.Metadata.Id
347 rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev])
348 if err != nil {
349 return err
350 }
351 if rec != nil {
352 heap.Push(&mh, rec)
353 } else {
354 delete(rSt.diff, dev)
355 }
356 }
357
358 sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
359 return nil
360}
361
// genRange is an inclusive range [min, max] of generations for one device,
// together with a cursor used while streaming that range from the log.
type genRange struct {
	min uint64 // Lowest generation in the range (inclusive).
	max uint64 // Highest generation in the range (inclusive).
	cur uint64 // Next generation to examine when reading the range.
}

// genRangeVector maps a device ID to the generation range tracked for it.
type genRangeVector map[uint64]*genRange
370
371// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
372// and the initiator, and updates the range of generations per device known to
373// the responder but not known to the initiator. "gens" (generation range) is
374// passed in as an input argument so that it can be incrementally updated as the
375// range of missing generations grows when different responder prefix genvectors
376// are used to compute the diff.
377//
378// For example: Generation vector for responder is say RVec = {A:10, B:5, C:1},
379// Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these
380// two vectors returns: {A:[6-10], C:[1-1]}.
381//
382// TODO(hpucha): Add reclaimVec for GCing.
383func (rSt *responderState) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector) {
384 // Compute missing generations for devices that are in both initiator's and responder's vectors.
385 for devid, gen := range initPVec {
386 rgen, ok := respPVec[devid]
387 if ok {
388 updateDevRange(devid, rgen, gen, rSt.diff)
389 }
390 }
391
392 // Compute missing generations for devices not in initiator's vector but in responder's vector.
393 for devid, rgen := range respPVec {
394 if _, ok := initPVec[devid]; !ok {
395 updateDevRange(devid, rgen, 0, rSt.diff)
396 }
397 }
398}
399
400func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
401 if gen < rgen {
402 // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
403 if r, ok := gens[devid]; !ok {
404 gens[devid] = &genRange{min: gen + 1, max: rgen}
405 } else {
406 if gen+1 < r.min {
407 r.min = gen + 1
408 }
409 if rgen > r.max {
410 r.max = rgen
411 }
412 }
413 }
414}
415
416func extractAndSortPrefixes(vec interfaces.GenVector) []string {
417 pfxs := make([]string, len(vec))
418 i := 0
419 for p := range vec {
420 pfxs[i] = p
421 i++
422 }
423 sort.Strings(pfxs)
424 return pfxs
425}
426
427// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
428// loop.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700429func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
Himabindu Puchad964ef02015-06-30 01:10:47 -0700430 for i := r.cur; i <= r.max; i++ {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700431 rec, err := getLogRec(ctx, st, dev, i)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700432 if err == nil {
433 r.cur = i + 1
434 return rec, nil
435 }
436 if verror.ErrorID(err) != verror.ErrNoExist.ID {
437 return nil, err
438 }
439 }
440 return nil, nil
441}
442
443// Note: initPfxs is sorted.
444func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
Raja Daoud7cb71792015-07-08 12:00:33 -0700445 // The key starts with one of the store's reserved prefixes for managed
446 // namespaces (e.g. $row, $perms). Remove that prefix before comparing
447 // it with the SyncGroup prefixes which are defined by the application.
Himabindu Pucha665f14c2015-08-13 13:42:01 -0700448 key := extractAppKey(rec.Metadata.ObjId)
Himabindu Puchad964ef02015-06-30 01:10:47 -0700449
Raja Daoud7cb71792015-07-08 12:00:33 -0700450 filter := true
Himabindu Puchad964ef02015-06-30 01:10:47 -0700451 var maxGen uint64
452 for _, p := range initPfxs {
Raja Daoud7cb71792015-07-08 12:00:33 -0700453 if strings.HasPrefix(key, p) {
Himabindu Puchad964ef02015-06-30 01:10:47 -0700454 // Do not filter. Initiator is interested in this
455 // prefix.
456 filter = false
457
458 // Track if the initiator knows of this record.
459 gen := initVec[p][rec.Metadata.Id]
460 if maxGen < gen {
461 maxGen = gen
462 }
463 }
464 }
465
466 // Filter this record if the initiator already has it.
467 if maxGen >= rec.Metadata.Gen {
Raja Daoud7cb71792015-07-08 12:00:33 -0700468 filter = true
Himabindu Puchad964ef02015-06-30 01:10:47 -0700469 }
470
471 return filter
472}
473
Raja Daoud7cb71792015-07-08 12:00:33 -0700474// makeWireLogRec creates a sync log record to send on the wire from a given
475// local sync record.
476func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700477 // Get the object value at the required version.
Raja Daoud7cb71792015-07-08 12:00:33 -0700478 key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
Himabindu Puchafeb99362015-07-24 13:29:04 -0700479 var value []byte
480 if !rec.Metadata.Delete {
481 var err error
482 value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
483 if err != nil {
484 return nil, err
485 }
Raja Daoud7cb71792015-07-08 12:00:33 -0700486 }
487
488 wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}
489 return wireRec, nil
490}
491
Himabindu Puchad964ef02015-06-30 01:10:47 -0700492// A minHeap implements heap.Interface and holds local log records.
493type minHeap []*localLogRec
494
495func (mh minHeap) Len() int { return len(mh) }
496
497func (mh minHeap) Less(i, j int) bool {
498 return mh[i].Pos < mh[j].Pos
499}
500
501func (mh minHeap) Swap(i, j int) {
502 mh[i], mh[j] = mh[j], mh[i]
503}
504
505func (mh *minHeap) Push(x interface{}) {
506 item := x.(*localLogRec)
507 *mh = append(*mh, item)
508}
509
510func (mh *minHeap) Pop() interface{} {
511 old := *mh
512 n := len(old)
513 item := old[n-1]
514 *mh = old[0 : n-1]
515 return item
516}