Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package vsync |
| 6 | |
// Syncbase Watcher is a goroutine that listens to local Database updates from
// applications and modifies sync metadata (e.g. DAG and local log records).
// The coupling between Syncbase storage and sync is loose, via asynchronous
// listening by the Watcher, to unblock the application operations as soon as
// possible, and offload the sync metadata update to the Watcher. When the
// application mutates objects in a Database, additional entries are written
// to a log queue, persisted in the same Database. This queue is read by the
// sync Watcher to learn of the changes.
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 15 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 16 | import ( |
| 17 | "strings" |
| 18 | "time" |
| 19 | |
| 20 | "v.io/syncbase/x/ref/services/syncbase/server/interfaces" |
| 21 | "v.io/syncbase/x/ref/services/syncbase/server/util" |
| 22 | "v.io/syncbase/x/ref/services/syncbase/server/watchable" |
| 23 | "v.io/syncbase/x/ref/services/syncbase/store" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 24 | "v.io/v23/context" |
| 25 | "v.io/v23/verror" |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 26 | "v.io/v23/vom" |
| 27 | "v.io/x/lib/vlog" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 28 | ) |
| 29 | |
// watchPollInterval is how long the watcher waits between two consecutive
// polling rounds. Each round visits every app database and pulls at most one
// batch update (transaction) from each of them.
// TODO(rdaoud): add a channel between store and watch to get change
// notifications instead of using a polling solution.
var watchPollInterval = 100 * time.Millisecond

// watchPrefixes caches in memory, for each app database, the SyncGroup
// prefixes being synced. It is loaded at startup from the persisted SyncGroup
// data and then kept current as SyncGroups are joined or left. No mutex
// protects it: past the startup phase (which completes before any sync
// goroutine is started) the watcher goroutine is its only user. Keys are the
// globally unique appdb names.
var watchPrefixes = map[string]sgPrefixes{}

