Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package vsync |
| 6 | |
| 7 | // New log records are created when objects in the local store are created, |
| 8 | // updated or deleted. Local log records are also replayed to keep the |
| 9 | // per-object dags consistent with the local store state. Sync module assigns |
| 10 | // each log record created within a Database a unique sequence number, called |
| 11 | // the generation number. Locally on each device, the position of each log |
| 12 | // record is also recorded relative to other local and remote log records. |
| 13 | // |
| 14 | // When a device receives a request to send log records, it first computes the |
| 15 | // missing generations between itself and the incoming request on a per-prefix |
| 16 | // basis. It then sends all the log records belonging to the missing generations |
| 17 | // in the order they occur locally (using the local log position). A device that |
| 18 | // receives log records over the network replays all the records received from |
| 19 | // another device in a single batch. Each replayed log record adds a new version |
| 20 | // to the dag of the object contained in the log record. At the end of replaying |
| 21 | // all the log records, conflict detection and resolution is carried out for all |
| 22 | // the objects learned during this iteration. Conflict detection and resolution |
| 23 | // is carried out after a batch of log records are replayed, instead of |
| 24 | // incrementally after each record is replayed, to avoid repeating conflict |
| 25 | // resolution already performed by other devices. |
| 26 | // |
| 27 | // Sync module tracks the current generation number and the current local log |
| 28 | // position for each Database. In addition, it also tracks the current |
| 29 | // generation vector for a Database. Log records are indexed such that they can |
| 30 | // be selectively retrieved from the store for any missing generation from any |
| 31 | // device. |
| 32 | |
| 33 | import ( |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 34 | "container/heap" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 35 | "fmt" |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 36 | "sort" |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 37 | "strconv" |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 38 | "strings" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 39 | |
| 40 | "v.io/syncbase/x/ref/services/syncbase/server/interfaces" |
| 41 | "v.io/syncbase/x/ref/services/syncbase/server/util" |
| 42 | "v.io/syncbase/x/ref/services/syncbase/store" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 43 | "v.io/v23/context" |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 44 | "v.io/v23/rpc" |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 45 | "v.io/v23/verror" |
| 46 | ) |
| 47 | |
// dbSyncStateInMem represents the in-memory sync state of a Database.
type dbSyncStateInMem struct {
	gen uint64 // next generation number to hand out locally (starts at 1; see reserveGenAndPosInternal).
	pos uint64 // next position in the local log to hand out.

	ckPtGen uint64               // local generation number frozen ("checkpointed") for the responder's use; set by checkPtLocalGen.
	genvec  interfaces.GenVector // Note: Generation vector contains state from remote devices only.
}
| 56 | |
// initSync initializes the sync module during startup. It scans all the
// databases across all apps to initialize the following:
// a) in-memory sync state of a Database consisting of the current generation
//    number, log position and generation vector.
// b) watcher map of prefixes currently being synced.
//
// TODO(hpucha): This is incomplete. Flesh this out further.
func (s *syncService) initSync(ctx *context.T) error {
	s.syncStateLock.Lock()
	defer s.syncStateLock.Unlock()

	var errFinal error
	s.syncState = make(map[string]*dbSyncStateInMem)

	s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
		// Scan the SyncGroups, skipping those not yet being watched.
		forEachSyncGroup(st, func(sg *interfaces.SyncGroup) bool {
			// TODO(rdaoud): only use SyncGroups that have been
			// marked as "watchable" by the sync watcher thread.
			// This is to handle the case of a SyncGroup being
			// created but Syncbase restarting before the watcher
			// processed the SyncGroupOp entry in the watch queue.
			// It should not be syncing that SyncGroup's data after
			// restart, but wait until the watcher processes the
			// entry as would have happened without a restart.
			for _, prefix := range sg.Spec.Prefixes {
				incrWatchPrefix(appName, dbName, prefix)
			}
			// false: presumably "keep iterating" — confirm against
			// forEachSyncGroup's callback contract.
			return false
		})

		// The sync-state recovery below is deliberately disabled
		// ("if false"): it is an unfinished sketch kept as a starting
		// point (see the TODO above).
		if false {
			// Fetch the sync state.
			ds, err := getDbSyncState(ctx, st)
			if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
				errFinal = err
				return false
			}
			var scanStart, scanLimit []byte
			// Figure out what to scan among local log records.
			if verror.ErrorID(err) == verror.ErrNoExist.ID {
				scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
			} else {
				scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
			}
			var maxpos uint64
			// NOTE(review): this dbName shadows the callback's
			// dbName parameter and stays empty, and the stream
			// returned by st.Scan below is discarded, so maxpos is
			// never computed. Both must be fixed before enabling
			// this code.
			var dbName string
			// Scan local log records to find the most recent one.
			st.Scan(scanStart, scanLimit)
			// Scan remote log records using the persisted GenVector.
			s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
		}

		return false
	})

	return errFinal
}
| 115 | |
| 116 | // reserveGenAndPosInDbLog reserves a chunk of generation numbers and log |
| 117 | // positions in a Database's log. Used when local updates result in log |
| 118 | // entries. |
| 119 | func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) { |
| 120 | return s.reserveGenAndPosInternal(appName, dbName, count, count) |
| 121 | } |
| 122 | |
| 123 | // reservePosInDbLog reserves a chunk of log positions in a Database's log. Used |
| 124 | // when remote log records are received. |
| 125 | func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 { |
| 126 | _, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count) |
| 127 | return pos |
| 128 | } |
| 129 | |
| 130 | func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) { |
| 131 | s.syncStateLock.Lock() |
| 132 | defer s.syncStateLock.Unlock() |
| 133 | |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 134 | name := appDbName(appName, dbName) |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 135 | ds, ok := s.syncState[name] |
| 136 | if !ok { |
| 137 | ds = &dbSyncStateInMem{gen: 1} |
| 138 | s.syncState[name] = ds |
| 139 | } |
| 140 | |
| 141 | gen := ds.gen |
| 142 | pos := ds.pos |
| 143 | |
| 144 | ds.gen += genCount |
| 145 | ds.pos += posCount |
| 146 | |
| 147 | return gen, pos |
| 148 | } |
| 149 | |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 150 | // checkPtLocalGen freezes the local generation number for the responder's use. |
| 151 | func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error { |
| 152 | s.syncStateLock.Lock() |
| 153 | defer s.syncStateLock.Unlock() |
| 154 | |
| 155 | name := appDbName(appName, dbName) |
| 156 | ds, ok := s.syncState[name] |
| 157 | if !ok { |
| 158 | return verror.New(verror.ErrInternal, ctx, "db state not found", name) |
| 159 | } |
| 160 | |
| 161 | ds.ckPtGen = ds.gen |
| 162 | return nil |
| 163 | } |
| 164 | |
| 165 | // getDbSyncStateInMem returns a copy of the current in memory sync state of the Database. |
| 166 | func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) { |
| 167 | s.syncStateLock.Lock() |
| 168 | defer s.syncStateLock.Unlock() |
| 169 | |
| 170 | name := appDbName(appName, dbName) |
| 171 | ds, ok := s.syncState[name] |
| 172 | if !ok { |
| 173 | return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name) |
| 174 | } |
| 175 | |
| 176 | dsCopy := &dbSyncStateInMem{ |
| 177 | gen: ds.gen, |
| 178 | pos: ds.pos, |
| 179 | ckPtGen: ds.ckPtGen, |
| 180 | } |
| 181 | |
| 182 | // Make a copy of the genvec. |
| 183 | dsCopy.genvec = copyGenVec(ds.genvec) |
| 184 | |
| 185 | return dsCopy, nil |
| 186 | } |
| 187 | |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 188 | // getDbGenInfo returns a copy of the current generation information of the Database. |
| 189 | func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) { |
| 190 | s.syncStateLock.Lock() |
| 191 | defer s.syncStateLock.Unlock() |
| 192 | |
| 193 | name := appDbName(appName, dbName) |
| 194 | ds, ok := s.syncState[name] |
| 195 | if !ok { |
| 196 | return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name) |
| 197 | } |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 198 | |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 199 | // Make a copy of the genvec. |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 200 | genvec := copyGenVec(ds.genvec) |
| 201 | |
| 202 | // Add local generation information to the genvec. |
| 203 | for _, gv := range genvec { |
| 204 | gv[s.id] = ds.ckPtGen |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 205 | } |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 206 | |
| 207 | return genvec, ds.ckPtGen, nil |
| 208 | } |
| 209 | |
| 210 | // putDbGenInfoRemote puts the current remote generation information of the Database. |
| 211 | func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error { |
| 212 | s.syncStateLock.Lock() |
| 213 | defer s.syncStateLock.Unlock() |
| 214 | |
| 215 | name := appDbName(appName, dbName) |
| 216 | ds, ok := s.syncState[name] |
| 217 | if !ok { |
| 218 | return verror.New(verror.ErrInternal, ctx, "db state not found", name) |
| 219 | } |
| 220 | |
| 221 | // Make a copy of the genvec. |
| 222 | ds.genvec = copyGenVec(genvec) |
| 223 | |
| 224 | return nil |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 225 | } |
| 226 | |
Raja Daoud | 28d3b60 | 2015-06-17 20:02:20 -0700 | [diff] [blame] | 227 | // appDbName combines the app and db names to return a globally unique name for |
| 228 | // a Database. This relies on the fact that the app name is globally unique and |
| 229 | // the db name is unique within the scope of the app. |
| 230 | func appDbName(appName, dbName string) string { |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 231 | return util.JoinKeyParts(appName, dbName) |
| 232 | } |
| 233 | |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 234 | // splitAppDbName is the inverse of appDbName and returns app and db name from a |
| 235 | // globally unique name for a Database. |
| 236 | func splitAppDbName(ctx *context.T, name string) (string, string, error) { |
| 237 | parts := util.SplitKeyParts(name) |
| 238 | if len(parts) != 2 { |
| 239 | return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name) |
| 240 | } |
| 241 | return parts[0], parts[1], nil |
| 242 | } |
| 243 | |
| 244 | func copyGenVec(in interfaces.GenVector) interfaces.GenVector { |
| 245 | genvec := make(interfaces.GenVector) |
| 246 | for p, inpgv := range in { |
| 247 | pgv := make(interfaces.PrefixGenVector) |
| 248 | for id, gen := range inpgv { |
| 249 | pgv[id] = gen |
| 250 | } |
| 251 | genvec[p] = pgv |
| 252 | } |
| 253 | return genvec |
| 254 | } |
| 255 | |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 256 | //////////////////////////////////////////////////////////// |
| 257 | // Low-level utility functions to access sync state. |
| 258 | |
| 259 | // dbSyncStateKey returns the key used to access the sync state of a Database. |
| 260 | func dbSyncStateKey() string { |
| 261 | return util.JoinKeyParts(util.SyncPrefix, "dbss") |
| 262 | } |
| 263 | |
| 264 | // putDbSyncState persists the sync state object for a given Database. |
| 265 | func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error { |
| 266 | _ = tx.(store.Transaction) |
| 267 | if err := util.PutObject(tx, dbSyncStateKey(), ds); err != nil { |
| 268 | return verror.New(verror.ErrInternal, ctx, err) |
| 269 | } |
| 270 | return nil |
| 271 | } |
| 272 | |
| 273 | // getDbSyncState retrieves the sync state object for a given Database. |
| 274 | func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) { |
| 275 | var ds dbSyncState |
| 276 | if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil { |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 277 | return nil, translateError(ctx, err, dbSyncStateKey()) |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 278 | } |
| 279 | return &ds, nil |
| 280 | } |
| 281 | |
| 282 | //////////////////////////////////////////////////////////// |
| 283 | // Low-level utility functions to access log records. |
| 284 | |
| 285 | // logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device. |
| 286 | func logRecsPerDeviceScanPrefix(id uint64) string { |
| 287 | return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id)) |
| 288 | } |
| 289 | |
| 290 | // logRecKey returns the key used to access a specific log record. |
| 291 | func logRecKey(id, gen uint64) string { |
| 292 | return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen)) |
| 293 | } |
| 294 | |
Himabindu Pucha | 03ef393 | 2015-06-26 17:56:09 -0700 | [diff] [blame] | 295 | // splitLogRecKey is the inverse of logRecKey and returns device id and generation number. |
| 296 | func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) { |
| 297 | parts := util.SplitKeyParts(key) |
| 298 | verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key) |
| 299 | if len(parts) != 4 { |
| 300 | return 0, 0, verr |
| 301 | } |
| 302 | id, err := strconv.ParseUint(parts[2], 10, 64) |
| 303 | if err != nil { |
| 304 | return 0, 0, verr |
| 305 | } |
| 306 | gen, err := strconv.ParseUint(parts[3], 10, 64) |
| 307 | if err != nil { |
| 308 | return 0, 0, verr |
| 309 | } |
| 310 | return id, gen, nil |
| 311 | } |
| 312 | |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 313 | // hasLogRec returns true if the log record for (devid, gen) exists. |
| 314 | func hasLogRec(st store.StoreReader, id, gen uint64) bool { |
| 315 | // TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data. |
| 316 | var rec localLogRec |
| 317 | if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil { |
| 318 | return false |
| 319 | } |
| 320 | return true |
| 321 | } |
| 322 | |
| 323 | // putLogRec stores the log record. |
| 324 | func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error { |
| 325 | _ = tx.(store.Transaction) |
| 326 | if err := util.PutObject(tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec); err != nil { |
| 327 | return verror.New(verror.ErrInternal, ctx, err) |
| 328 | } |
| 329 | return nil |
| 330 | } |
| 331 | |
| 332 | // getLogRec retrieves the log record for a given (devid, gen). |
| 333 | func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) { |
| 334 | var rec localLogRec |
| 335 | if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil { |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 336 | return nil, translateError(ctx, err, logRecKey(id, gen)) |
Himabindu Pucha | 2752a7e | 2015-06-12 14:07:07 -0700 | [diff] [blame] | 337 | } |
| 338 | return &rec, nil |
| 339 | } |
| 340 | |
| 341 | // delLogRec deletes the log record for a given (devid, gen). |
| 342 | func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error { |
| 343 | _ = tx.(store.Transaction) |
| 344 | |
| 345 | if err := tx.Delete([]byte(logRecKey(id, gen))); err != nil { |
| 346 | return verror.New(verror.ErrInternal, ctx, err) |
| 347 | } |
| 348 | return nil |
| 349 | } |
Himabindu Pucha | 5349702 | 2015-06-19 11:57:13 -0700 | [diff] [blame] | 350 | |
| 351 | //////////////////////////////////////////////////////////// |
| 352 | // Genvector-related utilities. |
| 353 | |
// sendDeltasPerDatabase sends to an initiator all the missing generations
// corresponding to the prefixes requested for this Database, and a genvector
// summarizing the knowledge transferred from the responder to the
// initiator. This happens in two phases:
//
// In the first phase, for a given set of nested prefixes from the initiator,
// the shortest prefix in that set is extracted. The initiator's prefix
// genvector for this shortest prefix represents the lower bound on its
// knowledge for the entire set of nested prefixes. This prefix genvector
// (representing the lower bound) is diffed with all the responder prefix
// genvectors corresponding to same or deeper prefixes compared to the initiator
// prefix. This diff produces a bound on the missing knowledge. For example, say
// the initiator is interested in prefixes {foo, foobar}, where each prefix is
// associated with a prefix genvector. Since the initiator strictly has as much
// or more knowledge for prefix "foobar" as it has for prefix "foo", "foo"'s
// prefix genvector is chosen as the lower bound for the initiator's
// knowledge. Similarly, say the responder has knowledge on prefixes {f,
// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
// compute a bound on missing generations (all responder's prefixes that match
// "foo". Note that since the responder doesn't have a prefix genvector at
// "foo", its knowledge at "f" is applicable to "foo").
//
// Since the first phase outputs an aggressive calculation of missing
// generations containing more generation entries than strictly needed by the
// initiator, in the second phase, each missing generation is sent to the
// initiator only if the initiator is eligible for it and is not aware of
// it. The generations are sent to the initiator in the same order as the
// responder learned them so that the initiator can reconstruct the DAG for the
// objects by learning older nodes first.
func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
	// Phase 1 of sendDeltas. diff contains the bound on the generations
	// missing from the initiator per device.
	diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
	if err != nil {
		return nil, err
	}

	// Phase 2 of sendDeltas: Process the diff, filtering out records that
	// are not needed, and send the remainder on the wire ordered.
	st, err := s.getDbStore(ctx, call, appName, dbName)
	if err != nil {
		return nil, err
	}

	// We now visit every log record in the generation range as obtained
	// from phase 1 in their log order. We use a heap to incrementally sort
	// the log records as per their position in the log.
	//
	// Init the min heap, one entry per device in the diff.
	mh := make(minHeap, 0, len(diff))
	for dev, r := range diff {
		// Start iterating each device's range at its lower bound.
		r.cur = r.min
		rec, err := getNextLogRec(ctx, st, dev, r)
		if err != nil {
			return nil, err
		}
		if rec != nil {
			mh = append(mh, rec)
		} else {
			// No records exist in this device's range; drop the
			// device from the diff so it is not revisited below.
			delete(diff, dev)
		}
	}
	heap.Init(&mh)

	// Process the log records in order.
	initPfxs := extractAndSortPrefixes(initVec)

	for mh.Len() > 0 {
		// Pop the record with the smallest log position.
		rec := heap.Pop(&mh).(*localLogRec)

		if !filterLogRec(rec, initVec, initPfxs) {
			// Send on the wire.
			wireRec := interfaces.LogRec{Metadata: rec.Metadata}
			// TODO(hpucha): Hash out this fake stream stuff when
			// defining the RPC and the rest of the responder.
			stream.Send(wireRec)
		}

		// Add a new record from the same device if not done.
		dev := rec.Metadata.Id
		rec, err := getNextLogRec(ctx, st, dev, diff[dev])
		if err != nil {
			return nil, err
		}
		if rec != nil {
			heap.Push(&mh, rec)
		} else {
			delete(diff, dev)
		}
	}

	return outVec, nil
}
| 448 | |
// computeDeltaBound computes the bound on missing generations across all
// requested prefixes (phase 1 of sendDeltas). It returns the per-device
// missing-generation ranges (diff) and the genvector (outVec) summarizing the
// responder knowledge that will be conveyed to the initiator.
func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
	respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
	if err != nil {
		return nil, nil, err
	}
	respPfxs := extractAndSortPrefixes(respVec)
	initPfxs := extractAndSortPrefixes(initVec)
	if len(initPfxs) == 0 {
		return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
	}

	outVec := make(interfaces.GenVector)
	diff := make(genRangeVector)
	// initPfxs is sorted, so each run of prefixes sharing the first
	// element's prefix forms one nested set; pfx tracks the shortest
	// (first) prefix of the current set.
	pfx := initPfxs[0]

	for _, p := range initPfxs {
		if strings.HasPrefix(p, pfx) && p != pfx {
			// p is nested under the current set's shortest prefix;
			// the set's lower bound already covers it.
			continue
		}

		// Process this prefix as this is the start of a new set of
		// nested prefixes.
		pfx = p

		// Lower bound on initiator's knowledge for this prefix set.
		initpgv := initVec[pfx]

		// Find the relevant responder prefixes and add the corresponding knowledge.
		var respgv interfaces.PrefixGenVector
		var rpStart string
		for _, rp := range respPfxs {
			if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
				// No relationship with pfx.
				continue
			}

			if strings.HasPrefix(pfx, rp) {
				// If rp is a prefix of pfx, remember it because
				// it may be a potential starting point for the
				// responder's knowledge. The actual starting
				// point is the deepest prefix where rp is a
				// prefix of pfx.
				//
				// Say the initiator is looking for "foo", and
				// the responder has knowledge for "f" and "fo",
				// the responder's starting point will be the
				// prefix genvector for "fo". Similarly, if the
				// responder has knowledge for "foo", the
				// starting point will be the prefix genvector
				// for "foo".
				//
				// (respPfxs is sorted, so later matches are
				// deeper; the last assignment wins.)
				rpStart = rp
			} else {
				// If pfx is a prefix of rp, this knowledge must
				// be definitely sent to the initiator. Diff the
				// prefix genvectors to adjust the delta bound and
				// include in outVec.
				respgv = respVec[rp]
				s.diffPrefixGenVectors(respgv, initpgv, diff)
				outVec[rp] = respgv
			}
		}

		// Deal with the starting point.
		if rpStart == "" {
			// No matching prefixes for pfx were found; the
			// responder's only relevant knowledge is its own local
			// checkpointed generation.
			respgv = make(interfaces.PrefixGenVector)
			respgv[s.id] = respGen
		} else {
			respgv = respVec[rpStart]
		}
		s.diffPrefixGenVectors(respgv, initpgv, diff)
		outVec[pfx] = respgv
	}

	return diff, outVec, nil
}
| 527 | |
// genRange represents a range of generations (min and max inclusive).
type genRange struct {
	min uint64 // lowest generation in the range (inclusive).
	max uint64 // highest generation in the range (inclusive).
	cur uint64 // next generation to fetch when iterating the range (see getNextLogRec).
}

