| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // The initiator is a goroutine that periodically picks a peer from the known |
| // remote peers and requests deltas from that peer for all the SyncGroups in |
| // common across all apps/databases. It then modifies the sync metadata (DAG and |
| // local log records) based on the deltas, detects and resolves any conflicts, |
| // and suitably updates the local Databases. |
| |
| import ( |
| "sort" |
| "strings" |
| "time" |
| |
| "v.io/syncbase/v23/services/syncbase/nosql" |
| "v.io/syncbase/x/ref/services/syncbase/server/interfaces" |
| "v.io/syncbase/x/ref/services/syncbase/server/util" |
| "v.io/syncbase/x/ref/services/syncbase/server/watchable" |
| "v.io/syncbase/x/ref/services/syncbase/store" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/vdl" |
| "v.io/v23/verror" |
| "v.io/v23/vom" |
| "v.io/x/lib/vlog" |
| ) |
| |
| // Policies to pick a peer to sync with. |
| const ( |
| // Picks a peer at random from the available set. |
| selectRandom = iota |
| |
| // TODO(hpucha): implement other policies. |
| // Picks a peer with the most differing generations. |
| selectMostDiff |
| |
| // Picks a peer that was synced with the furthest in the past. |
| selectOldest |
| ) |
| |
| var ( |
| // peerSyncInterval is the duration between two consecutive peer |
| // contacts. During every peer contact, the initiator obtains any |
| // pending updates from that peer. |
| peerSyncInterval = 50 * time.Millisecond |
| |
| // peerSelectionPolicy is the policy used to select a peer when |
| // the initiator gets a chance to sync. |
| peerSelectionPolicy = selectRandom |
| ) |
| |
| // syncer wakes up every peerSyncInterval to do work: (1) Act as an initiator |
| // for SyncGroup metadata by selecting a SyncGroup Admin, and syncing SyncGroup |
| // metadata with it (getting updates from the remote peer, detecting and |
| // resolving conflicts); (2) Refresh memberView if needed and act as an |
| // initiator for data by selecting a peer, and syncing data corresponding to all |
| // common SyncGroups across all Databases; (3) Act as a SyncGroup publisher to |
| // publish pending SyncGroups; (4) Garbage collect older generations. |
| // |
| // TODO(hpucha): Currently only does initiation. Add rest. |
| func (s *syncService) syncer(ctx *context.T) { |
| defer s.pending.Done() |
| |
| ticker := time.NewTicker(peerSyncInterval) |
| defer ticker.Stop() |
| |
| for { |
| select { |
| case <-s.closed: |
| vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit") |
| return |
| |
| case <-ticker.C: |
| } |
| |
| // TODO(hpucha): Cut a gen for the responder even if there is no |
| // one to initiate to? |
| |
| // Do work. |
| peer, err := s.pickPeer(ctx) |
| if err != nil { |
| continue |
| } |
| s.getDeltasFromPeer(ctx, peer) |
| } |
| } |
| |
| // getDeltasFromPeer performs an initiation round with the specified peer. An |
| // initiation round consists of: |
| // * Contacting the peer to receive all the deltas based on the local genvector. |
| // * Processing those deltas to discover objects which have been updated. |
| // * Processing updated objects to detect and resolve any conflicts if needed. |
| // * Communicating relevant object updates to the Database, and updating local |
| // genvector to catch up to the received remote genvector. |
| // |
| // The processing of the deltas is done one Database at a time. If a local error |
| // is encountered during the processing of a Database, that Database is skipped |
| // and the initiator continues on to the next one. If the connection to the peer |
| // encounters an error, this initiation round is aborted. Note that until the |
| // local genvector is updated based on the received deltas (the last step in an |
| // initiation round), the work done by the initiator is idempotent. |
| // |
| // TODO(hpucha): Check the idempotence, esp in addNode in DAG. |
| func (s *syncService) getDeltasFromPeer(ctxIn *context.T, peer string) { |
| vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer) |
| defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer) |
| |
| ctx, cancel := context.WithRootCancel(ctxIn) |
| |
| info := s.copyMemberInfo(ctx, peer) |
| if info == nil { |
| vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer) |
| } |
| connected := false |
| var stream interfaces.SyncGetDeltasClientCall |
| |
| // Sync each Database that may have SyncGroups common with this peer, |
| // one at a time. |
| for gdbName, sgInfo := range info.db2sg { |
| |
| // Initialize initiation state for syncing this Database. |
| iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo) |
| if err != nil { |
| vlog.Errorf("sync: getDeltasFromPeer: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err) |
| continue |
| } |
| |
| if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 { |
| vlog.Errorf("sync: getDeltasFromPeer: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err) |
| continue |
| } |
| |
| // Make contact with the peer once. |
| if !connected { |
| stream, connected = iSt.connectToPeer(ctx) |
| if !connected { |
| // Try a different Database. Perhaps there are |
| // different mount tables. |
| continue |
| } |
| } |
| |
| // Create local genvec so that it contains knowledge only about common prefixes. |
| if err := iSt.createLocalGenVec(ctx); err != nil { |
| vlog.Errorf("sync: getDeltasFromPeer: error creating local genvec for gdb %s, err %v", gdbName, err) |
| continue |
| } |
| |
| iSt.stream = stream |
| req := interfaces.DeltaReq{ |
| AppName: iSt.appName, |
| DbName: iSt.dbName, |
| SgIds: iSt.sgIds, |
| InitVec: iSt.local, |
| } |
| |
| vlog.VI(3).Infof("sync: getDeltasFromPeer: send request: %v", req) |
| sender := iSt.stream.SendStream() |
| sender.Send(req) |
| |
| // Obtain deltas from the peer over the network. |
| if err := iSt.recvAndProcessDeltas(ctx); err != nil { |
| vlog.Errorf("sync: getDeltasFromPeer: error receiving deltas for gdb %s, err %v", gdbName, err) |
| // Returning here since something could be wrong with |
| // the connection, and no point in attempting the next |
| // Database. |
| cancel() |
| stream.Finish() |
| return |
| } |
| vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote) |
| |
| if err := iSt.processUpdatedObjects(ctx); err != nil { |
| vlog.Errorf("sync: getDeltasFromPeer: error processing objects for gdb %s, err %v", gdbName, err) |
| // Move to the next Database even if processing updates |
| // failed. |
| continue |
| } |
| } |
| |
| if connected { |
| stream.Finish() |
| } |
| cancel() |
| } |
| |
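| // sgSet is a set of SyncGroup ids. |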
| type sgSet map[interfaces.GroupId]struct{} |
| |
| // initiationState is accumulated for each Database during an initiation round. |
| type initiationState struct { |
| // Relative name of the peer to sync with. |
| peer string |
| |
| // Collection of mount tables that this peer may have registered with. |
| mtTables map[string]struct{} |
| |
| // SyncGroups being requested in the initiation round. |
| sgIds sgSet |
| |
| // SyncGroup prefixes being requested in the initiation round, and their |
| // corresponding SyncGroup ids. |
| sgPfxs map[string]sgSet |
| |
| // Local generation vector. |
| local interfaces.GenVector |
| |
| // Generation vector from the remote peer. |
| remote interfaces.GenVector |
| |
| // Updated local generation vector at the end of the initiation round. |
| updLocal interfaces.GenVector |
| |
| // State to track updated objects during a log replay. |
| updObjects map[string]*objConflictState |
| |
| // DAG state that tracks conflicts and common ancestors. |
| dagGraft graftMap |
| |
| sync *syncService |
| appName string |
| dbName string |
| st store.Store // Store handle to the Database. |
| stream interfaces.SyncGetDeltasClientCall // Stream handle for the GetDeltas RPC. |
| |
| // Transaction handle for the initiation round. Used during the update |
| // of objects in the Database. |
| tx store.Transaction |
| } |
| |
| // objConflictState contains the conflict state for an object that is updated |
| // during an initiator round. |
| type objConflictState struct { |
| isConflict bool |
| newHead string |
| oldHead string |
| ancestor string |
| res *conflictResolution |
| } |
| |
| // newInitiationState creates new initiation state. |
| func newInitiationState(ctx *context.T, s *syncService, peer string, name string, sgInfo sgMemberInfo) (*initiationState, error) { |
| iSt := &initiationState{} |
| iSt.peer = peer |
| iSt.updObjects = make(map[string]*objConflictState) |
| iSt.dagGraft = newGraft() |
| iSt.sync = s |
| |
| // TODO(hpucha): Would be nice to standardize on the combined "app:db" |
| // name across sync (not syncbase) so we only split/join them at the |
| // boundary with the store part. |
| var err error |
| iSt.appName, iSt.dbName, err = splitAppDbName(ctx, name) |
| if err != nil { |
| return nil, err |
| } |
| |
| // TODO(hpucha): nil rpc.ServerCall ok? |
| iSt.st, err = s.getDbStore(ctx, nil, iSt.appName, iSt.dbName) |
| if err != nil { |
| return nil, err |
| } |
| |
| iSt.peerMtTblsAndSgInfo(ctx, peer, sgInfo) |
| |
| return iSt, nil |
| } |
| |
| // peerMtTblsAndSgInfo computes the possible mount tables, and the SyncGroup |
| // ids and prefixes, in common with a remote peer in a particular Database by |
| // consulting the SyncGroups in that Database. |
| func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) { |
| iSt.mtTables = make(map[string]struct{}) |
| iSt.sgIds = make(sgSet) |
| iSt.sgPfxs = make(map[string]sgSet) |
| |
| for id := range info { |
| sg, err := getSyncGroupById(ctx, iSt.st, id) |
| if err != nil { |
| continue |
| } |
| if _, ok := sg.Joiners[peer]; !ok { |
| // Peer is no longer part of the SyncGroup. |
| continue |
| } |
| for _, mt := range sg.Spec.MountTables { |
| iSt.mtTables[mt] = struct{}{} |
| } |
| iSt.sgIds[id] = struct{}{} |
| |
| for _, p := range sg.Spec.Prefixes { |
| sgs, ok := iSt.sgPfxs[p] |
| if !ok { |
| sgs = make(sgSet) |
| iSt.sgPfxs[p] = sgs |
| } |
| sgs[id] = struct{}{} |
| } |
| } |
| } |
| |
| // connectToPeer attempts to connect to the remote peer using the mount tables |
| // obtained from the SyncGroups being synced in the current Database. |
| func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) { |
| if len(iSt.mtTables) < 1 { |
| vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName) |
| return nil, false |
| } |
| for mt := range iSt.mtTables { |
| absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix) |
| c := interfaces.SyncClient(absName) |
| stream, err := c.GetDeltas(ctx, iSt.sync.name) |
| if err == nil { |
| vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName) |
| return stream, true |
| } |
| } |
| return nil, false |
| } |
| |
| // createLocalGenVec creates the generation vector with local knowledge for the |
| // initiator to send to the responder. |
| // |
| // TODO(hpucha): Refactor this code with computeDelta code in sync_state.go. |
| func (iSt *initiationState) createLocalGenVec(ctx *context.T) error { |
| iSt.sync.thLock.Lock() |
| defer iSt.sync.thLock.Unlock() |
| |
| // Freeze the most recent batch of local changes before fetching |
| // remote changes from a peer. This frozen state is used by the |
| // responder when responding to GetDeltas RPC. |
| // |
| // Only the initiator is allowed to freeze local generations (not |
| // responders or the watcher), in order to maintain a static |
| // baseline for the duration of a sync. This addresses the |
| // following race condition: if responders were allowed to use |
| // newer local generations while an initiation is in progress, |
| // they could beat the initiator and send these new generations |
| // to remote devices, and those devices could in turn send the |
| // generations back to the in-progress initiator, which was |
| // started with older generation information. |
| if err := iSt.sync.checkptLocalGen(ctx, iSt.appName, iSt.dbName); err != nil { |
| return err |
| } |
| |
| local, lgen, err := iSt.sync.copyDbGenInfo(ctx, iSt.appName, iSt.dbName) |
| if err != nil { |
| return err |
| } |
| localPfxs := extractAndSortPrefixes(local) |
| |
| sgPfxs := make([]string, len(iSt.sgPfxs)) |
| i := 0 |
| for p := range iSt.sgPfxs { |
| sgPfxs[i] = p |
| i++ |
| } |
| sort.Strings(sgPfxs) |
| |
| iSt.local = make(interfaces.GenVector) |
| |
| if len(sgPfxs) == 0 { |
| return verror.New(verror.ErrInternal, ctx, "no syncgroups for syncing") |
| } |
| |
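| // Walk the sorted SyncGroup prefixes, processing only the first prefix of |
| // each nested run. For example (hypothetical prefixes), with sgPfxs = |
| // {"f", "foo", "g"}, only "f" and "g" start new sets, while "foo" is |
| // nested under "f" and is skipped. For each starting prefix, copy over |
| // the local knowledge of prefixes nested under it, and seed the starting |
| // prefix itself from the longest local prefix that covers it (possibly |
| // the prefix itself), or from the current local generation if none does. |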
| pfx := sgPfxs[0] |
| for _, p := range sgPfxs { |
| if strings.HasPrefix(p, pfx) && p != pfx { |
| continue |
| } |
| |
| // Process this prefix as this is the start of a new set of |
| // nested prefixes. |
| pfx = p |
| var lpStart string |
| for _, lp := range localPfxs { |
| if !strings.HasPrefix(lp, pfx) && !strings.HasPrefix(pfx, lp) { |
| // No relationship with pfx. |
| continue |
| } |
| if strings.HasPrefix(pfx, lp) { |
| lpStart = lp |
| } else { |
| iSt.local[lp] = local[lp] |
| } |
| } |
| // Deal with the starting point. |
| if lpStart == "" { |
| // No matching prefixes for pfx were found. |
| iSt.local[pfx] = make(interfaces.PrefixGenVector) |
| iSt.local[pfx][iSt.sync.id] = lgen |
| } else { |
| iSt.local[pfx] = local[lpStart] |
| } |
| } |
| return nil |
| } |
| |
| // recvAndProcessDeltas first receives the log records and generation vector |
| // from the GetDeltas RPC and puts them in the Database. It also replays the |
| // entire log stream as the log records arrive. These records span multiple |
| // generations from different devices. It does not perform any conflict |
| // resolution during replay. This avoids resolving conflicts that have already |
| // been resolved by other devices. |
| func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error { |
| iSt.sync.thLock.Lock() |
| defer iSt.sync.thLock.Unlock() |
| |
| // TODO(hpucha): This works for now, but figure out a long term solution |
| // as this may be implementation dependent. It currently works because |
| // the RecvStream call is stateless, and grabbing a handle to it |
| // repeatedly doesn't affect what data is seen next. |
| rcvr := iSt.stream.RecvStream() |
| start, finish := false, false |
| |
| // TODO(hpucha): See if we can avoid committing the entire delta stream |
| // as one batch. Currently the dependency is between the log records and |
| // the batch info. |
| tx := iSt.st.NewTransaction() |
| committed := false |
| |
| defer func() { |
| if !committed { |
| tx.Abort() |
| } |
| }() |
| |
| // Track received batches (BatchId --> BatchCount mapping). |
| batchMap := make(map[uint64]uint64) |
| |
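| // The response stream is delimited by start and finish markers; in |
| // between arrive the log records and the responder's generation vector. |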
| for rcvr.Advance() { |
| resp := rcvr.Value() |
| switch v := resp.(type) { |
| case interfaces.DeltaRespStart: |
| if start { |
| return verror.New(verror.ErrInternal, ctx, "received start followed by start in delta response stream") |
| } |
| start = true |
| |
| case interfaces.DeltaRespFinish: |
| if finish { |
| return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream") |
| } |
| finish = true |
| |
| case interfaces.DeltaRespRespVec: |
| iSt.remote = v.Value |
| |
| case interfaces.DeltaRespRec: |
| // Insert log record in Database. |
| // TODO(hpucha): Should we reserve more positions in a batch? |
| // TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress. |
| pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1) |
| rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos} |
| batchId := rec.Metadata.BatchId |
| if batchId != NoBatchId { |
| if cnt, ok := batchMap[batchId]; !ok { |
| if iSt.sync.startBatch(ctx, tx, batchId) != batchId { |
| return verror.New(verror.ErrInternal, ctx, "failed to create batch info") |
| } |
| batchMap[batchId] = rec.Metadata.BatchCount |
| } else if cnt != rec.Metadata.BatchCount { |
| return verror.New(verror.ErrInternal, ctx, "inconsistent counts for tid", batchId, cnt, rec.Metadata.BatchCount) |
| } |
| } |
| |
| vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec) |
| if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil { |
| return err |
| } |
| |
| // Check for BlobRefs, and process them. |
| if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil { |
| return err |
| } |
| |
| // Mark object dirty. |
| iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{} |
| } |
| |
| // Break out of the stream. |
| if finish { |
| break |
| } |
| } |
| |
| if !(start && finish) { |
| return verror.New(verror.ErrInternal, ctx, "didn't receive start/finish delimiters in delta response stream") |
| } |
| |
| if err := rcvr.Err(); err != nil { |
| return err |
| } |
| |
| // End the started batches if any. |
| for bid, cnt := range batchMap { |
| if err := iSt.sync.endBatch(ctx, tx, bid, cnt); err != nil { |
| return err |
| } |
| } |
| |
| // Commit this transaction. We do not retry this transaction since it |
| // should not conflict with any other keys. So if it fails, it is a |
| // non-retriable error. |
| err := tx.Commit() |
| if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID { |
| // Note: This might be triggered with memstore until it handles |
| // transactions in a more fine-grained fashion. |
| vlog.Fatalf("sync: recvAndProcessDeltas: encountered concurrent transaction") |
| } |
| if err == nil { |
| committed = true |
| } |
| return err |
| } |
| |
| func (iSt *initiationState) processBlobRefs(ctx *context.T, m *interfaces.LogRecMetadata, valbuf []byte) error { |
| objid := m.ObjId |
| srcPeer := syncbaseIdToName(m.Id) |
| |
| vlog.VI(4).Infof("sync: processBlobRefs: begin processing blob refs for objid %s", objid) |
| defer vlog.VI(4).Infof("sync: processBlobRefs: end processing blob refs for objid %s", objid) |
| |
| if valbuf == nil { |
| return nil |
| } |
| |
| var val *vdl.Value |
| if err := vom.Decode(valbuf, &val); err != nil { |
| return err |
| } |
| |
| brs := make(map[nosql.BlobRef]struct{}) |
| if err := extractBlobRefs(val, brs); err != nil { |
| return err |
| } |
| // The SyncGroups that contain this object depend only on its key, not on |
| // the specific blob ref, so compute the set once outside the loop. |
| sgIds := make(sgSet) |
| for p, sgs := range iSt.sgPfxs { |
| if strings.HasPrefix(extractAppKey(objid), p) { |
| for sg := range sgs { |
| sgIds[sg] = struct{}{} |
| } |
| } |
| } |
| for br := range brs { |
| vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.peer, srcPeer, sgIds) |
| info := &blobLocInfo{peer: iSt.peer, source: srcPeer, sgIds: sgIds} |
| if err := iSt.sync.addBlobLocInfo(ctx, br, info); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // TODO(hpucha): Handle blobrefs part of list, map, any. |
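| // extractBlobRefs collects the BlobRefs reachable from val: a top-level |
| // value of type nosql.BlobRef, or any such values nested in struct fields. |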
| func extractBlobRefs(val *vdl.Value, brs map[nosql.BlobRef]struct{}) error { |
| if val == nil { |
| return nil |
| } |
| switch val.Kind() { |
| case vdl.String: |
| // Could be a BlobRef. |
| var br nosql.BlobRef |
| if val.Type() == vdl.TypeOf(br) { |
| brs[nosql.BlobRef(val.RawString())] = struct{}{} |
| } |
| case vdl.Struct: |
| for i := 0; i < val.Type().NumField(); i++ { |
| v := val.StructField(i) |
| if err := extractBlobRefs(v, brs); err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // insertRecInLogDagAndDb adds a new log record to log and dag data structures, |
| // and inserts the versioned value in the Database. |
| func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error { |
| if err := putLogRec(ctx, tx, rec); err != nil { |
| return err |
| } |
| |
| m := rec.Metadata |
| logKey := logRecKey(m.Id, m.Gen) |
| |
| var err error |
| switch m.RecType { |
| case interfaces.NodeRec: |
| err = iSt.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft) |
| case interfaces.LinkRec: |
| err = iSt.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft) |
| default: |
| err = verror.New(verror.ErrInternal, ctx, "unknown log record type") |
| } |
| |
| if err != nil { |
| return err |
| } |
| // TODO(hpucha): Hack right now. Need to change Database's handling of |
| // deleted objects. Currently, the initiator needs to treat deletions |
| // specially since deletions do not get a version number or a special |
| // value in the Database. |
| if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec { |
| return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers)) |
| } |
| return nil |
| } |
| |
| // processUpdatedObjects processes all the updates received by the initiator, |
| // one object at a time. Conflict detection and resolution is carried out after |
| // the entire delta of log records is replayed, instead of incrementally after |
| // each record/batch is replayed, to avoid repeating conflict resolution already |
| // performed by other peers. |
| // |
| // For each updated object, we first check if the object has any conflicts, |
| // resulting in three possibilities: |
| // |
| // * There is no conflict, and no updates are needed to the Database |
| // (isConflict=false, newHead == oldHead). All changes received convey |
| // information that still keeps the local head as the most recent version. This |
| // occurs when conflicts are resolved by picking the existing local version. |
| // |
| // * There is no conflict, but a remote version is discovered that builds on the |
| // local head (isConflict=false, newHead != oldHead). In this case, we generate |
| // a Database update to simply update the Database to the latest value. |
| // |
| // * There is a conflict, and we call into the app or use a well-known policy to |
| // resolve the conflict, resulting in three possibilities: (a) the conflict was |
| // resolved by picking the local version. In this case, the Database need not be |
| // updated, but a link is added to record the choice. (b) The conflict was |
| // resolved by picking the remote version. In this case, the Database is updated |
| // with the remote version and a link is added as well. (c) The conflict was |
| // resolved by generating a new Database update. In this case, the Database is |
| // updated with the new version. |
| // |
| // We collect all the updates to the Database in a transaction. In addition, as |
| // part of the same transaction, we update the log and dag state suitably (move |
| // the head ptr of the object in the dag to the latest version, and create a new |
| // log record reflecting conflict resolution if any). Finally, we update the |
| // persistent sync state as part of the same transaction. This transaction's |
| // commit can fail since preconditions on the objects may have been violated. In |
| // that case, we reread the latest versions of the objects from the Database, |
| // recheck them for conflicts, and repeat the above steps until the transaction |
| // commits successfully. Upon commit, we also update the in-memory sync state of |
| // the Database. |
| func (iSt *initiationState) processUpdatedObjects(ctx *context.T) error { |
| // Note that the tx handle in the initiation state is cached only for the |
| // scope of this function, since different stages in the pipeline add to |
| // the same transaction. |
| committed := false |
| defer func() { |
| if !committed { |
| iSt.tx.Abort() |
| } |
| }() |
| |
| for { |
| vlog.VI(3).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects)) |
| |
| iSt.tx = iSt.st.NewTransaction() |
| watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression |
| |
| count, err := iSt.detectConflicts(ctx) |
| if err != nil { |
| return err |
| } |
| vlog.VI(3).Infof("sync: processUpdatedObjects: %d conflicts detected", count) |
|
| if err := iSt.resolveConflicts(ctx); err != nil { |
| return err |
| } |
|
| err = iSt.updateDbAndSyncSt(ctx) |
| if err == nil { |
| err = iSt.tx.Commit() |
| } |
| if err == nil { |
| committed = true |
| // Update in-memory genvector since commit is successful. |
| if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil { |
| vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err) |
| } |
| vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed") |
| return nil |
| } |
| if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID { |
| return err |
| } |
| |
| // Either updateDbAndSyncSt() or tx.Commit() detected a |
| // concurrent transaction. Retry processing the remote updates. |
| // |
| // TODO(hpucha): Sleeping and retrying is a temporary |
| // solution. Next iteration will have coordination with watch |
| // thread to intelligently retry. Hence this value is not a |
| // config param. |
| vlog.VI(3).Info("sync: processUpdatedObjects: retry due to local mutations") |
| iSt.tx.Abort() |
| time.Sleep(1 * time.Second) |
| } |
| } |
| |
| // detectConflicts iterates through all the updated objects to detect conflicts. |
| func (iSt *initiationState) detectConflicts(ctx *context.T) (int, error) { |
| count := 0 |
| for objid, confSt := range iSt.updObjects { |
| // Check if object has a conflict. |
| var err error |
| confSt.isConflict, confSt.newHead, confSt.oldHead, confSt.ancestor, err = hasConflict(ctx, iSt.tx, objid, iSt.dagGraft) |
| if err != nil { |
| return 0, err |
| } |
| |
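| // No conflict: if the head did not move, every received change was |
| // already known locally, so keep the local version; otherwise the |
| // remote head descends from the local head, so fast-forward to it. |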
| if !confSt.isConflict { |
| if confSt.newHead == confSt.oldHead { |
| confSt.res = &conflictResolution{ty: pickLocal} |
| } else { |
| confSt.res = &conflictResolution{ty: pickRemote} |
| } |
| } else { |
| count++ |
| } |
| } |
| return count, nil |
| } |
| |
| // updateDbAndSyncSt updates the Database, and if that is successful, updates |
| // log, dag and genvector data structures as needed. |
| func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error { |
| for objid, confSt := range iSt.updObjects { |
| // If the local version is picked, no further updates to the |
| // Database are needed. If the remote version is picked or if a |
| // new version is created, we put it in the Database. |
| if confSt.res.ty != pickLocal { |
| |
| // TODO(hpucha): Hack right now. Need to change Database's |
| // handling of deleted objects. |
| oldVersDeleted := true |
| if confSt.oldHead != NoVersion { |
| oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead) |
| if err != nil { |
| return err |
| } |
| oldVersDeleted = oldDagNode.Deleted |
| } |
| |
| var newVersion string |
| var newVersDeleted bool |
| switch confSt.res.ty { |
| case pickRemote: |
| newVersion = confSt.newHead |
| newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion) |
| if err != nil { |
| return err |
| } |
| newVersDeleted = newDagNode.Deleted |
| case createNew: |
| newVersion = confSt.res.rec.Metadata.CurVers |
| newVersDeleted = confSt.res.rec.Metadata.Delete |
| } |
| |
| // Skip delete followed by a delete. |
| if oldVersDeleted && newVersDeleted { |
| continue |
| } |
| |
| if !oldVersDeleted { |
| // Read current version to enter it in the readset of the transaction. |
| version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)) |
| if err != nil { |
| return err |
| } |
| if string(version) != confSt.oldHead { |
| vlog.VI(4).Infof("sync: updateDbAndSyncSt: concurrent updates %s %s", version, confSt.oldHead) |
| return store.NewErrConcurrentTransaction(ctx) |
| } |
| } else { |
| // Ensure key doesn't exist. |
| if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID { |
| return store.NewErrConcurrentTransaction(ctx) |
| } |
| } |
| |
| if !newVersDeleted { |
| if confSt.res.ty == createNew { |
| vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutAtVersion %s %s", objid, newVersion) |
| if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil { |
| return err |
| } |
| } |
| vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutVersion %s %s", objid, newVersion) |
| if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil { |
| return err |
| } |
| } else { |
| vlog.VI(4).Infof("sync: updateDbAndSyncSt: Deleting obj %s", objid) |
| if err := iSt.tx.Delete([]byte(objid)); err != nil { |
| return err |
| } |
| } |
| } |
| // Always update sync state irrespective of local/remote/new |
| // versions being picked. |
| if err := iSt.updateLogAndDag(ctx, objid); err != nil { |
| return err |
| } |
| } |
| |
| return iSt.updateSyncSt(ctx) |
| } |
| |
| // updateLogAndDag updates the log and dag data structures. |
| func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error { |
| confSt, ok := iSt.updObjects[obj] |
| if !ok { |
| return verror.New(verror.ErrInternal, ctx, "object state not found", obj) |
| } |
| var newVersion string |
| |
| if !confSt.isConflict { |
| newVersion = confSt.newHead |
| } else { |
| // Object had a conflict. Create a log record to reflect resolution. |
| var rec *localLogRec |
| |
| switch { |
| case confSt.res.ty == pickLocal: |
| // Local version was picked as the conflict resolution. |
| rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.oldHead, confSt.newHead) |
| newVersion = confSt.oldHead |
| case confSt.res.ty == pickRemote: |
| // Remote version was picked as the conflict resolution. |
| rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.newHead, confSt.oldHead) |
| newVersion = confSt.newHead |
| default: |
| // New version was created to resolve the conflict. |
| rec = confSt.res.rec |
| newVersion = confSt.res.rec.Metadata.CurVers |
| } |
| |
| if err := putLogRec(ctx, iSt.tx, rec); err != nil { |
| return err |
| } |
| |
| // Add a new DAG node. |
| var err error |
| m := rec.Metadata |
| switch m.RecType { |
| case interfaces.NodeRec: |
| err = iSt.sync.addNode(ctx, iSt.tx, obj, m.CurVers, logRecKey(m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil) |
| case interfaces.LinkRec: |
| err = iSt.sync.addParent(ctx, iSt.tx, obj, m.CurVers, m.Parents[0], nil) |
| default: |
| return verror.New(verror.ErrInternal, ctx, "unknown log record type") |
| } |
| if err != nil { |
| return err |
| } |
| } |
| |
| // Move the head. This should be idempotent. We may move head to the |
| // local head in some cases. |
| return moveHead(ctx, iSt.tx, obj, newVersion) |
| } |
| |
| func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec { |
| gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1) |
| |
| vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", obj, vers, par) |
| |
| rec := &localLogRec{ |
| Metadata: interfaces.LogRecMetadata{ |
| Id: iSt.sync.id, |
| Gen: gen, |
| RecType: interfaces.LinkRec, |
| |
| ObjId: obj, |
| CurVers: vers, |
| Parents: []string{par}, |
| UpdTime: time.Now().UTC(), |
| BatchId: NoBatchId, |
| BatchCount: 1, |
| // TODO(hpucha): What is its batchid and count? |
| }, |
| Pos: pos, |
| } |
| return rec |
| } |
| |
| // updateSyncSt updates local sync state at the end of an initiator cycle. |
| func (iSt *initiationState) updateSyncSt(ctx *context.T) error { |
| // Get the current local sync state. |
| dsInMem, err := iSt.sync.copyDbSyncStateInMem(ctx, iSt.appName, iSt.dbName) |
| if err != nil { |
| return err |
| } |
| ds := &dbSyncState{ |
| Gen: dsInMem.gen, |
| CheckptGen: dsInMem.checkptGen, |
| GenVec: dsInMem.genvec, |
| } |
| |
| // remote can be a subset of local. |
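| // For example (hypothetical prefixes), if the local genvector has |
| // entries for "foo" and "foobar" and the responder's covers only "foo", |
| // the responder's "foo" knowledge is merged into both local entries; a |
| // remote prefix with no local counterpart is added as-is. |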
| for rpfx, respgv := range iSt.remote { |
| for lpfx, lpgv := range ds.GenVec { |
| if strings.HasPrefix(lpfx, rpfx) { |
| mergePrefixGenVectors(lpgv, respgv) |
| } |
| } |
| if _, ok := ds.GenVec[rpfx]; !ok { |
| ds.GenVec[rpfx] = respgv |
| } |
| } |
| |
| iSt.updLocal = ds.GenVec |
| // Clean the genvector of any local state. Note that local state is held |
| // in Gen/CheckptGen in the sync state struct. |
| for _, pgv := range iSt.updLocal { |
| delete(pgv, iSt.sync.id) |
| } |
| |
| // TODO(hpucha): Add knowledge compaction. |
| |
| return putDbSyncState(ctx, iSt.tx, ds) |
| } |
| |
| // mergePrefixGenVectors merges responder prefix genvector into local genvector. |
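| // The merge is an element-wise max. For example (hypothetical device ids), |
| // merging local {dev1: 3, dev2: 5} with responder {dev1: 4, dev3: 2} |
| // yields {dev1: 4, dev2: 5, dev3: 2}. |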
| func mergePrefixGenVectors(lpgv, respgv interfaces.PrefixGenVector) { |
| for devid, rgen := range respgv { |
| gen, ok := lpgv[devid] |
| if !ok || gen < rgen { |
| lpgv[devid] = rgen |
| } |
| } |
| } |
| |
| //////////////////////////////////////// |
| // Peer selection policies. |
| |
| // pickPeer picks a Syncbase to sync with. |
| func (s *syncService) pickPeer(ctx *context.T) (string, error) { |
| switch peerSelectionPolicy { |
| case selectRandom: |
| members := s.getMembers(ctx) |
| // Remove myself from the set. |
| delete(members, s.name) |
| if len(members) == 0 { |
| return "", verror.New(verror.ErrInternal, ctx, "no useful peer") |
| } |
| |
| // Pick a peer at random. |
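| // ind is uniform over [0, len(members)), so counting it down while |
| // iterating the map yields a uniformly random member regardless of |
| // Go's map iteration order. |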
| ind := randIntn(len(members)) |
| for m := range members { |
| if ind == 0 { |
| return m, nil |
| } |
| ind-- |
| } |
| return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed") |
| default: |
| return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy") |
| } |
| } |