// sgPrefixes holds the SyncGroup prefixes synced in one database. Each prefix
// maps to a count that incrWatchPrefix and decrWatchPrefix adjust.
type sgPrefixes map[string]uint32
| 51 | |
| 52 | // watchStore processes updates obtained by watching the store. This is the |
| 53 | // sync watcher goroutine that learns about store updates asynchronously by |
| 54 | // reading log records that track object mutation histories in each database. |
| 55 | // For each batch mutation, the watcher updates the sync DAG and log records. |
| 56 | // When an application makes a single non-transactional put, it is represented |
| 57 | // as a batch of one log record. Thus the watcher only deals with batches. |
| 58 | func (s *syncService) watchStore(ctx *context.T) { |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 59 | defer s.pending.Done() |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 60 | |
| 61 | ticker := time.NewTicker(watchPollInterval) |
| 62 | defer ticker.Stop() |
| 63 | |
| 64 | ctx, cancel := context.WithCancel(ctx) |
| 65 | defer cancel() |
| 66 | |
| 67 | for { |
| 68 | select { |
| 69 | case <-s.closed: |
| 70 | vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit") |
| 71 | return |
| 72 | |
| 73 | case <-ticker.C: |
| 74 | s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool { |
| 75 | s.processDatabase(ctx, appName, dbName, st) |
| 76 | return false |
| 77 | }) |
| 78 | } |
| 79 | } |
Himabindu Pucha | fb26a83 | 2015-05-20 15:37:50 -0700 | [diff] [blame] | 80 | } |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 81 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 82 | // processDatabase fetches from the given database at most one new batch update |
| 83 | // (transaction) and processes it. The one-batch limit prevents one database |
| 84 | // from starving others. A batch is stored as a contiguous set of log records |
| 85 | // ending with one record having the "continued" flag set to false. |
| 86 | func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) { |
| 87 | resMark, err := getResMark(ctx, st) |
| 88 | if err != nil { |
| 89 | if verror.ErrorID(err) != verror.ErrNoExist.ID { |
| 90 | vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err) |
| 91 | return |
| 92 | } |
| 93 | resMark = "" |
| 94 | } |
| 95 | |
| 96 | // Get a batch of watch log entries, if any, after this resume marker. |
| 97 | if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil { |
| 98 | s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark) |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | // processWatchLogBatch parses the given batch of watch log records, updates the |
| 103 | // watchable SyncGroup prefixes, uses the prefixes to filter the batch to the |
| 104 | // subset of syncable records, and transactionally applies these updates to the |
| 105 | // sync metadata (DAG & log records) and updates the watch resume marker. |
| 106 | func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) { |
| 107 | if len(logs) == 0 { |
| 108 | return |
| 109 | } |
| 110 | |
| 111 | // First handle SyncGroup prefix changes within the batch by updating |
| 112 | // the watch prefixes. It is as if these log entries were at the start |
| 113 | // of the batch. Exclude them from the actual data batch. |
| 114 | dataLogs := make([]*watchable.LogEntry, 0, len(logs)) |
| 115 | for _, entry := range logs { |
| 116 | if processSyncGroupLogRecord(appName, dbName, entry) == false { |
| 117 | dataLogs = append(dataLogs, entry) |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | // Filter out the log entries for keys not part of any SyncGroup. |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 122 | // Ignore as well log entries made by sync (echo suppression). |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 123 | totalCount := uint64(len(dataLogs)) |
| 124 | appdb := appDbName(appName, dbName) |
| 125 | logs = make([]*watchable.LogEntry, 0, len(dataLogs)) |
| 126 | |
| 127 | for _, entry := range dataLogs { |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 128 | if !entry.FromSync && syncable(appdb, entry) { |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 129 | logs = append(logs, entry) |
| 130 | } |
| 131 | } |
| 132 | dataLogs = nil |
| 133 | |
| 134 | // Transactional processing of the batch: convert these syncable log |
| 135 | // records to a batch of sync log records, filling their parent versions |
| 136 | // from the DAG head nodes. |
| 137 | err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error { |
| 138 | batch := make([]*localLogRec, 0, len(logs)) |
| 139 | for _, entry := range logs { |
| 140 | if rec := convertLogRecord(ctx, tx, entry); rec != nil { |
| 141 | batch = append(batch, rec) |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil { |
| 146 | return err |
| 147 | } |
| 148 | return setResMark(ctx, tx, resMark) |
| 149 | }) |
| 150 | |
| 151 | if err != nil { |
| 152 | // TODO(rdaoud): don't crash, quarantine this app database. |
| 153 | vlog.Fatalf("processDatabase: %s, %s: watcher cannot process batch: %v", appName, dbName, err) |
| 154 | } |
| 155 | } |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 156 | |
| 157 | // processBatch applies a single batch of changes (object mutations) received |
| 158 | // from watching a particular Database. |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 159 | func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error { |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 160 | _ = tx.(store.Transaction) |
| 161 | |
| 162 | count := uint64(len(batch)) |
| 163 | if count == 0 { |
| 164 | return nil |
| 165 | } |
| 166 | |
| 167 | // If the batch has more than one mutation, start a batch for it. |
| 168 | batchId := NoBatchId |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 169 | if totalCount > 1 { |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 170 | batchId = s.startBatch(ctx, tx, batchId) |
| 171 | if batchId == NoBatchId { |
| 172 | return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID") |
| 173 | } |
| 174 | } |
| 175 | |
| 176 | gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count) |
| 177 | |
| 178 | for _, rec := range batch { |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 179 | // Update the log record. Portions of the record Metadata must |
| 180 | // already be filled. |
| 181 | rec.Metadata.Id = s.id |
| 182 | rec.Metadata.Gen = gen |
| 183 | rec.Metadata.RecType = interfaces.NodeRec |
| 184 | |
| 185 | rec.Metadata.BatchId = batchId |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 186 | rec.Metadata.BatchCount = totalCount |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 187 | |
| 188 | rec.Pos = pos |
| 189 | |
| 190 | gen++ |
| 191 | pos++ |
| 192 | |
| 193 | if err := s.processLocalLogRec(ctx, tx, rec); err != nil { |
| 194 | return verror.New(verror.ErrInternal, ctx, err) |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | // End the batch if any. |
| 199 | if batchId != NoBatchId { |
| 200 | if err := s.endBatch(ctx, tx, batchId, count); err != nil { |
| 201 | return err |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | return nil |
| 206 | } |
| 207 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 208 | // processLocalLogRec processes a local log record by adding to the Database and |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 209 | // suitably updating the DAG metadata. |
| 210 | func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error { |
| 211 | // Insert the new log record into the log. |
| 212 | if err := putLogRec(ctx, tx, rec); err != nil { |
| 213 | return err |
| 214 | } |
| 215 | |
| 216 | m := rec.Metadata |
| 217 | logKey := logRecKey(m.Id, m.Gen) |
| 218 | |
| 219 | // Insert the new log record into dag. |
| 220 | if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil { |
| 221 | return err |
| 222 | } |
| 223 | |
| 224 | // Move the head. |
| 225 | return moveHead(ctx, tx, m.ObjId, m.CurVers) |
| 226 | } |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 227 | |
| 228 | // incrWatchPrefix increments (or sets) a SyncGroup prefix for an app database |
| 229 | // in the watch prefix cache. |
| 230 | func incrWatchPrefix(appName, dbName, prefix string) { |
| 231 | name := appDbName(appName, dbName) |
| 232 | if pfxs := watchPrefixes[name]; pfxs != nil { |
| 233 | pfxs[prefix]++ // it auto-initializes a non-existent prefix |
| 234 | } else { |
| 235 | watchPrefixes[name] = sgPrefixes{prefix: 1} |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | // decrWatchPrefix decrements (or unsets) a SyncGroup prefix for an app database |
| 240 | // in the watch prefix cache. |
| 241 | func decrWatchPrefix(appName, dbName, prefix string) { |
| 242 | name := appDbName(appName, dbName) |
| 243 | if pfxs := watchPrefixes[name]; pfxs != nil { |
| 244 | if pfxs[prefix] > 1 { |
| 245 | pfxs[prefix]-- |
| 246 | } else if len(pfxs) > 1 { |
| 247 | delete(pfxs, prefix) |
| 248 | } else { |
| 249 | delete(watchPrefixes, name) |
| 250 | } |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | // dbLogScanArgs determines the arguments needed to start a new scan from a |
| 255 | // given resume marker (last log entry read). An empty resume marker is used |
| 256 | // to begin the scan from the start of the log. |
| 257 | func dbLogScanArgs(resMark string) ([]byte, []byte) { |
| 258 | start, limit := util.ScanPrefixArgs(util.LogPrefix, "") |
| 259 | if resMark != "" { |
| 260 | // To start just after the current resume marker, augment it by |
| 261 | // appending an extra byte at the end. Use byte value zero to |
| 262 | // use the lowest value possible. This works because resume |
| 263 | // markers have a fixed length and are sorted lexicographically. |
| 264 | // By creationg a fake longer resume marker that falls between |
| 265 | // real resume markers, the next scan will start right after |
| 266 | // where the previous one stopped without missing data. |
| 267 | start = append([]byte(resMark), 0) |
| 268 | } |
| 269 | return start, limit |
| 270 | } |
| 271 | |
| 272 | // getWatchLogBatch returns a batch of watch log records (a transaction) from |
| 273 | // the given database and the new resume marker at the end of the batch. |
| 274 | func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) { |
| 275 | scanStart, scanLimit := dbLogScanArgs(resMark) |
| 276 | endOfBatch := false |
| 277 | var newResmark string |
| 278 | |
| 279 | // Use the store directly to scan these read-only log entries, no need |
| 280 | // to create a snapshot since they are never overwritten. Read and |
| 281 | // buffer a batch before processing it. |
| 282 | var logs []*watchable.LogEntry |
| 283 | stream := st.Scan(scanStart, scanLimit) |
| 284 | for stream.Advance() { |
| 285 | logKey := string(stream.Key(nil)) |
| 286 | var logEnt watchable.LogEntry |
| 287 | if vom.Decode(stream.Value(nil), &logEnt) != nil { |
| 288 | vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v", |
| 289 | appName, dbName, logKey, stream.Value(nil)) |
| 290 | } |
| 291 | |
| 292 | logs = append(logs, &logEnt) |
| 293 | |
| 294 | // Stop if this is the end of the batch. |
| 295 | if logEnt.Continued == false { |
| 296 | newResmark = logKey |
| 297 | endOfBatch = true |
| 298 | break |
| 299 | } |
| 300 | } |
| 301 | |
| 302 | if err := stream.Err(); err != nil { |
| 303 | vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err) |
| 304 | return nil, resMark |
| 305 | } |
| 306 | if !endOfBatch { |
| 307 | if len(logs) > 0 { |
| 308 | vlog.Fatalf("processDatabase: %s, %s: end of batch not found after %d entries", |
| 309 | appName, dbName, len(logs)) |
| 310 | } |
| 311 | return nil, resMark |
| 312 | } |
| 313 | return logs, newResmark |
| 314 | } |
| 315 | |
| 316 | // convertLogRecord converts a store log entry to a sync log record. It fills |
| 317 | // the previous object version (parent) by fetching its current DAG head if it |
| 318 | // has one. For a delete, it generates a new object version because the store |
| 319 | // does not version a deletion. |
| 320 | // TODO(rdaoud): change Syncbase to store and version a deleted object to |
| 321 | // simplify the store-to-sync interaction. A deleted key would still have a |
| 322 | // version and its value entry would encode the "deleted" flag, either in the |
| 323 | // key or probably in a value wrapper that would contain other metadata. |
| 324 | func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec { |
| 325 | _ = tx.(store.Transaction) |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 326 | var rec *localLogRec |
| 327 | timestamp := logEnt.CommitTimestamp |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 328 | |
| 329 | switch op := logEnt.Op.(type) { |
| 330 | case *watchable.OpGet: |
| 331 | // TODO(rdaoud): save read-set in sync. |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 332 | |
| 333 | case *watchable.OpScan: |
| 334 | // TODO(rdaoud): save scan-set in sync. |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 335 | |
| 336 | case *watchable.OpPut: |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 337 | rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp) |
| 338 | |
| 339 | case *watchable.OpSyncSnapshot: |
| 340 | rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 341 | |
| 342 | case *watchable.OpDelete: |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 343 | rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 344 | |
| 345 | case *watchable.OpSyncGroup: |
| 346 | vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 347 | |
| 348 | default: |
| 349 | vlog.Errorf("invalid watch LogEntry: %v", logEnt) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 350 | } |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 351 | |
| 352 | return rec |
| 353 | } |
| 354 | |
| 355 | // newLocalLogRec creates a local sync log record given its information: key, |
| 356 | // version, deletion flag, and timestamp. It retrieves the current DAG head |
| 357 | // for the key (if one exists) to use as its parent (previous) version. |
| 358 | func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec { |
| 359 | _ = tx.(store.Transaction) |
| 360 | |
| 361 | rec := localLogRec{} |
| 362 | oid := string(key) |
| 363 | rec.Metadata.ObjId = oid |
| 364 | rec.Metadata.CurVers = string(version) |
| 365 | rec.Metadata.Delete = deleted |
| 366 | if head, err := getHead(ctx, tx, oid); err == nil { |
| 367 | rec.Metadata.Parents = []string{head} |
| 368 | } else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) { |
| 369 | vlog.Fatalf("cannot getHead to convert log record for %s: %v", oid, err) |
| 370 | } |
| 371 | rec.Metadata.UpdTime = unixNanoToTime(timestamp) |
| 372 | return &rec |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 373 | } |
| 374 | |
| 375 | // processSyncGroupLogRecord checks if the log entry is a SyncGroup update and, |
| 376 | // if it is, updates the watch prefixes for the app database and returns true. |
| 377 | // Otherwise it returns false with no other changes. |
| 378 | func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool { |
| 379 | switch op := logEnt.Op.(type) { |
| 380 | case *watchable.OpSyncGroup: |
| 381 | remove := op.Value.Remove |
| 382 | for _, prefix := range op.Value.Prefixes { |
| 383 | if remove { |
| 384 | decrWatchPrefix(appName, dbName, prefix) |
| 385 | } else { |
| 386 | incrWatchPrefix(appName, dbName, prefix) |
| 387 | } |
| 388 | } |
| 389 | return true |
| 390 | |
| 391 | default: |
| 392 | return false |
| 393 | } |
| 394 | } |
| 395 | |
| 396 | // syncable returns true if the given log entry falls within the scope of a |
| 397 | // SyncGroup prefix for the given app database, and thus should be synced. |
| 398 | // It is used to pre-filter the batch of log entries before sync processing. |
| 399 | func syncable(appdb string, logEnt *watchable.LogEntry) bool { |
| 400 | var key string |
| 401 | switch op := logEnt.Op.(type) { |
| 402 | case *watchable.OpPut: |
| 403 | key = string(op.Value.Key) |
| 404 | case *watchable.OpDelete: |
| 405 | key = string(op.Value.Key) |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 406 | case *watchable.OpSyncSnapshot: |
| 407 | key = string(op.Value.Key) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 408 | default: |
| 409 | return false |
| 410 | } |
| 411 | |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame^] | 412 | // The key starts with one of the store's reserved prefixes for managed |
| 413 | // namespaced (e.g. $row or $perm). Remove that prefix before comparing |
| 414 | // it with the SyncGroup prefixes which are defined by the application. |
| 415 | parts := util.SplitKeyParts(key) |
| 416 | if len(parts) < 2 { |
| 417 | vlog.Fatalf("syncable: %s: invalid entry key %s: %v", appdb, key, logEnt) |
| 418 | } |
| 419 | key = util.JoinKeyParts(parts[1:]...) |
| 420 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 421 | for prefix := range watchPrefixes[appdb] { |
| 422 | if strings.HasPrefix(key, prefix) { |
| 423 | return true |
| 424 | } |
| 425 | } |
| 426 | return false |
| 427 | } |
| 428 | |
| 429 | // resMarkKey returns the key used to access the watcher resume marker. |
| 430 | func resMarkKey() string { |
| 431 | return util.JoinKeyParts(util.SyncPrefix, "w", "rm") |
| 432 | } |
| 433 | |
| 434 | // setResMark stores the watcher resume marker for a database. |
| 435 | func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error { |
| 436 | _ = tx.(store.Transaction) |
| 437 | |
| 438 | if err := util.PutObject(tx, resMarkKey(), resMark); err != nil { |
| 439 | return verror.New(verror.ErrInternal, ctx, err) |
| 440 | } |
| 441 | return nil |
| 442 | } |
| 443 | |
| 444 | // getResMark retrieves the watcher resume marker for a database. |
| 445 | func getResMark(ctx *context.T, st store.StoreReader) (string, error) { |
| 446 | var resMark string |
| 447 | key := resMarkKey() |
| 448 | if err := util.GetObject(st, key, &resMark); err != nil { |
| 449 | return NoVersion, translateError(ctx, err, key) |
| 450 | } |
| 451 | return resMark, nil |
| 452 | } |