// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"errors"
"fmt"
"math/rand"
"os"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/vtrace"
"v.io/x/lib/vlog"
)
// Policies to pick a peer to sync with.
const (
// Picks a peer at random from the available set.
selectRandom = iota
// TODO(hpucha): implement other policies.
// Picks a peer with most differing generations.
selectMostDiff
// Picks a peer that was synced with the furthest in the past.
selectOldest
)
// Policies for conflict resolution.
const (
// Resolves conflicts by picking the mutation with the most recent timestamp.
useTime = iota
// TODO(hpucha): implement other policies.
// Resolves conflicts by using the app conflict resolver callbacks via store.
useCallback
)
var (
// peerSyncInterval is the duration between two consecutive
// sync events. In every sync event, the initiator contacts
// one of its peers to obtain any pending updates.
peerSyncInterval = 50 * time.Millisecond
// peerSelectionPolicy is the policy used to select a peer when
// the initiator gets a chance to sync.
peerSelectionPolicy = selectRandom
// conflictResolutionPolicy is the policy used to resolve conflicts.
conflictResolutionPolicy = useTime
errNoUsefulPeer = errors.New("no useful peer to contact")
)
// syncInitiator contains the metadata and state for the initiator thread.
type syncInitiator struct {
syncd *syncd
// State accumulated during an initiation round.
iState *initiationState
}
// initiationState is the state accumulated during an initiation round.
type initiationState struct {
// Veyron name of peer being synced with.
peer string
// Local generation vector.
local map[ObjId]GenVector
// SyncGroups being requested in the initiation round.
syncGroups map[ObjId]GroupIdSet
// Map to track new generations received in the RPC reply.
newGens map[string]*genMetadata
// Array to track order of arrival for the generations.
// We need to preserve this order for the replay.
orderGens []string
// Generation vector to track the oldest generation received
// in the RPC reply per device, for garbage collection.
minGens map[ObjId]GenVector
// Generation vector from the remote peer.
remote map[ObjId]GenVector
// Tmp kvdb state.
tmpFile string
tmpDB *kvdb
tmpTbl *kvtable
// State to track updated objects during a log replay.
updObjects map[ObjId]*objConflictState
// State to delete objects from the "priv" table.
delPrivObjs map[ObjId]struct{}
}
// objConflictState contains the conflict state for objects that are
// updated during an initiator run.
type objConflictState struct {
isConflict bool
newHead Version
oldHead Version
ancestor Version
resolvVal *LogValue
srID ObjId
}
// newInitiator creates a new initiator instance attached to the given syncd instance.
func newInitiator(syncd *syncd, syncTick time.Duration) *syncInitiator {
i := &syncInitiator{syncd: syncd}
// Override the default peerSyncInterval value if syncTick is specified.
if syncTick > 0 {
peerSyncInterval = syncTick
}
vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)
return i
}
// contactPeers wakes up every peerSyncInterval to contact peers and get deltas from them.
func (i *syncInitiator) contactPeers() {
ticker := time.NewTicker(peerSyncInterval)
for {
select {
case <-i.syncd.closed:
ticker.Stop()
i.syncd.pending.Done()
return
case <-ticker.C:
}
peerRelName, err := i.pickPeer()
if err != nil {
continue
}
i.getDeltasFromPeer(peerRelName)
}
}
// pickPeer picks a sync endpoint to sync with.
func (i *syncInitiator) pickPeer() (string, error) {
myID := string(i.syncd.relName)
switch peerSelectionPolicy {
case selectRandom:
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.RLock()
// neighbors, err := i.syncd.sgtab.getMembers()
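// The syncgroup member lookup above is currently commented out, so
// neighbors starts out empty and this policy returns errNoUsefulPeer
// until that lookup is restored.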
neighbors := make(map[string]uint32)
var err error
i.syncd.lock.RUnlock()
if err != nil {
return "", err
}
// Remove myself from the set so only neighbors are counted.
delete(neighbors, myID)
if len(neighbors) == 0 {
return "", errNoUsefulPeer
}
// Pick a neighbor at random.
ind := rand.Intn(len(neighbors))
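// Walk the map and return the ind-th key; since ind is uniform over
// [0, len(neighbors)), every neighbor is equally likely to be picked.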
for k := range neighbors {
if ind == 0 {
return k, nil
}
ind--
}
return "", fmt.Errorf("random selection didn't succeed")
default:
return "", fmt.Errorf("unknown peer selection policy")
}
}
// getDeltasFromPeer performs an initiation round to the specified
// peer. An initiation round consists of:
// * Creating local generations for the syncroots that are going to be requested in this round.
// * Contacting the peer to receive all the deltas based on the local gen vector.
// * Processing those deltas to discover objects which have been updated.
// * Processing updated objects to detect and resolve any conflicts if needed.
// * Communicating relevant object updates to the store.
func (i *syncInitiator) getDeltasFromPeer(peerRelName string) {
vlog.VI(2).Infof("getDeltasFromPeer:: %s", peerRelName)
// Initialize initiation state.
i.newInitiationState()
defer i.clearInitiationState()
// Freeze the most recent batch of local changes
// before fetching remote changes from a peer.
//
// We only allow an initiator to create new local
// generations (not responders/watcher) in order to
// maintain a static baseline for the duration of a
// sync. This addresses the following race condition:
// If we allow responders to create new local
// generations while the initiator is in progress,
// they may beat the initiator and send these new
// generations to remote devices. These remote
// devices in turn can send these generations back to
// the initiator in progress which was started with
// older generation information.
err := i.updateLocalGeneration(peerRelName)
if err != nil {
vlog.Fatalf("getDeltasFromPeer:: error updating local generation: err %v", err)
}
// Obtain deltas from the peer over the network. These deltas
// are stored in a tmp kvdb.
if err := i.getDeltas(); err != nil {
vlog.Errorf("getDeltasFromPeer:: error getting deltas: err %v", err)
return
}
i.syncd.sgOp.Lock()
defer i.syncd.sgOp.Unlock()
if err := i.processDeltas(); err != nil {
vlog.Fatalf("getDeltasFromPeer:: error processing logs: err %v", err)
}
if err := i.processUpdatedObjects(); err != nil {
vlog.Fatalf("getDeltasFromPeer:: error processing objects: err %v", err)
}
}
// newInitiationState creates new initiation state.
func (i *syncInitiator) newInitiationState() {
st := &initiationState{}
st.local = make(map[ObjId]GenVector)
st.syncGroups = make(map[ObjId]GroupIdSet)
st.newGens = make(map[string]*genMetadata)
st.minGens = make(map[ObjId]GenVector)
st.remote = make(map[ObjId]GenVector)
st.updObjects = make(map[ObjId]*objConflictState)
st.delPrivObjs = make(map[ObjId]struct{})
i.iState = st
}
// clearInitiationState cleans up the state from the current initiation round.
func (i *syncInitiator) clearInitiationState() {
if i.iState.tmpDB != nil {
i.iState.tmpDB.close()
}
if i.iState.tmpFile != "" {
os.Remove(i.iState.tmpFile)
}
for o := range i.iState.delPrivObjs {
i.syncd.dag.delPrivNode(o)
}
i.syncd.dag.clearGraft()
i.iState = nil
}
// updateLocalGeneration creates a new local generation if needed and
// populates the newest local generation vector.
func (i *syncInitiator) updateLocalGeneration(peerRelName string) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
// peerInfo, err := i.syncd.sgtab.getMemberInfo(peerRelName)
// if err != nil {
// return err
// }
// Re-construct all mount table possibilities.
mtTables := make(map[string]struct{})
// for sg := range peerInfo.gids {
// sgData, err := i.syncd.sgtab.getSyncGroupByID(sg)
// if err != nil {
// return err
// }
// sr := ObjId(sgData.SrvInfo.RootObjId)
// for _, mt := range sgData.SrvInfo.Config.MountTables {
// mtTables[mt] = struct{}{}
// }
// i.iState.syncGroups[sr] = append(i.iState.syncGroups[sr], sg)
// }
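// With the SyncGroup lookup above commented out, mtTables (and
// i.iState.syncGroups) remain empty; only a rooted peer name, handled
// below, can currently be contacted.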
// For every syncroot that is common with the "peer" to be
// contacted, create a new local generation if there are any
// local updates.
for sr := range i.iState.syncGroups {
gen, err := i.syncd.log.createLocalGeneration(sr)
if err != nil && err != errNoUpdates {
return err
}
if err == nil {
vlog.VI(2).Infof("updateLocalGeneration:: created gen for sr %s at %d", sr.String(), gen)
// Update local generation vector in devTable.
if err = i.syncd.devtab.updateGeneration(i.syncd.id, sr, i.syncd.id, gen); err != nil {
return err
}
}
v, err := i.syncd.devtab.getGenVec(i.syncd.id, sr)
if err != nil {
return err
}
i.iState.local[sr] = v
}
// If the peer name is already rooted (absolute), use it as-is.
if naming.Rooted(peerRelName) {
i.iState.peer = peerRelName
return nil
}
// Pick any global name to contact the peer.
for mt := range mtTables {
i.iState.peer = naming.Join(mt, peerRelName)
vlog.VI(2).Infof("updateLocalGeneration:: contacting peer %s", i.iState.peer)
return nil
}
return fmt.Errorf("no mounttables found")
}
// getDeltas obtains the deltas from the peer and stores them in a tmp kvdb.
func (i *syncInitiator) getDeltas() error {
// As log records are streamed, they are saved
// in a tmp kvdb so that they can be processed in one batch.
if err := i.initTmpKVDB(); err != nil {
return err
}
ctx, _ := vtrace.WithNewTrace(i.syncd.ctx)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
vlog.VI(1).Infof("getDeltas:: From peer with DeviceId %s at %v", i.iState.peer, time.Now().UTC())
// Construct a new stub that binds to peer endpoint.
c := SyncClient(naming.JoinAddressName(i.iState.peer, SyncSuffix))
vlog.VI(1).Infof("GetDeltasFromPeer:: Sending local information: %v", i.iState.local)
// Issue a GetDeltas() rpc.
stream, err := c.GetDeltas(ctx, i.iState.local, i.iState.syncGroups, i.syncd.id)
if err != nil {
return err
}
return i.recvLogStream(stream)
}
// initTmpKVDB initializes the tmp kvdb to store the log record stream.
func (i *syncInitiator) initTmpKVDB() error {
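// Name the tmp kvdb with a nanosecond timestamp so that successive
// initiation rounds do not collide on the same file.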
i.iState.tmpFile = fmt.Sprintf("%s/tmp_%d", i.syncd.kvdbPath, time.Now().UnixNano())
tmpDB, tbls, err := kvdbOpen(i.iState.tmpFile, []string{"tmprec"})
if err != nil {
return err
}
i.iState.tmpDB = tmpDB
i.iState.tmpTbl = tbls[0]
return nil
}
// recvLogStream receives the log records from the GetDeltas stream
// and puts them in tmp kvdb for later processing.
func (i *syncInitiator) recvLogStream(stream SyncGetDeltasClientCall) error {
rStream := stream.RecvStream()
for rStream.Advance() {
rec := rStream.Value()
// Insert log record in tmpTbl.
if err := i.iState.tmpTbl.set(logRecKey(rec.DevId, rec.SyncRootId, rec.GenNum, rec.SeqNum), &rec); err != nil {
// TODO(hpucha): do we need to cancel stream?
return err
}
// Populate the generation metadata.
genKey := generationKey(rec.DevId, rec.SyncRootId, rec.GenNum)
if gen, ok := i.iState.newGens[genKey]; !ok {
// New generation in the stream.
i.iState.orderGens = append(i.iState.orderGens, genKey)
i.iState.newGens[genKey] = &genMetadata{
Count: 1,
MaxSeqNum: rec.SeqNum,
}
if _, ok := i.iState.minGens[rec.SyncRootId]; !ok {
i.iState.minGens[rec.SyncRootId] = GenVector{}
}
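// Track the smallest generation number seen per (syncroot, device);
// this is the garbage-collection bookkeeping described on minGens.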
g, ok := i.iState.minGens[rec.SyncRootId][rec.DevId]
if !ok || g > rec.GenNum {
i.iState.minGens[rec.SyncRootId][rec.DevId] = rec.GenNum
}
} else {
gen.Count++
if rec.SeqNum > gen.MaxSeqNum {
gen.MaxSeqNum = rec.SeqNum
}
}
}
if err := rStream.Err(); err != nil {
return err
}
var err error
if i.iState.remote, err = stream.Finish(); err != nil {
return err
}
vlog.VI(1).Infof("recvLogStream:: Local vector %v", i.iState.local)
vlog.VI(1).Infof("recvLogStream:: Remote vector %v", i.iState.remote)
vlog.VI(2).Infof("recvLogStream:: orderGens %v", i.iState.orderGens)
return nil
}
// processDeltas replays an entire log stream spanning multiple
// generations from different devices received from a single GetDeltas
// call. It does not perform any conflict resolution during replay.
// This avoids resolving conflicts that have already been resolved by
// other devices.
func (i *syncInitiator) processDeltas() error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
// Track received transactions.
txMap := make(map[TxId]uint32)
// Loop through each received generation in order.
for _, key := range i.iState.orderGens {
gen := i.iState.newGens[key]
dev, sr, gnum, err := splitGenerationKey(key)
if err != nil {
return err
}
// If "sr" has been left since getDeltas, skip processing.
// if !i.syncd.sgtab.isSyncRoot(sr) {
// continue
// }
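// Replay every log record of this generation in SeqNum order.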
for l := SeqNum(0); l <= gen.MaxSeqNum; l++ {
var rec LogRec
if err := i.iState.tmpTbl.get(logRecKey(dev, sr, gnum, l), &rec); err != nil {
return err
}
// Begin a new transaction if needed.
curTx := rec.Value.TxId
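// Records that share a TxId are replayed under a single DAG transaction;
// their TxCount values must agree or the replay is aborted.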
if curTx != NoTxId {
if cnt, ok := txMap[curTx]; !ok {
if i.syncd.dag.addNodeTxStart(curTx) != curTx {
return fmt.Errorf("failed to create transaction")
}
txMap[curTx] = rec.Value.TxCount
vlog.VI(2).Infof("processDeltas:: Begin Tx %v, count %d", curTx, rec.Value.TxCount)
} else if cnt != rec.Value.TxCount {
return fmt.Errorf("inconsistent counts for tid %v %d, %d", curTx, cnt, rec.Value.TxCount)
}
}
if err := i.insertRecInLogAndDag(&rec, curTx); err != nil {
return err
}
// Mark object dirty.
i.iState.updObjects[rec.ObjId] = &objConflictState{
srID: rec.SyncRootId,
}
}
// Insert the generation metadata.
if err := i.syncd.log.createRemoteGeneration(dev, sr, gnum, gen); err != nil {
return err
}
}
// End the started transactions if any.
for t, cnt := range txMap {
if err := i.syncd.dag.addNodeTxEnd(t, cnt); err != nil {
return err
}
vlog.VI(2).Infof("processLogStream:: End Tx %v %v", t, cnt)
}
return nil
}
// insertRecInLogAndDag adds a new log record to log and dag data structures.
func (i *syncInitiator) insertRecInLogAndDag(rec *LogRec, txID TxId) error {
logKey, err := i.syncd.log.putLogRec(rec)
if err != nil {
return err
}
vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v, Tx %v", rec, txID)
switch rec.RecType {
case NodeRec:
return i.syncd.dag.addNode(rec.ObjId, rec.CurVers, true, rec.Value.Delete, rec.Parents, logKey, txID)
case LinkRec:
return i.syncd.dag.addParent(rec.ObjId, rec.CurVers, rec.Parents[0], true)
default:
return fmt.Errorf("unknown log record type")
}
}
// processUpdatedObjects processes all the updates received by the
// initiator, one object at a time. For each updated object, we first
// check if the object has any conflicts, resulting in three
// possibilities:
//
// * There is no conflict, and no updates are needed to the store
// (isConflict=false, newHead == oldHead). All changes received convey
// information that still keeps the local head as the most recent
// version. This occurs when conflicts are resolved by blessings.
//
// * There is no conflict, but a remote version is discovered that
// builds on the local head (isConflict=false, newHead != oldHead). In
// this case, we generate a store mutation to simply update the store
// to the latest value.
//
// * There is a conflict and we call into the app or the system to
// resolve the conflict, resulting in three possibilities: (a) conflict
// was resolved by blessing the local version. In this case, store
// need not be updated, but a link is added to record the
// blessing. (b) conflict was resolved by blessing the remote
// version. In this case, store is updated with the remote version and
// a link is added as well. (c) conflict was resolved by generating a
// new store mutation. In this case, store is updated with the new
// version.
//
// We then put all these mutations in the store. If the put succeeds,
// we update the log and dag state suitably (move the head ptr of the
// object in the dag to the latest version, and create a new log
// record reflecting conflict resolution if any). Puts to store can
// fail since preconditions on the objects may have been violated. In
// this case, we wait to get the latest versions of objects from the
// store, and recheck if the object has any conflicts and repeat the
// above steps, until put to store succeeds.
func (i *syncInitiator) processUpdatedObjects() error {
for {
if err := i.detectConflicts(); err != nil {
return err
}
if err := i.resolveConflicts(); err != nil {
return err
}
err := i.updateStoreAndSync()
if err == nil {
break
}
vlog.Errorf("PutMutations failed %v. Will retry", err)
// TODO(hpucha): Sleeping and retrying is a temporary
// solution. Next iteration will have coordination
// with watch thread to intelligently retry. Hence
// this value is not a config param.
time.Sleep(1 * time.Second)
}
return nil
}
// detectConflicts iterates through all the updated objects to detect
// conflicts.
func (i *syncInitiator) detectConflicts() error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.RLock()
defer i.syncd.lock.RUnlock()
for obj, st := range i.iState.updObjects {
// Check if object has a conflict.
var err error
st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
vlog.VI(2).Infof("detectConflicts:: object %v state %v err %v",
obj, st, err)
if err != nil {
return err
}
}
return nil
}
// resolveConflicts resolves conflicts for updated objects. Conflicts
// may be resolved by adding new versions or blessing either the local
// or the remote version.
func (i *syncInitiator) resolveConflicts() error {
for obj, st := range i.iState.updObjects {
if !st.isConflict {
continue
}
res, err := i.resolveObjConflict(obj, st.oldHead, st.newHead, st.ancestor)
if err != nil {
return err
}
st.resolvVal = res
}
return nil
}
// resolveObjConflict resolves a conflict for an object given its ID and
// the 3 versions that express the conflict: the object's local version, its
// remote version (from the device contacted), and the common ancestor from
// which both versions branched away. The function returns the new object
// value according to the conflict resolution policy.
func (i *syncInitiator) resolveObjConflict(oid ObjId,
local, remote, ancestor Version) (*LogValue, error) {
// Fetch the log records of the 3 object versions.
versions := []Version{local, remote, ancestor}
lrecs, err := i.getLogRecsBatch(oid, versions)
if err != nil {
return nil, err
}
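// lrecs[0], lrecs[1], lrecs[2] are the local, remote, and ancestor
// records respectively, in the same order as the versions slice.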
// Resolve the conflict according to the resolution policy.
var res *LogValue
switch conflictResolutionPolicy {
case useTime:
res, err = i.resolveObjConflictByTime(oid, lrecs[0], lrecs[1], lrecs[2])
default:
err = fmt.Errorf("unknown conflict resolution policy: %v", conflictResolutionPolicy)
}
if err != nil {
return nil, err
}
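// Copy the winning value, stamp it with the resolution time, and detach
// it from any transaction before returning it as the resolution.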
resCopy := *res
// resCopy.Mutation.Version = NewVersion()
// resCopy.Mutation.Dir = resDir
resCopy.SyncTime = time.Now().UnixNano()
resCopy.TxId = NoTxId
resCopy.TxCount = 0
return &resCopy, nil
}
// resolveObjConflictByTime resolves conflicts using the timestamps
// of the conflicting mutations. It picks a mutation with the larger
// timestamp, i.e. the most recent update. If the timestamps are equal,
// the mutation version numbers are meant to serve as a tie-breaker
// (picking the mutation with the larger version), but that tie-breaker is
// currently commented out below, so no value is chosen when timestamps tie.
// Instead of creating a new version that resolves the conflict, we are
// blessing an existing version as the conflict resolution.
func (i *syncInitiator) resolveObjConflictByTime(oid ObjId,
local, remote, ancestor *LogRec) (*LogValue, error) {
var res *LogValue
switch {
case local.Value.SyncTime > remote.Value.SyncTime:
res = &local.Value
case local.Value.SyncTime < remote.Value.SyncTime:
res = &remote.Value
// case local.Value.Mutation.Version > remote.Value.Mutation.Version:
// res = &local.Value
// case local.Value.Mutation.Version < remote.Value.Mutation.Version:
// res = &remote.Value
}
return res, nil
}
// getLogRecsBatch gets the log records for an array of versions.
func (i *syncInitiator) getLogRecsBatch(obj ObjId, versions []Version) ([]*LogRec, error) {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.RLock()
defer i.syncd.lock.RUnlock()
lrecs := make([]*LogRec, len(versions))
var err error
for p, v := range versions {
lrecs[p], err = i.getLogRec(obj, v)
if err != nil {
return nil, err
}
}
return lrecs, nil
}
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
func (i *syncInitiator) updateStoreAndSync() error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
// var m []raw.Mutation
// for obj, st := range i.iState.updObjects {
// if !st.isConflict {
// rec, err := i.getLogRec(obj, st.newHead)
// if err != nil {
// return err
// }
// st.resolvVal = &rec.Value
// // Sanity check.
// if st.resolvVal.Mutation.Version != st.newHead {
// return fmt.Errorf("bad mutation %d %d",
// st.resolvVal.Mutation.Version, st.newHead)
// }
// }
// // If the local version is picked, no further updates
// // to the store are needed. If the remote version is
// // picked, we put it in the store.
// if st.resolvVal.Mutation.Version != st.oldHead {
// st.resolvVal.Mutation.PriorVersion = st.oldHead
// // Convert resolvVal.Mutation into a mutation for the Store.
// stMutation, err := i.storeMutation(obj, st.resolvVal)
// if err != nil {
// return err
// }
// vlog.VI(2).Infof("updateStoreAndSync:: Try to append mutation %v (%v) for obj %v (nh %v, oh %v)",
// st.resolvVal.Mutation, stMutation, obj, st.newHead, st.oldHead)
// // Append to mutations, skipping a delete following a delete mutation.
// if stMutation.Version != NoVersion ||
// stMutation.PriorVersion != NoVersion {
// vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
// stMutation, obj)
// m = append(m, stMutation)
// }
// }
// }
// TODO(hpucha): We will hold the lock across PutMutations rpc
// to prevent a race with watcher. The next iteration will
// clean up this coordination.
// if store := i.syncd.store; store != nil && len(m) > 0 {
// ctx, _ := vtrace.SetNewTrace(i.syncd.ctx)
// ctx, cancel := context.WithTimeout(ctx, time.Minute)
// defer cancel()
// stream, err := store.PutMutations(ctx)
// if err != nil {
// vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
// return err
// }
// sender := stream.SendStream()
// for i := range m {
// if err := sender.Send(m[i]); err != nil {
// vlog.Errorf("updateStoreAndSync:: send err %v", err)
// return err
// }
// }
// if err := sender.Close(); err != nil {
// vlog.Errorf("updateStoreAndSync:: closesend err %v", err)
// return err
// }
// if err := stream.Finish(); err != nil {
// vlog.Errorf("updateStoreAndSync:: finish err %v", err)
// return err
// }
// }
vlog.VI(2).Infof("updateStoreAndSync:: putmutations succeeded")
if err := i.updateLogAndDag(); err != nil {
return err
}
if err := i.updateGenVecs(); err != nil {
return err
}
return nil
}
// storeMutation converts a resolved mutation generated by syncd to
// one that can be sent to the store. To send to the store, it
// converts the version numbers corresponding to object deletions to
// NoVersion when required. It also converts the version number
// appropriately to handle SyncGroup join.
// func (i *syncInitiator) storeMutation(obj ObjId, resolvVal *LogValue) (raw.Mutation, error) {
// curDelete := resolvVal.Delete
// priorDelete := false
// if resolvVal.Mutation.PriorVersion != raw.NoVersion {
// oldRec, err := i.getLogRec(obj, resolvVal.Mutation.PriorVersion)
// if err != nil {
// return raw.Mutation{}, err
// }
// priorDelete = oldRec.Value.Delete
// }
// // Handle the versioning of a SyncGroup's root ObjId during join.
// if resolvVal.Mutation.PriorVersion == raw.NoVersion {
// if i.syncd.sgtab.isSyncRoot(obj) {
// node, err := i.syncd.dag.getPrivNode(obj)
// if err != nil {
// return raw.Mutation{}, err
// }
// resolvVal.Mutation.PriorVersion = node.Mutation.Version
// i.iState.delPrivObjs[obj] = struct{}{}
// }
// }
// // Current version and prior versions are not deletes.
// if !curDelete && !priorDelete {
// return resolvVal.Mutation, nil
// }
// // Creating a new copy of the mutation to adjust version
// // numbers when handling deletions.
// stMutation := resolvVal.Mutation
// Adjust the current version if this is a deletion.
// if curDelete {
// stMutation.Version = NoVersion
// }
// // Adjust the prior version if it is a deletion.
// if priorDelete {
// stMutation.PriorVersion = NoVersion
// }
// return stMutation, nil
// }
// getLogRec returns the log record corresponding to a given object and its version.
func (i *syncInitiator) getLogRec(obj ObjId, vers Version) (*LogRec, error) {
logKey, err := i.syncd.dag.getLogrec(obj, vers)
vlog.VI(2).Infof("getLogRec:: logkey from dag is %s", logKey)
if err != nil {
return nil, err
}
dev, sg, gen, lsn, err := splitLogRecKey(logKey)
if err != nil {
return nil, err
}
vlog.VI(2).Infof("getLogRec:: splitting logkey %s to %s %v %v %v", logKey, dev, sg, gen, lsn)
rec, err := i.syncd.log.getLogRec(dev, sg, gen, lsn)
if err != nil {
return nil, err
}
return rec, nil
}
// updateLogAndDag updates the log and dag data structures on a successful store put.
func (i *syncInitiator) updateLogAndDag() error {
for obj, st := range i.iState.updObjects {
if st.isConflict {
// Object had a conflict, which was resolved successfully.
// Put is successful, create a log record.
var err error
var rec *LogRec
// switch {
// case st.resolvVal.Mutation.Version == st.oldHead:
// // Local version was blessed as the conflict resolution.
// rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.oldHead, st.newHead, st.srID)
// case st.resolvVal.Mutation.Version == st.newHead:
// // Remote version was blessed as the conflict resolution.
// rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.newHead, st.oldHead, st.srID)
// default:
// // New version was created to resolve the conflict.
// parents := []Version{st.newHead, st.oldHead}
// rec, err = i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal, st.srID)
// }
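// NOTE: with the record creation above commented out, rec and err are
// still nil here; that switch must be restored before conflict-resolution
// records can actually be logged.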
if err != nil {
return err
}
logKey, err := i.syncd.log.putLogRec(rec)
if err != nil {
return err
}
// Add a new DAG node.
switch rec.RecType {
case NodeRec:
// TODO(hpucha): addNode operations arising out of conflict resolution
// may need to be part of a transaction when app-driven resolution
// is introduced.
err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Value.Delete, rec.Parents, logKey, NoTxId)
case LinkRec:
err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
default:
return fmt.Errorf("unknown log record type")
}
if err != nil {
return err
}
}
// Move the head. This should be idempotent. We may
// move head to the local head in some cases.
// if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
// return err
// }
}
return nil
}
// updateGenVecs updates local, reclaim and remote vectors at the end of an initiator cycle.
func (i *syncInitiator) updateGenVecs() error {
// Update the local gen vector and put it in kvdb only if we have new updates.
if len(i.iState.updObjects) > 0 {
// remote can be a subset of local.
for sr, rVec := range i.iState.remote {
lVec := i.iState.local[sr]
if err := i.syncd.devtab.updateLocalGenVector(lVec, rVec); err != nil {
return err
}
if err := i.syncd.devtab.putGenVec(i.syncd.id, sr, lVec); err != nil {
return err
}
// if err := i.syncd.devtab.updateReclaimVec(minGens); err != nil {
// return err
// }
}
}
for sr, rVec := range i.iState.remote {
// Cache the remote generation vector for space reclamation.
if err := i.syncd.devtab.putGenVec(i.syncd.nameToDevId(i.iState.peer), sr, rVec); err != nil {
return err
}
}
return nil
}