// genRangeVector maps a device id to that device's range of generations
// missing from the initiator.
type genRangeVector map[uint64]*genRange
| 536 | |
| 537 | // diffPrefixGenVectors diffs two generation vectors, belonging to the responder |
| 538 | // and the initiator, and updates the range of generations per device known to |
| 539 | // the responder but not known to the initiator. "gens" (generation range) is |
| 540 | // passed in as an input argument so that it can be incrementally updated as the |
| 541 | // range of missing generations grows when different responder prefix genvectors |
| 542 | // are used to compute the diff. |
| 543 | // |
| 544 | // For example: Generation vector for responder is say RVec = {A:10, B:5, C:1}, |
| 545 | // Generation vector for initiator is say IVec = {A:5, B:10, D:2}. Diffing these |
| 546 | // two vectors returns: {A:[6-10], C:[1-1]}. |
| 547 | // |
| 548 | // TODO(hpucha): Add reclaimVec for GCing. |
| 549 | func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) { |
| 550 | // Compute missing generations for devices that are in both initiator's and responder's vectors. |
| 551 | for devid, gen := range initPVec { |
| 552 | rgen, ok := respPVec[devid] |
| 553 | // Skip since responder doesn't know of this device. |
| 554 | if ok { |
| 555 | updateDevRange(devid, rgen, gen, gens) |
| 556 | } |
| 557 | } |
| 558 | |
| 559 | // Compute missing generations for devices not in initiator's vector but in responder's vector. |
| 560 | for devid, rgen := range respPVec { |
| 561 | if _, ok := initPVec[devid]; !ok { |
| 562 | updateDevRange(devid, rgen, 0, gens) |
| 563 | } |
| 564 | } |
| 565 | } |
| 566 | |
| 567 | func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) { |
| 568 | if gen < rgen { |
| 569 | // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive. |
| 570 | if r, ok := gens[devid]; !ok { |
| 571 | gens[devid] = &genRange{min: gen + 1, max: rgen} |
| 572 | } else { |
| 573 | if gen+1 < r.min { |
| 574 | r.min = gen + 1 |
| 575 | } |
| 576 | if rgen > r.max { |
| 577 | r.max = rgen |
| 578 | } |
| 579 | } |
| 580 | } |
| 581 | } |
| 582 | |
| 583 | func extractAndSortPrefixes(vec interfaces.GenVector) []string { |
| 584 | pfxs := make([]string, len(vec)) |
| 585 | i := 0 |
| 586 | for p := range vec { |
| 587 | pfxs[i] = p |
| 588 | i++ |
| 589 | } |
| 590 | sort.Strings(pfxs) |
| 591 | return pfxs |
| 592 | } |
| 593 | |
| 594 | // TODO(hpucha): This can be optimized using a scan instead of "gets" in a for |
| 595 | // loop. |
| 596 | func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) { |
| 597 | for i := r.cur; i <= r.max; i++ { |
| 598 | rec, err := getLogRec(ctx, sn, dev, i) |
| 599 | if err == nil { |
| 600 | r.cur = i + 1 |
| 601 | return rec, nil |
| 602 | } |
| 603 | if verror.ErrorID(err) != verror.ErrNoExist.ID { |
| 604 | return nil, err |
| 605 | } |
| 606 | } |
| 607 | return nil, nil |
| 608 | } |
| 609 | |
| 610 | // Note: initPfxs is sorted. |
| 611 | func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool { |
| 612 | filter := true |
| 613 | |
| 614 | var maxGen uint64 |
| 615 | for _, p := range initPfxs { |
| 616 | if strings.HasPrefix(rec.Metadata.ObjId, p) { |
| 617 | // Do not filter. Initiator is interested in this |
| 618 | // prefix. |
| 619 | filter = false |
| 620 | |
| 621 | // Track if the initiator knows of this record. |
| 622 | gen := initVec[p][rec.Metadata.Id] |
| 623 | if maxGen < gen { |
| 624 | maxGen = gen |
| 625 | } |
| 626 | } |
| 627 | } |
| 628 | |
| 629 | // Filter this record if the initiator already has it. |
| 630 | if maxGen >= rec.Metadata.Gen { |
| 631 | return true |
| 632 | } |
| 633 | |
| 634 | return filter |
| 635 | } |
| 636 | |
| 637 | // A minHeap implements heap.Interface and holds local log records. |
| 638 | type minHeap []*localLogRec |
| 639 | |
| 640 | func (mh minHeap) Len() int { return len(mh) } |
| 641 | |
| 642 | func (mh minHeap) Less(i, j int) bool { |
| 643 | return mh[i].Pos < mh[j].Pos |
| 644 | } |
| 645 | |
| 646 | func (mh minHeap) Swap(i, j int) { |
| 647 | mh[i], mh[j] = mh[j], mh[i] |
| 648 | } |
| 649 | |
| 650 | func (mh *minHeap) Push(x interface{}) { |
| 651 | item := x.(*localLogRec) |
| 652 | *mh = append(*mh, item) |
| 653 | } |
| 654 | |
| 655 | func (mh *minHeap) Pop() interface{} { |
| 656 | old := *mh |
| 657 | n := len(old) |
| 658 | item := old[n-1] |
| 659 | *mh = old[0 : n-1] |
| 660 | return item |
| 661 | } |
| 662 | |
// logRecStream abstracts the sender side of a stream of log records. It is a
// placeholder until the real RPC stream is defined (see the TODO in
// sendDeltasPerDatabase).
type logRecStream interface {
	// Send transmits one log record to the initiator.
	Send(interfaces.LogRec)
}