// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

// Syncbase Watcher is a goroutine that listens to local Database updates from
// applications and modifies sync metadata (e.g. DAG and local log records).
// The coupling between Syncbase storage and sync is loose, via asynchronous
// listening by the Watcher, to unblock the application operations as soon as
// possible, and offload the sync metadata update to the Watcher. When the
// application mutates objects in a Database, additional entries are written
// to a log queue, persisted in the same Database. This queue is read by the
// sync Watcher to learn of the changes.
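//
// Illustrative flow (simplified, using the names defined in this file):
//
//	app put/delete --> Database + log queue (persisted in the same Database)
//	                          |
//	                          v  (polled every watchPollInterval)
//	watchStore -> processDatabase -> getWatchLogBatch -> processWatchLogBatch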

import (
	"strings"
	"time"

	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
	"v.io/syncbase/x/ref/services/syncbase/server/util"
	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
	"v.io/syncbase/x/ref/services/syncbase/store"
	"v.io/v23/context"
	"v.io/v23/verror"
	"v.io/v23/vom"
	"v.io/x/lib/vlog"
)

var (
	// watchPollInterval is the duration between consecutive watch polling
	// events across all app databases. Every watch event loops across all
	// app databases and fetches from each one at most one batch update
	// (transaction) to process.
	// TODO(rdaoud): add a channel between store and watch to get change
	// notifications instead of using a polling solution.
	watchPollInterval = 100 * time.Millisecond

	// watchPrefixes is an in-memory cache of SyncGroup prefixes for each
	// app database. It is filled at startup from persisted SyncGroup data
	// and updated at runtime when SyncGroups are joined or left. It is
	// not guarded by a mutex because only the watcher goroutine uses it
	// beyond the startup phase (before any sync goroutines are started).
	// The map keys are the appdb names (globally unique).
	watchPrefixes = make(map[string]sgPrefixes)
)

// sgPrefixes tracks SyncGroup prefixes being synced in a database and their
// counts.
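// For example (illustrative): if two SyncGroups in a database both include
// the prefix "foo", the cache entry is sgPrefixes{"foo": 2}, and "foo"
// remains a syncable prefix until the count drops back to zero.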
type sgPrefixes map[string]uint32

// watchStore processes updates obtained by watching the store. This is the
// sync watcher goroutine that learns about store updates asynchronously by
// reading log records that track object mutation histories in each database.
// For each batch mutation, the watcher updates the sync DAG and log records.
// When an application makes a single non-transactional put, it is represented
// as a batch of one log record. Thus the watcher only deals with batches.
func (s *syncService) watchStore(ctx *context.T) {
	defer s.pending.Done()

	ticker := time.NewTicker(watchPollInterval)
	defer ticker.Stop()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for {
		select {
		case <-s.closed:
			vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit")
			return

		case <-ticker.C:
			s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
				s.processDatabase(ctx, appName, dbName, st)
				return false
			})
		}
	}
}

// processDatabase fetches from the given database at most one new batch update
// (transaction) and processes it. The one-batch limit prevents one database
// from starving others. A batch is stored as a contiguous set of log records
// ending with one record having the "continued" flag set to false.
func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
	resMark, err := getResMark(ctx, st)
	if err != nil {
		if verror.ErrorID(err) != verror.ErrNoExist.ID {
			vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
			return
		}
		resMark = ""
	}

	// Get a batch of watch log entries, if any, after this resume marker.
	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
	}
}
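
// An illustrative batch layout in the log queue (the key format shown is
// hypothetical): a transaction that puts "foo" and "bar" and deletes "baz"
// is stored as three contiguous log records, and only the last one ends the
// batch:
//
//	<log>:00042  OpPut{foo}     Continued: true
//	<log>:00043  OpPut{bar}     Continued: true
//	<log>:00044  OpDelete{baz}  Continued: false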

// processWatchLogBatch parses the given batch of watch log records, updates the
// watchable SyncGroup prefixes, uses the prefixes to filter the batch to the
// subset of syncable records, and transactionally applies these updates to the
// sync metadata (DAG & log records) and updates the watch resume marker.
func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) {
	if len(logs) == 0 {
		return
	}

	// First handle SyncGroup prefix changes within the batch by updating
	// the watch prefixes. It is as if these log entries were at the start
	// of the batch. Exclude them from the actual data batch.
	dataLogs := make([]*watchable.LogEntry, 0, len(logs))
	for _, entry := range logs {
		if !processSyncGroupLogRecord(appName, dbName, entry) {
			dataLogs = append(dataLogs, entry)
		}
	}

	// Filter out the log entries for keys not part of any SyncGroup.
	// Also ignore log entries written by sync itself (echo suppression).
	totalCount := uint64(len(dataLogs))
	appdb := appDbName(appName, dbName)
	logs = make([]*watchable.LogEntry, 0, len(dataLogs))

	for _, entry := range dataLogs {
		if !entry.FromSync && syncable(appdb, entry) {
			logs = append(logs, entry)
		}
	}
	dataLogs = nil

	// Transactional processing of the batch: convert these syncable log
	// records to a batch of sync log records, filling their parent versions
	// from the DAG head nodes.
	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
		batch := make([]*localLogRec, 0, len(logs))
		for _, entry := range logs {
			if rec := convertLogRecord(ctx, tx, entry); rec != nil {
				batch = append(batch, rec)
			}
		}

		if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil {
			return err
		}
		return setResMark(ctx, tx, resMark)
	})

	if err != nil {
		// TODO(rdaoud): don't crash, quarantine this app database.
		vlog.Fatalf("processWatchLogBatch: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
	}
}
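
// Illustrative walk-through (hypothetical entries, keys shown already
// stripped of the store's reserved prefix): given the batch
//
//	[OpPut{"foo:1"}, OpSyncGroup{join, Prefixes: ["foo"]}, OpPut{"bar:1"}]
//
// the OpSyncGroup entry updates watchPrefixes first, as if it led the batch;
// the put of "foo:1" then passes the syncable filter, while the put of
// "bar:1" is dropped, assuming no other SyncGroup prefix covers "bar".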

// processBatch applies a single batch of changes (object mutations) received
// from watching a particular Database.
func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error {
	_ = tx.(store.Transaction)

	count := uint64(len(batch))
	if count == 0 {
		return nil
	}

	// If the batch has more than one mutation, start a batch for it.
	batchId := NoBatchId
	if totalCount > 1 {
		batchId = s.startBatch(ctx, tx, batchId)
		if batchId == NoBatchId {
			return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
		}
	}

	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)

	for _, rec := range batch {
		// Update the log record. Portions of the record Metadata must
		// already be filled.
		rec.Metadata.Id = s.id
		rec.Metadata.Gen = gen
		rec.Metadata.RecType = interfaces.NodeRec

		rec.Metadata.BatchId = batchId
		rec.Metadata.BatchCount = totalCount

		rec.Pos = pos

		gen++
		pos++

		if err := s.processLocalLogRec(ctx, tx, rec); err != nil {
			return verror.New(verror.ErrInternal, ctx, err)
		}
	}

	// End the batch if any.
	if batchId != NoBatchId {
		if err := s.endBatch(ctx, tx, batchId, count); err != nil {
			return err
		}
	}

	return nil
}
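
// For example (illustrative numbers): a filtered batch of three mutations,
// with a generation range reserved at gen=7 and pos=42, produces records
// stamped (gen, pos) = (7, 42), (8, 43), (9, 44), all sharing one batch ID
// and carrying a BatchCount equal to the batch's pre-filtering entry total.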

// processLocalLogRec processes a local log record by adding it to the Database
// and suitably updating the DAG metadata.
func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
	// Insert the new log record into the log.
	if err := putLogRec(ctx, tx, rec); err != nil {
		return err
	}

	m := rec.Metadata
	logKey := logRecKey(m.Id, m.Gen)

	// Insert the new log record into the DAG.
	if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {
		return err
	}

	// Move the head.
	return moveHead(ctx, tx, m.ObjId, m.CurVers)
}

// incrWatchPrefix increments (or sets) a SyncGroup prefix for an app database
// in the watch prefix cache.
func incrWatchPrefix(appName, dbName, prefix string) {
	name := appDbName(appName, dbName)
	if pfxs := watchPrefixes[name]; pfxs != nil {
		pfxs[prefix]++ // it auto-initializes a non-existent prefix
	} else {
		watchPrefixes[name] = sgPrefixes{prefix: 1}
	}
}

// decrWatchPrefix decrements (or unsets) a SyncGroup prefix for an app database
// in the watch prefix cache.
func decrWatchPrefix(appName, dbName, prefix string) {
	name := appDbName(appName, dbName)
	if pfxs := watchPrefixes[name]; pfxs != nil {
		if pfxs[prefix] > 1 {
			pfxs[prefix]--
		} else if len(pfxs) > 1 {
			delete(pfxs, prefix)
		} else {
			delete(watchPrefixes, name)
		}
	}
}
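
// Illustrative call sequence (hypothetical app/db names; the "a:d" map key
// format is also illustrative), showing how the last decrement removes the
// whole appdb entry rather than leaving an empty map behind:
//
//	incrWatchPrefix("a", "d", "foo") // watchPrefixes["a:d"] = {"foo": 1}
//	incrWatchPrefix("a", "d", "bar") // {"foo": 1, "bar": 1}
//	decrWatchPrefix("a", "d", "bar") // {"foo": 1}, "bar" deleted
//	decrWatchPrefix("a", "d", "foo") // watchPrefixes["a:d"] deleted entirely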

// dbLogScanArgs determines the arguments needed to start a new scan from a
// given resume marker (last log entry read). An empty resume marker is used
// to begin the scan from the start of the log.
func dbLogScanArgs(resMark string) ([]byte, []byte) {
	start, limit := util.ScanPrefixArgs(util.LogPrefix, "")
	if resMark != "" {
		// To start just after the current resume marker, augment it by
		// appending an extra byte at the end. Use byte value zero to
		// use the lowest value possible. This works because resume
		// markers have a fixed length and are sorted lexicographically.
		// By creating a fake longer resume marker that falls between
		// real resume markers, the next scan will start right after
		// where the previous one stopped without missing data.
		start = append([]byte(resMark), 0)
	}
	return start, limit
}
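
// Illustrative example (hypothetical marker values): if the previous batch
// ended at log key "0000000042", the next scan starts at "0000000042\x00",
// which sorts after "0000000042" but before the next fixed-length marker
// "0000000043", so no log entry is skipped or re-read.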

// getWatchLogBatch returns a batch of watch log records (a transaction) from
// the given database and the new resume marker at the end of the batch.
func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) {
	scanStart, scanLimit := dbLogScanArgs(resMark)
	endOfBatch := false
	var newResmark string

	// Use the store directly to scan these read-only log entries, no need
	// to create a snapshot since they are never overwritten. Read and
	// buffer a batch before processing it.
	var logs []*watchable.LogEntry
	stream := st.Scan(scanStart, scanLimit)
	for stream.Advance() {
		logKey := string(stream.Key(nil))
		var logEnt watchable.LogEntry
		if vom.Decode(stream.Value(nil), &logEnt) != nil {
			vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
				appName, dbName, logKey, stream.Value(nil))
		}

		logs = append(logs, &logEnt)

		// Stop if this is the end of the batch.
		if !logEnt.Continued {
			newResmark = logKey
			endOfBatch = true
			break
		}
	}

	if err := stream.Err(); err != nil {
		vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
		return nil, resMark
	}
	if !endOfBatch {
		if len(logs) > 0 {
			vlog.Fatalf("getWatchLogBatch: %s, %s: end of batch not found after %d entries",
				appName, dbName, len(logs))
		}
		return nil, resMark
	}
	return logs, newResmark
}
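
// Note on partial batches: a non-empty scan that ends without a record whose
// Continued flag is false is treated as fatal, since a transaction's log
// records are expected to be persisted atomically with its data, so a batch
// should never be visible half-written. An empty scan simply means no new
// updates; the caller retries at the next poll tick with the same resume
// marker.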

// convertLogRecord converts a store log entry to a sync log record. It fills
// the previous object version (parent) by fetching its current DAG head if it
// has one. For a delete, it generates a new object version because the store
// does not version a deletion.
// TODO(rdaoud): change Syncbase to store and version a deleted object to
// simplify the store-to-sync interaction. A deleted key would still have a
// version and its value entry would encode the "deleted" flag, either in the
// key or probably in a value wrapper that would contain other metadata.
func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
	_ = tx.(store.Transaction)
	var rec *localLogRec
	timestamp := logEnt.CommitTimestamp

	switch op := logEnt.Op.(type) {
	case *watchable.OpGet:
		// TODO(rdaoud): save read-set in sync.

	case *watchable.OpScan:
		// TODO(rdaoud): save scan-set in sync.

	case *watchable.OpPut:
		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)

	case *watchable.OpSyncSnapshot:
		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)

	case *watchable.OpDelete:
		rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)

	case *watchable.OpSyncGroup:
		vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)

	default:
		vlog.Errorf("invalid watch LogEntry: %v", logEnt)
	}

	return rec
}

// newLocalLogRec creates a local sync log record given its information: key,
// version, deletion flag, and timestamp. It retrieves the current DAG head
// for the key (if one exists) to use as its parent (previous) version.
func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec {
	_ = tx.(store.Transaction)

	rec := localLogRec{}
	oid := string(key)
	rec.Metadata.ObjId = oid
	rec.Metadata.CurVers = string(version)
	rec.Metadata.Delete = deleted
	if head, err := getHead(ctx, tx, oid); err == nil {
		rec.Metadata.Parents = []string{head}
	} else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) {
		vlog.Fatalf("cannot getHead to convert log record for %s: %v", oid, err)
	}
	rec.Metadata.UpdTime = unixNanoToTime(timestamp)
	return &rec
}
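
// For example (illustrative versions): if key "foo" currently has DAG head
// "v5" and the watcher sees a put with version "v6", the new record gets
// CurVers "v6" and Parents ["v5"]. A put of a brand-new key yields a record
// with no parents, while a delete of a key that has no DAG head is fatal.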

// processSyncGroupLogRecord checks if the log entry is a SyncGroup update and,
// if it is, updates the watch prefixes for the app database and returns true.
// Otherwise it returns false with no other changes.
func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
	switch op := logEnt.Op.(type) {
	case *watchable.OpSyncGroup:
		remove := op.Value.Remove
		for _, prefix := range op.Value.Prefixes {
			if remove {
				decrWatchPrefix(appName, dbName, prefix)
			} else {
				incrWatchPrefix(appName, dbName, prefix)
			}
		}
		return true

	default:
		return false
	}
}

// syncable returns true if the given log entry falls within the scope of a
// SyncGroup prefix for the given app database, and thus should be synced.
// It is used to pre-filter the batch of log entries before sync processing.
func syncable(appdb string, logEnt *watchable.LogEntry) bool {
	var key string
	switch op := logEnt.Op.(type) {
	case *watchable.OpPut:
		key = string(op.Value.Key)
	case *watchable.OpDelete:
		key = string(op.Value.Key)
	case *watchable.OpSyncSnapshot:
		key = string(op.Value.Key)
	default:
		return false
	}

	// The key starts with one of the store's reserved prefixes for managed
	// namespaces (e.g. $row or $perm). Remove that prefix before comparing
	// the key with the SyncGroup prefixes, which are defined by the
	// application.
	parts := util.SplitKeyParts(key)
	if len(parts) < 2 {
		vlog.Fatalf("syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
	}
	key = util.JoinKeyParts(parts[1:]...)

	for prefix := range watchPrefixes[appdb] {
		if strings.HasPrefix(key, prefix) {
			return true
		}
	}
	return false
}
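
// Illustrative example (assuming ":" as the key-part separator and a
// watchPrefixes entry containing "foo" for this appdb): a put on store key
// "$row:foo:bar" is stripped to "foo:bar", which matches the prefix "foo",
// so the entry is syncable; a put on "$row:baz" is stripped to "baz" and
// filtered out.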

// resMarkKey returns the key used to access the watcher resume marker.
func resMarkKey() string {
	return util.JoinKeyParts(util.SyncPrefix, "w", "rm")
}

// setResMark stores the watcher resume marker for a database.
func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
	_ = tx.(store.Transaction)

	if err := util.PutObject(tx, resMarkKey(), resMark); err != nil {
		return verror.New(verror.ErrInternal, ctx, err)
	}
	return nil
}

// getResMark retrieves the watcher resume marker for a database.
func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
	var resMark string
	key := resMarkKey()
	if err := util.GetObject(st, key, &resMark); err != nil {
		return NoVersion, translateError(ctx, err, key)
	}
	return resMark, nil
}