blob: 875e4a49a8239869a428db41c4f7c63871b3fd8b [file] [log] [blame]
Himabindu Puchafb26a832015-05-20 15:37:50 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package vsync
6
Raja Daoudcb50b5d2015-06-26 18:37:24 -07007// Syncbase Watcher is a goroutine that listens to local Database updates from
8// applications and modifies sync metadata (e.g. DAG and local log records).
9// The coupling between Syncbase storage and sync is loose, via asynchronous
10// listening by the Watcher, to unblock the application operations as soon as
11// possible, and offload the sync metadata update to the Watcher. When the
12// application mutates objects in a Database, additional entries are written
13// to a log queue, persisted in the same Database. This queue is read by the
14// sync Watcher to learn of the changes.
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070015
Raja Daoudcb50b5d2015-06-26 18:37:24 -070016import (
17 "strings"
18 "time"
19
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070020 "v.io/v23/context"
Sergey Rogulenko40402b52015-08-10 15:09:48 -070021 "v.io/v23/services/watch"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070022 "v.io/v23/verror"
Raja Daoudcb50b5d2015-06-26 18:37:24 -070023 "v.io/x/lib/vlog"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070024 "v.io/x/ref/services/syncbase/server/interfaces"
25 "v.io/x/ref/services/syncbase/server/util"
26 "v.io/x/ref/services/syncbase/server/watchable"
27 "v.io/x/ref/services/syncbase/store"
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070028)
29
Raja Daoudcb50b5d2015-06-26 18:37:24 -070030var (
Adam Sadovskyebc183a2015-10-08 22:26:47 -070031 // watchPrefixes is an in-memory cache of syncgroup prefixes for each
32 // app database. It is filled at startup from persisted syncgroup data
33 // and updated at runtime when syncgroups are joined or left. It is
Raja Daoudcb50b5d2015-06-26 18:37:24 -070034 // not guarded by a mutex because only the watcher goroutine uses it
35 // beyond the startup phase (before any sync goroutines are started).
36 // The map keys are the appdb names (globally unique).
37 watchPrefixes = make(map[string]sgPrefixes)
38)
39
Adam Sadovskyebc183a2015-10-08 22:26:47 -070040// sgPrefixes tracks syncgroup prefixes being synced in a database and their
Raja Daoud30225f32015-11-18 12:59:12 -080041// syncgroups.
42type sgPrefixes map[string]sgSet
Raja Daoudcb50b5d2015-06-26 18:37:24 -070043
44// watchStore processes updates obtained by watching the store. This is the
45// sync watcher goroutine that learns about store updates asynchronously by
46// reading log records that track object mutation histories in each database.
47// For each batch mutation, the watcher updates the sync DAG and log records.
48// When an application makes a single non-transactional put, it is represented
49// as a batch of one log record. Thus the watcher only deals with batches.
50func (s *syncService) watchStore(ctx *context.T) {
Himabindu Puchafb26a832015-05-20 15:37:50 -070051 defer s.pending.Done()
Raja Daoudcb50b5d2015-06-26 18:37:24 -070052
53 ticker := time.NewTicker(watchPollInterval)
54 defer ticker.Stop()
55
56 ctx, cancel := context.WithCancel(ctx)
57 defer cancel()
58
Raja Daoudc0a50762015-10-15 18:19:01 -070059 for !s.Closed() {
Raja Daoudcb50b5d2015-06-26 18:37:24 -070060 select {
Raja Daoudcb50b5d2015-06-26 18:37:24 -070061 case <-ticker.C:
Raja Daoudc0a50762015-10-15 18:19:01 -070062 if s.Closed() {
63 break
64 }
65 s.processStoreUpdates(ctx)
Himabindu Pucha7cee1812015-10-06 22:21:55 -070066
Himabindu Pucha7cee1812015-10-06 22:21:55 -070067 case <-s.closed:
Raja Daoudc0a50762015-10-15 18:19:01 -070068 break
Himabindu Pucha7cee1812015-10-06 22:21:55 -070069 }
Raja Daoudf5104582015-07-31 17:16:21 -070070 }
Raja Daoudc0a50762015-10-15 18:19:01 -070071
72 vlog.VI(1).Info("sync: watchStore: channel closed, stop watching and exit")
Raja Daoudf5104582015-07-31 17:16:21 -070073}
74
75// processStoreUpdates fetches updates from all databases and processes them.
76// To maintain fairness among databases, it processes one batch update from
77// each database, in a round-robin manner, until there are no further updates
78// from any database.
79func (s *syncService) processStoreUpdates(ctx *context.T) {
80 for {
81 total, active := 0, 0
82 s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
83 if s.processDatabase(ctx, appName, dbName, st) {
84 active++
85 }
86 total++
87 return false
88 })
89
90 vlog.VI(2).Infof("sync: processStoreUpdates: %d/%d databases had updates", active, total)
91 if active == 0 {
92 break
Raja Daoudcb50b5d2015-06-26 18:37:24 -070093 }
94 }
Himabindu Puchafb26a832015-05-20 15:37:50 -070095}
Himabindu Pucha2752a7e2015-06-12 14:07:07 -070096
Raja Daoudcb50b5d2015-06-26 18:37:24 -070097// processDatabase fetches from the given database at most one new batch update
98// (transaction) and processes it. The one-batch limit prevents one database
99// from starving others. A batch is stored as a contiguous set of log records
Raja Daoudf5104582015-07-31 17:16:21 -0700100// ending with one record having the "continued" flag set to false. The call
101// returns true if a new batch update was processed.
102func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) bool {
Himabindu Pucha05358cb2015-07-28 08:49:48 -0700103 s.thLock.Lock()
104 defer s.thLock.Unlock()
105
Raja Daoud4171c9c2015-07-14 20:07:44 -0700106 vlog.VI(2).Infof("sync: processDatabase: begin: %s, %s", appName, dbName)
107 defer vlog.VI(2).Infof("sync: processDatabase: end: %s, %s", appName, dbName)
108
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700109 resMark, err := getResMark(ctx, st)
110 if err != nil {
111 if verror.ErrorID(err) != verror.ErrNoExist.ID {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700112 vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
Raja Daoudf5104582015-07-31 17:16:21 -0700113 return false
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700114 }
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700115 resMark = watchable.MakeResumeMarker(0)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700116 }
117
Raja Daoud7cb71792015-07-08 12:00:33 -0700118 // Initialize Database sync state if needed.
Himabindu Puchaf2796e12015-10-01 19:05:10 -0700119 s.initSyncStateInMem(ctx, appName, dbName, "")
Raja Daoud7cb71792015-07-08 12:00:33 -0700120
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700121 // Get a batch of watch log entries, if any, after this resume marker.
Sergey Rogulenko8bf641c2015-08-14 17:00:09 -0700122 logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700123 if err != nil {
124 vlog.Fatalf("sync: processDatabase: %s, %s: cannot get watch log batch: %v", appName, dbName, verror.DebugString(err))
125 }
126 if logs != nil {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700127 s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
Raja Daoudf5104582015-07-31 17:16:21 -0700128 return true
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700129 }
Raja Daoudf5104582015-07-31 17:16:21 -0700130 return false
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700131}
132
133// processWatchLogBatch parses the given batch of watch log records, updates the
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700134// watchable syncgroup prefixes, uses the prefixes to filter the batch to the
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700135// subset of syncable records, and transactionally applies these updates to the
136// sync metadata (DAG & log records) and updates the watch resume marker.
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700137func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark watch.ResumeMarker) {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700138 if len(logs) == 0 {
139 return
140 }
141
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700142 // If the first log entry is a syncgroup prefix operation, then this is
143 // a syncgroup snapshot and not an application batch. In this case,
144 // handle the syncgroup prefix changes by updating the watch prefixes
Raja Daoud0dfdd252015-07-10 20:02:22 -0700145 // and exclude the first entry from the batch. Also inform the batch
146 // processing below to not assign it a batch ID in the DAG.
147 appBatch := true
Raja Daoud30225f32015-11-18 12:59:12 -0800148 sgop := processSyncgroupLogRecord(appName, dbName, logs[0])
149 if sgop != nil {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700150 appBatch = false
151 logs = logs[1:]
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700152 }
153
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700154 // Filter out the log entries for keys not part of any syncgroup.
Raja Daoudd4543072015-06-30 11:15:55 -0700155 // Ignore as well log entries made by sync (echo suppression).
Raja Daoud0dfdd252015-07-10 20:02:22 -0700156 totalCount := uint64(len(logs))
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700157 appdb := appDbName(appName, dbName)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700158
Raja Daoud0dfdd252015-07-10 20:02:22 -0700159 i := 0
160 for _, entry := range logs {
Raja Daoudd4543072015-06-30 11:15:55 -0700161 if !entry.FromSync && syncable(appdb, entry) {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700162 logs[i] = entry
163 i++
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700164 }
165 }
Raja Daoud0dfdd252015-07-10 20:02:22 -0700166 logs = logs[:i]
Raja Daoud4171c9c2015-07-14 20:07:44 -0700167 vlog.VI(3).Infof("sync: processWatchLogBatch: %s, %s: sg snap %t, syncable %d, total %d",
168 appName, dbName, !appBatch, len(logs), totalCount)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700169
170 // Transactional processing of the batch: convert these syncable log
171 // records to a batch of sync log records, filling their parent versions
172 // from the DAG head nodes.
Mike Burrows45573ce2015-11-19 18:02:12 -0800173 batch := make([]*LocalLogRec, len(logs))
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700174 err := store.RunInTransaction(st, func(tx store.Transaction) error {
Raja Daoud30225f32015-11-18 12:59:12 -0800175 i := 0
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700176 for _, entry := range logs {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700177 if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
178 return err
179 } else if rec != nil {
Raja Daoud30225f32015-11-18 12:59:12 -0800180 batch[i] = rec
181 i++
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700182 }
183 }
Raja Daoud30225f32015-11-18 12:59:12 -0800184 batch = batch[:i]
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700185
Raja Daoud0dfdd252015-07-10 20:02:22 -0700186 if err := s.processBatch(ctx, appName, dbName, batch, appBatch, totalCount, tx); err != nil {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700187 return err
188 }
Himabindu Pucha7cee1812015-10-06 22:21:55 -0700189
190 if !appBatch {
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700191 if err := setSyncgroupWatchable(ctx, tx, sgop); err != nil {
Himabindu Pucha7cee1812015-10-06 22:21:55 -0700192 return err
193 }
194 }
195
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700196 return setResMark(ctx, tx, resMark)
197 })
198
199 if err != nil {
200 // TODO(rdaoud): don't crash, quarantine this app database.
Adam Sadovskyad4857e2015-10-26 14:54:45 -0700201 vlog.Fatalf("sync: processWatchLogBatch: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700202 }
Raja Daoud30225f32015-11-18 12:59:12 -0800203
204 // Extract blob refs from batch values and update blob metadata.
205 // TODO(rdaoud): the core of this step, extracting blob refs from the
206 // app data, is idempotent and should be done before the transaction
207 // above, which updated the resume marker. If Syncbase crashes here it
208 // does not re-do this blob ref processing at restart and the metadata
209 // becomes lower quality. Unfortunately, the log conversion must happen
210 // inside the transaction because it accesses DAG information that must
211 // be in the Tx read-set for optimistic locking. The TODO is to split
212 // the conversion into two phases, one non-transactional that happens
213 // first outside the transaction, followed by blob ref processing also
214 // outside the transaction (idempotent), then inside the transaction
215 // patch-up the log records in a 2nd phase.
216 if err = s.processWatchBlobRefs(ctx, appdb, st, batch); err != nil {
217 vlog.Fatalf("sync: processWatchLogBatch:: %s, %s: watcher cannot process blob refs: %v", appName, dbName, err)
218 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700219}
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700220
221// processBatch applies a single batch of changes (object mutations) received
222// from watching a particular Database.
Mike Burrows45573ce2015-11-19 18:02:12 -0800223func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*LocalLogRec, appBatch bool, totalCount uint64, tx store.Transaction) error {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700224 count := uint64(len(batch))
225 if count == 0 {
226 return nil
227 }
228
Raja Daoud0dfdd252015-07-10 20:02:22 -0700229 // If an application batch has more than one mutation, start a batch for it.
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700230 batchId := NoBatchId
Raja Daoud0dfdd252015-07-10 20:02:22 -0700231 if appBatch && totalCount > 1 {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700232 batchId = s.startBatch(ctx, tx, batchId)
233 if batchId == NoBatchId {
234 return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
235 }
236 }
237
Himabindu Puchab41fc142015-09-10 17:10:57 -0700238 gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, "", count)
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700239
Raja Daoud4171c9c2015-07-14 20:07:44 -0700240 vlog.VI(3).Infof("sync: processBatch: %s, %s: len %d, total %d, btid %x, gen %d, pos %d",
241 appName, dbName, count, totalCount, batchId, gen, pos)
242
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700243 for _, rec := range batch {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700244 // Update the log record. Portions of the record Metadata must
245 // already be filled.
246 rec.Metadata.Id = s.id
247 rec.Metadata.Gen = gen
248 rec.Metadata.RecType = interfaces.NodeRec
249
250 rec.Metadata.BatchId = batchId
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700251 rec.Metadata.BatchCount = totalCount
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700252
253 rec.Pos = pos
254
255 gen++
256 pos++
257
258 if err := s.processLocalLogRec(ctx, tx, rec); err != nil {
259 return verror.New(verror.ErrInternal, ctx, err)
260 }
261 }
262
263 // End the batch if any.
264 if batchId != NoBatchId {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700265 if err := s.endBatch(ctx, tx, batchId, totalCount); err != nil {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700266 return err
267 }
268 }
269
270 return nil
271}
272
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700273// processLocalLogRec processes a local log record by adding to the Database and
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700274// suitably updating the DAG metadata.
Mike Burrows45573ce2015-11-19 18:02:12 -0800275func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *LocalLogRec) error {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700276 // Insert the new log record into the log.
Himabindu Puchab41fc142015-09-10 17:10:57 -0700277 if err := putLogRec(ctx, tx, logDataPrefix, rec); err != nil {
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700278 return err
279 }
280
Raja Daoud30225f32015-11-18 12:59:12 -0800281 m := &rec.Metadata
Himabindu Puchab41fc142015-09-10 17:10:57 -0700282 logKey := logRecKey(logDataPrefix, m.Id, m.Gen)
Himabindu Pucha2752a7e2015-06-12 14:07:07 -0700283
284 // Insert the new log record into dag.
285 if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {
286 return err
287 }
288
289 // Move the head.
290 return moveHead(ctx, tx, m.ObjId, m.CurVers)
291}
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700292
Raja Daoud30225f32015-11-18 12:59:12 -0800293// processWatchBlobRefs extracts blob refs from the data values of the updates
294// received in the watch batch and updates the blob-to-syncgroup metadata.
Mike Burrows45573ce2015-11-19 18:02:12 -0800295func (s *syncService) processWatchBlobRefs(ctx *context.T, appdb string, st store.Store, batch []*LocalLogRec) error {
Raja Daoud30225f32015-11-18 12:59:12 -0800296 if len(batch) == 0 {
297 return nil
298 }
299
300 sgPfxs := watchPrefixes[appdb]
301 if len(sgPfxs) == 0 {
302 return verror.New(verror.ErrInternal, ctx, "processWatchBlobRefs: no sg prefixes in db", appdb)
303 }
304
305 for _, rec := range batch {
306 m := &rec.Metadata
307 if m.Delete {
308 continue
309 }
310
311 buf, err := watchable.GetAtVersion(ctx, st, []byte(m.ObjId), nil, []byte(m.CurVers))
312 if err != nil {
313 return err
314 }
315
316 if err = s.processBlobRefs(ctx, s.name, sgPfxs, m, buf); err != nil {
317 return err
318 }
319 }
320 return nil
321}
322
323// addWatchPrefixSyncgroup adds a syncgroup prefix-to-ID mapping for an app
324// database in the watch prefix cache.
325func addWatchPrefixSyncgroup(appName, dbName, prefix string, gid interfaces.GroupId) {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700326 name := appDbName(appName, dbName)
327 if pfxs := watchPrefixes[name]; pfxs != nil {
Raja Daoud30225f32015-11-18 12:59:12 -0800328 if sgs := pfxs[prefix]; sgs != nil {
329 sgs[gid] = struct{}{}
330 } else {
331 pfxs[prefix] = sgSet{gid: struct{}{}}
332 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700333 } else {
Raja Daoud30225f32015-11-18 12:59:12 -0800334 watchPrefixes[name] = sgPrefixes{prefix: sgSet{gid: struct{}{}}}
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700335 }
336}
337
Raja Daoud30225f32015-11-18 12:59:12 -0800338// rmWatchPrefixSyncgroup removes a syncgroup prefix-to-ID mapping for an app
339// database in the watch prefix cache.
340func rmWatchPrefixSyncgroup(appName, dbName, prefix string, gid interfaces.GroupId) {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700341 name := appDbName(appName, dbName)
342 if pfxs := watchPrefixes[name]; pfxs != nil {
Raja Daoud30225f32015-11-18 12:59:12 -0800343 if sgs := pfxs[prefix]; sgs != nil {
344 delete(sgs, gid)
345 if len(sgs) == 0 {
346 delete(pfxs, prefix)
347 if len(pfxs) == 0 {
348 delete(watchPrefixes, name)
349 }
350 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700351 }
352 }
353}
354
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700355// setSyncgroupWatchable sets the local watchable state of the syncgroup.
Raja Daoud30225f32015-11-18 12:59:12 -0800356func setSyncgroupWatchable(ctx *context.T, tx store.Transaction, sgop *watchable.SyncgroupOp) error {
Himabindu Pucha7cee1812015-10-06 22:21:55 -0700357 state, err := getSGIdEntry(ctx, tx, sgop.SgId)
358 if err != nil {
359 return err
360 }
361 state.Watched = !sgop.Remove
362
363 return setSGIdEntry(ctx, tx, sgop.SgId, state)
364}
365
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700366// convertLogRecord converts a store log entry to a sync log record. It fills
367// the previous object version (parent) by fetching its current DAG head if it
368// has one. For a delete, it generates a new object version because the store
369// does not version a deletion.
370// TODO(rdaoud): change Syncbase to store and version a deleted object to
371// simplify the store-to-sync interaction. A deleted key would still have a
372// version and its value entry would encode the "deleted" flag, either in the
373// key or probably in a value wrapper that would contain other metadata.
Mike Burrows45573ce2015-11-19 18:02:12 -0800374func convertLogRecord(ctx *context.T, tx store.Transaction, logEnt *watchable.LogEntry) (*LocalLogRec, error) {
375 var rec *LocalLogRec
Raja Daoudd4543072015-06-30 11:15:55 -0700376 timestamp := logEnt.CommitTimestamp
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700377
378 switch op := logEnt.Op.(type) {
Raja Daoud7cb71792015-07-08 12:00:33 -0700379 case watchable.OpGet:
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700380 // TODO(rdaoud): save read-set in sync.
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700381
Raja Daoud7cb71792015-07-08 12:00:33 -0700382 case watchable.OpScan:
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700383 // TODO(rdaoud): save scan-set in sync.
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700384
Raja Daoud7cb71792015-07-08 12:00:33 -0700385 case watchable.OpPut:
Raja Daoudd4543072015-06-30 11:15:55 -0700386 rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
387
Raja Daoud7cb71792015-07-08 12:00:33 -0700388 case watchable.OpSyncSnapshot:
Raja Daoud0dfdd252015-07-10 20:02:22 -0700389 // Create records for object versions not already in the DAG.
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700390 // Duplicates can appear here in cases of nested syncgroups or
391 // peer syncgroups.
Raja Daoud4171c9c2015-07-14 20:07:44 -0700392 if ok, err := hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)); err != nil {
393 return nil, err
394 } else if !ok {
Raja Daoud0dfdd252015-07-10 20:02:22 -0700395 rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
396 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700397
Raja Daoud7cb71792015-07-08 12:00:33 -0700398 case watchable.OpDelete:
Raja Daoudd4543072015-06-30 11:15:55 -0700399 rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700400
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700401 case watchable.OpSyncgroup:
402 vlog.Errorf("sync: convertLogRecord: watch LogEntry for syncgroup should not be converted: %v", logEnt)
403 return nil, verror.New(verror.ErrInternal, ctx, "cannot convert a watch log OpSyncgroup entry")
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700404
405 default:
Raja Daoud4171c9c2015-07-14 20:07:44 -0700406 vlog.Errorf("sync: convertLogRecord: invalid watch LogEntry: %v", logEnt)
407 return nil, verror.New(verror.ErrInternal, ctx, "cannot convert unknown watch log entry")
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700408 }
Raja Daoudd4543072015-06-30 11:15:55 -0700409
Raja Daoud4171c9c2015-07-14 20:07:44 -0700410 return rec, nil
Raja Daoudd4543072015-06-30 11:15:55 -0700411}
412
413// newLocalLogRec creates a local sync log record given its information: key,
414// version, deletion flag, and timestamp. It retrieves the current DAG head
415// for the key (if one exists) to use as its parent (previous) version.
Mike Burrows45573ce2015-11-19 18:02:12 -0800416func newLocalLogRec(ctx *context.T, tx store.Transaction, key, version []byte, deleted bool, timestamp int64) *LocalLogRec {
417 rec := LocalLogRec{}
Raja Daoudd4543072015-06-30 11:15:55 -0700418 oid := string(key)
Raja Daoud0dfdd252015-07-10 20:02:22 -0700419
Raja Daoudd4543072015-06-30 11:15:55 -0700420 rec.Metadata.ObjId = oid
421 rec.Metadata.CurVers = string(version)
422 rec.Metadata.Delete = deleted
423 if head, err := getHead(ctx, tx, oid); err == nil {
424 rec.Metadata.Parents = []string{head}
425 } else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) {
Raja Daoud4171c9c2015-07-14 20:07:44 -0700426 vlog.Fatalf("sync: newLocalLogRec: cannot getHead to convert log record for %s: %v", oid, err)
Raja Daoudd4543072015-06-30 11:15:55 -0700427 }
428 rec.Metadata.UpdTime = unixNanoToTime(timestamp)
429 return &rec
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700430}
431
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700432// processSyncgroupLogRecord checks if the log entry is a syncgroup update and,
Raja Daoud30225f32015-11-18 12:59:12 -0800433// if it is, updates the watch prefixes for the app database and returns a
434// syncgroup operation. Otherwise it returns nil with no other changes.
435func processSyncgroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) *watchable.SyncgroupOp {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700436 switch op := logEnt.Op.(type) {
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700437 case watchable.OpSyncgroup:
Raja Daoud30225f32015-11-18 12:59:12 -0800438 gid, remove := op.Value.SgId, op.Value.Remove
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700439 for _, prefix := range op.Value.Prefixes {
440 if remove {
Raja Daoud30225f32015-11-18 12:59:12 -0800441 rmWatchPrefixSyncgroup(appName, dbName, prefix, gid)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700442 } else {
Raja Daoud30225f32015-11-18 12:59:12 -0800443 addWatchPrefixSyncgroup(appName, dbName, prefix, gid)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700444 }
445 }
Raja Daoud30225f32015-11-18 12:59:12 -0800446 vlog.VI(3).Infof("sync: processSyncgroupLogRecord: %s, %s: gid %d, remove %t, prefixes: %q",
447 appName, dbName, gid, remove, op.Value.Prefixes)
448 return &op.Value
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700449
450 default:
Raja Daoud30225f32015-11-18 12:59:12 -0800451 return nil
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700452 }
453}
454
455// syncable returns true if the given log entry falls within the scope of a
Adam Sadovskyebc183a2015-10-08 22:26:47 -0700456// syncgroup prefix for the given app database, and thus should be synced.
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700457// It is used to pre-filter the batch of log entries before sync processing.
458func syncable(appdb string, logEnt *watchable.LogEntry) bool {
459 var key string
460 switch op := logEnt.Op.(type) {
Raja Daoud7cb71792015-07-08 12:00:33 -0700461 case watchable.OpPut:
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700462 key = string(op.Value.Key)
Raja Daoud7cb71792015-07-08 12:00:33 -0700463 case watchable.OpDelete:
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700464 key = string(op.Value.Key)
Raja Daoud7cb71792015-07-08 12:00:33 -0700465 case watchable.OpSyncSnapshot:
Raja Daoudd4543072015-06-30 11:15:55 -0700466 key = string(op.Value.Key)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700467 default:
468 return false
469 }
470
Raja Daoudd4543072015-06-30 11:15:55 -0700471 // The key starts with one of the store's reserved prefixes for managed
Adam Sadovsky819d4f12015-10-23 09:43:50 -0700472 // namespaces (e.g. "$row", "$perms"). Remove that prefix before comparing it
473 // with the syncgroup prefixes which are defined by the application.
474 key = util.StripFirstKeyPartOrDie(key)
Raja Daoudd4543072015-06-30 11:15:55 -0700475
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700476 for prefix := range watchPrefixes[appdb] {
477 if strings.HasPrefix(key, prefix) {
478 return true
479 }
480 }
481 return false
482}
483
484// resMarkKey returns the key used to access the watcher resume marker.
485func resMarkKey() string {
486 return util.JoinKeyParts(util.SyncPrefix, "w", "rm")
487}
488
489// setResMark stores the watcher resume marker for a database.
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700490func setResMark(ctx *context.T, tx store.Transaction, resMark watch.ResumeMarker) error {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700491 return util.Put(ctx, tx, resMarkKey(), resMark)
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700492}
493
494// getResMark retrieves the watcher resume marker for a database.
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700495func getResMark(ctx *context.T, st store.StoreReader) (watch.ResumeMarker, error) {
496 var resMark watch.ResumeMarker
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700497 key := resMarkKey()
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700498 if err := util.Get(ctx, st, key, &resMark); err != nil {
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700499 return nil, err
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700500 }
501 return resMark, nil
502}