package vsync

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strings"
	"time"

	"veyron/services/store/raw"

	"veyron2/naming"
	"veyron2/rt"
	"veyron2/storage"
	"veyron2/vlog"
)

// Policies to pick a peer to sync with.
const (
	// Picks a peer at random from the available set.
	selectRandom = iota

	// TODO(hpucha): implement other policies.
	// Picks a peer with most differing generations.
	selectMostDiff

	// Picks a peer that was synced with the furthest in the past.
	selectOldest
)

// Policies for conflict resolution.
const (
	// Resolves conflicts by picking the mutation with the most recent timestamp.
	useTime = iota

	// TODO(hpucha): implement other policies.
	// Resolves conflicts by using the app conflict resolver callbacks via store.
	useCallback
)

var (
	// peerSyncInterval is the duration between two consecutive
	// sync events. In every sync event, the initiator contacts
	// one of its peers to obtain any pending updates.
	peerSyncInterval = 100 * time.Millisecond

	// peerSelectionPolicy is the policy used to select a peer when
	// the initiator gets a chance to sync.
	peerSelectionPolicy = selectRandom

	// conflictResolutionPolicy is the policy used to resolve conflicts.
	conflictResolutionPolicy = useTime

	errNoUsefulPeer = errors.New("no useful peer to contact")
)

// syncInitiator contains the metadata and state for the initiator thread.
type syncInitiator struct {
	syncd *syncd

	// State to contact peers periodically and get deltas.
	// TODO(hpucha): This is an initial version with command line arguments.
	// Next steps are to tie this up into the mount table and auto-discover neighbors.
	neighbors   []string
	neighborIDs []string

	updObjects map[storage.ID]*objConflictState
}

// objConflictState contains the conflict state for objects that are
// updated during an initiator run.
type objConflictState struct {
	isConflict bool
	newHead    storage.Version
	oldHead    storage.Version
	ancestor   storage.Version
	resolvVal  *LogValue
}

// newInitiator creates a new initiator instance attached to the given syncd instance.
func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string, syncTick time.Duration) *syncInitiator {
	i := &syncInitiator{syncd: syncd,
		updObjects: make(map[storage.ID]*objConflictState),
	}

	// Bootstrap my peer list.
	if peerEndpoints != "" {
		i.neighbors = strings.Split(peerEndpoints, ",")
		i.neighborIDs = strings.Split(peerDeviceIDs, ",")
	}
	if len(i.neighbors) != len(i.neighborIDs) {
		vlog.Fatalf("newInitiator: Mismatch between number of endpoints and IDs")
	}

	// Override the default peerSyncInterval value if syncTick is specified.
	if syncTick > 0 {
		peerSyncInterval = syncTick
	}

	vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
	vlog.VI(1).Infof("newInitiator: Peer endpoints: %v", i.neighbors)
	vlog.VI(1).Infof("newInitiator: Peer IDs: %v", i.neighborIDs)
	vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)

	return i
}

// contactPeers wakes up every peerSyncInterval to contact peers and get deltas from them.
func (i *syncInitiator) contactPeers() {
	ticker := time.NewTicker(peerSyncInterval)
	for {
		select {
		case <-i.syncd.closed:
			ticker.Stop()
			i.syncd.pending.Done()
			return
		case <-ticker.C:
		}

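		// A tick fired: attempt one round of sync with a chosen peer.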
		id, ep, err := i.pickPeer()
		if err != nil {
			continue
		}

		// Freeze the most recent batch of local changes
		// before fetching remote changes from a peer.
		//
		// We only allow an initiator to create new local
		// generations (not responders or the watcher) in
		// order to maintain a static baseline for the
		// duration of a sync. This addresses the following
		// race condition: if we allow responders to create
		// new local generations while the initiator is in
		// progress, they may beat the initiator and send
		// these new generations to remote devices. These
		// remote devices in turn can send these generations
		// back to the in-progress initiator, which was
		// started with older generation information.
		local, err := i.updateLocalGeneration()
		if err != nil {
			vlog.Fatalf("contactPeers:: error updating local generation: err %v", err)
		}

		i.getDeltasFromPeer(id, ep, local)
	}
}

// pickPeer picks a sync endpoint in the neighborhood to sync with.
func (i *syncInitiator) pickPeer() (string, string, error) {
	switch peerSelectionPolicy {
	case selectRandom:
		// Pick a neighbor at random.
		if i.neighbors == nil {
			return "", "", errNoUsefulPeer
		}
		ind := rand.Intn(len(i.neighbors))
		return i.neighborIDs[ind], i.neighbors[ind], nil
	default:
		return "", "", fmt.Errorf("unknown peer selection policy")
	}
}

// updateLocalGeneration creates a new local generation if needed and
// returns the newest local generation vector.
func (i *syncInitiator) updateLocalGeneration() (GenVector, error) {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	// Create a new local generation if there are any local updates.
	gen, err := i.syncd.log.createLocalGeneration()
	if err == errNoUpdates {
		vlog.VI(1).Infof("createLocalGeneration:: No new updates. Local at %d", gen)
		return i.syncd.devtab.getGenVec(i.syncd.id)
	}
	if err != nil {
		return GenVector{}, err
	}

	// Update local generation vector in devTable.
	if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
		return GenVector{}, err
	}
	return i.syncd.devtab.getGenVec(i.syncd.id)
}

// getDeltasFromPeer contacts the specified endpoint to obtain deltas wrt its current generation vector.
func (i *syncInitiator) getDeltasFromPeer(dID, ep string, local GenVector) {
	vlog.VI(1).Infof("GetDeltasFromPeer:: From server %s with DeviceID %s at %v", ep, dID, time.Now().UTC())

	// Construct a new stub that binds to peer endpoint.
	c, err := BindSync(naming.JoinAddressName(ep, "sync"))
	if err != nil {
		vlog.Errorf("GetDeltasFromPeer:: error binding to server: err %v", err)
		return
	}

	vlog.VI(1).Infof("GetDeltasFromPeer:: Sending local information: %v", local)

	// Issue a GetDeltas() rpc.
	stream, err := c.GetDeltas(rt.R().TODOContext(), local, i.syncd.id)
	if err != nil {
		vlog.Errorf("GetDeltasFromPeer:: error getting deltas: err %v", err)
		return
	}

	minGens, err := i.processLogStream(stream)
	if err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: error processing logs: err %v", err)
	}

	remote, err := stream.Finish()
	if err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: finish failed with err %v", err)
	}

	if err := i.processUpdatedObjects(local, minGens, remote, DeviceID(dID)); err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: error processing objects: err %v", err)
	}

	vlog.VI(1).Infof("GetDeltasFromPeer:: Local vector %v", local)
	vlog.VI(1).Infof("GetDeltasFromPeer:: Remote vector %v", remote)
}

// processLogStream replays an entire log stream spanning multiple
// generations from different devices received from a single GetDeltas
// call. It does not perform any conflict resolution during replay.
// This avoids resolving conflicts that have already been resolved by
// other devices.
func (i *syncInitiator) processLogStream(stream SyncGetDeltasStream) (GenVector, error) {
	// Map to track new generations received in the RPC reply.
	// TODO(hpucha): If needed, this can be optimized under the
	// assumption that an entire generation is received
	// sequentially. We can then parse a generation at a time.
	newGens := make(map[string]*genMetadata)
	// Slice to track the order of arrival of the generations;
	// this order must be preserved.
	var orderGens []string
	// Compute the minimum generation for every device in this set.
	minGens := GenVector{}

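	// Replay the stream record by record: insert each log record into
	// the log and dag, mark its object dirty, and accumulate the
	// per-generation metadata in arrival order.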
	for {
		rec, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return GenVector{}, err
		}

		if err := i.insertRecInLogAndDag(&rec); err != nil {
			return GenVector{}, err
		}
		// Mark object dirty.
		i.updObjects[rec.ObjID] = &objConflictState{}

		// Populate the generation metadata.
		genKey := generationKey(rec.DevID, rec.GNum)
		if gen, ok := newGens[genKey]; !ok {
			// New generation in the stream.
			orderGens = append(orderGens, genKey)
			newGens[genKey] = &genMetadata{
				Count:  1,
				MaxLSN: rec.LSN,
			}
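			// Track the smallest generation number seen per device
			// in this stream; these minimums later update the
			// reclaim vector (see updateGenVecs).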
			g, ok := minGens[rec.DevID]
			if !ok || g > rec.GNum {
				minGens[rec.DevID] = rec.GNum
			}
		} else {
			gen.Count++
			if rec.LSN > gen.MaxLSN {
				gen.MaxLSN = rec.LSN
			}
		}
	}

	if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
		return GenVector{}, err
	}

	return minGens, nil
}

// insertRecInLogAndDag adds a new log record to the log and dag data structures.
func (i *syncInitiator) insertRecInLogAndDag(rec *LogRec) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	logKey, err := i.syncd.log.putLogRec(rec)
	if err != nil {
		return err
	}
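	// Link the new version into the object's dag, remembering the log
	// key so the record can later be located from the dag node.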
	if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
		return err
	}
	return nil
}

// createGenMetadataBatch inserts a batch of generations into the log.
func (i *syncInitiator) createGenMetadataBatch(newGens map[string]*genMetadata, orderGens []string) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	for _, key := range orderGens {
		gen := newGens[key]
		// Insert the generation metadata.
		dev, gnum, err := splitGenerationKey(key)
		if err != nil {
			return err
		}
		if err := i.syncd.log.createRemoteGeneration(dev, gnum, gen); err != nil {
			return err
		}
	}

	return nil
}

// processUpdatedObjects processes all the updates received by the
// initiator, one object at a time. For each updated object, we first
// check if the object has any conflicts. If there is a conflict, we
// resolve the conflict and generate a new store mutation reflecting
// the conflict resolution. If there is no conflict, we generate a
// store mutation to simply update the store to the latest value. We
// then put all these mutations in the store. If the put succeeds, we
// update the log and dag state suitably (move the head pointer of the
// object in the dag to the latest version, and create a new log
// record reflecting conflict resolution, if any). Puts to the store
// can fail since preconditions on the objects may have been violated.
// In this case, we wait for the latest versions of the objects from
// the store, recheck the objects for conflicts, and repeat the above
// steps until the put to the store succeeds.
func (i *syncInitiator) processUpdatedObjects(local, minGens, remote GenVector, dID DeviceID) error {
	for {
		if err := i.detectConflicts(); err != nil {
			return err
		}

		m, err := i.resolveConflicts()
		if err != nil {
			return err
		}

		err = i.updateStoreAndSync(m, local, minGens, remote, dID)
		if err == nil {
			break
		}

		vlog.Errorf("PutMutations failed %v. Will retry", err)
		// TODO(hpucha): Sleeping and retrying is a temporary
		// solution. The next iteration will coordinate with the
		// watch thread to retry intelligently. Hence this value
		// is not a config param.
		time.Sleep(10 * time.Second)
	}

	// Remove any pending state.
	i.updObjects = make(map[storage.ID]*objConflictState)
	i.syncd.dag.clearGraft()
	return nil
}

// detectConflicts iterates through all the updated objects to detect
// conflicts.
func (i *syncInitiator) detectConflicts() error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.RLock()
	defer i.syncd.lock.RUnlock()

	for obj, st := range i.updObjects {
		// Check if object has a conflict.
		var err error
		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
		if err != nil {
			return err
		}
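		// No conflict: the latest head version wins, so its logged
		// mutation is used directly as the resolved value.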
		if !st.isConflict {
			rec, err := i.getLogRec(obj, st.newHead)
			if err != nil {
				return err
			}
			st.resolvVal = &rec.Value
			// Sanity check.
			if st.resolvVal.Mutation.Version != st.newHead {
				return fmt.Errorf("bad mutation %d %d",
					st.resolvVal.Mutation.Version, st.newHead)
			}
		}
	}
	return nil
}

// getLogRec returns the log record corresponding to a given object and its version.
func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
	logKey, err := i.syncd.dag.getLogrec(obj, vers)
	if err != nil {
		return nil, err
	}
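	// The dag stores the key of the log record for each version; split
	// it into (device, generation, lsn) to look the record up in the log.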
	dev, gen, lsn, err := splitLogRecKey(logKey)
	if err != nil {
		return nil, err
	}
	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
	if err != nil {
		return nil, err
	}
	return rec, nil
}

// resolveConflicts resolves conflicts for updated objects.
func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
	switch conflictResolutionPolicy {
	case useTime:
		if err := i.resolveConflictsByTime(); err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unknown conflict resolution policy")
	}

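	// Build the batch of store mutations. Each mutation records the
	// object's old head as its PriorVersion, which is the precondition
	// the store checks before applying the mutation.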
	var m []raw.Mutation
	for _, st := range i.updObjects {
		// Append to mutations.
		st.resolvVal.Mutation.PriorVersion = st.oldHead
		m = append(m, st.resolvVal.Mutation)
	}
	return m, nil
}

// resolveConflictsByTime resolves conflicts using the timestamps
// of the conflicting mutations. It picks the mutation with the larger
// timestamp, i.e. the most recent update. If the timestamps are equal,
// it uses the mutation version numbers as a tie-breaker, picking the
// mutation with the larger version.
//
// TODO(hpucha): Based on a few more policies, reconsider the nesting
// order of the conflict resolution loop and the switch-on-policy.
func (i *syncInitiator) resolveConflictsByTime() error {
	for obj, st := range i.updObjects {
		if !st.isConflict {
			continue
		}

		versions := make([]storage.Version, 3)
		versions[0] = st.oldHead
		versions[1] = st.newHead
		versions[2] = st.ancestor

		lrecs, err := i.getLogRecsBatch(obj, versions)
		if err != nil {
			return err
		}

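		// lrecs[0], lrecs[1], and lrecs[2] hold the log records for
		// the old head, the new head, and the common ancestor. res
		// selects between the two heads by timestamp, with the
		// mutation version as a tie-breaker.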
		res := 0
		switch {
		case lrecs[0].Value.SyncTime > lrecs[1].Value.SyncTime:
			res = 0
		case lrecs[0].Value.SyncTime < lrecs[1].Value.SyncTime:
			res = 1
		case lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version:
			res = 0
		case lrecs[0].Value.Mutation.Version < lrecs[1].Value.Mutation.Version:
			res = 1
		}

		m := lrecs[res].Value.Mutation
		m.Version = storage.NewVersion()

		// TODO(hpucha): handle continue and delete flags.
		st.resolvVal = &LogValue{Mutation: m}
	}

	return nil
}

// getLogRecsBatch gets the log records for a slice of versions.
func (i *syncInitiator) getLogRecsBatch(obj storage.ID, versions []storage.Version) ([]*LogRec, error) {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.RLock()
	defer i.syncd.lock.RUnlock()

	lrecs := make([]*LogRec, len(versions))
	var err error
	for p, v := range versions {
		lrecs[p], err = i.getLogRec(obj, v)
		if err != nil {
			return nil, err
		}
	}
	return lrecs, nil
}

// updateStoreAndSync updates the store and, if that is successful,
// updates the log and dag data structures.
func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	// TODO(hpucha): We will hold the lock across the PutMutations rpc
	// to prevent a race with the watcher. The next iteration will
	// clean up this coordination.
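	// Mutations are pushed to the store over a PutMutations stream:
	// send each mutation, close the sending side, then Finish reports
	// whether the store accepted the batch (i.e. the preconditions held).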
	if store := i.syncd.store; store != nil && len(m) > 0 {
		stream, err := store.PutMutations(rt.R().TODOContext())
		if err != nil {
			vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
			return err
		}
		for j := range m {
			if err := stream.Send(m[j]); err != nil {
				vlog.Errorf("updateStoreAndSync:: send err %v", err)
				return err
			}
		}
		if err := stream.CloseSend(); err != nil {
			vlog.Errorf("updateStoreAndSync:: closesend err %v", err)
			return err
		}
		if err := stream.Finish(); err != nil {
			vlog.Errorf("updateStoreAndSync:: finish err %v", err)
			return err
		}
	}

	vlog.VI(2).Infof("updateStoreAndSync:: putmutations succeeded")
	if err := i.updateLogAndDag(); err != nil {
		return err
	}

	if err := i.updateGenVecs(local, minGens, remote, DeviceID(dID)); err != nil {
		return err
	}

	return nil
}

// updateLogAndDag updates the log and dag data structures on a successful store put.
func (i *syncInitiator) updateLogAndDag() error {
	for obj, st := range i.updObjects {
		if st.isConflict {
			// Object had a conflict, which was resolved successfully.
			// Put is successful, create a log record.
			parents := []storage.Version{st.newHead, st.oldHead}
			rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
			if err != nil {
				return err
			}

			logKey, err := i.syncd.log.putLogRec(rec)
			if err != nil {
				return err
			}

			// Put is successful, add a new DAG node.
			if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
				return err
			}
		}

		// Move the head.
		if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
			return err
		}
	}
	return nil
}

// updateGenVecs updates local, reclaim and remote vectors at the end of an initiator cycle.
func (i *syncInitiator) updateGenVecs(local, minGens, remote GenVector, dID DeviceID) error {
	// Update the local gen vector and put it in kvdb only if we have new updates.
	if len(i.updObjects) > 0 {
		if err := i.syncd.devtab.updateLocalGenVector(local, remote); err != nil {
			return err
		}

		if err := i.syncd.devtab.putGenVec(i.syncd.id, local); err != nil {
			return err
		}

		if err := i.syncd.devtab.updateReclaimVec(minGens); err != nil {
			return err
		}
	}

	// Cache the remote generation vector for space reclamation.
	if err := i.syncd.devtab.putGenVec(dID, remote); err != nil {
		return err
	}
	return nil
}