package vsync

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strings"
	"time"

	"veyron/services/store/raw"

	"veyron2/naming"
	"veyron2/rt"
	"veyron2/storage"
	"veyron2/vlog"
)

// Policies to pick a peer to sync with.
const (
	// Picks a peer at random from the available set.
	selectRandom = iota

	// TODO(hpucha): implement other policies.
	// Picks a peer with most differing generations.
	selectMostDiff

	// Picks a peer that was synced with the furthest in the past.
	selectOldest
)

// Policies for conflict resolution.
const (
	// Resolves conflicts by picking the mutation with the most recent timestamp.
	useTime = iota

	// TODO(hpucha): implement other policies.
	// Resolves conflicts by using the app conflict resolver callbacks via store.
	useCallback
)

var (
	// peerSyncInterval is the duration between two consecutive
	// sync events. In every sync event, the initiator contacts
	// one of its peers to obtain any pending updates.
	peerSyncInterval = 100 * time.Millisecond

	// peerSelectionPolicy is the policy used to select a peer when
	// the initiator gets a chance to sync.
	peerSelectionPolicy = selectRandom

	// conflictResolutionPolicy is the policy used to resolve conflicts.
	conflictResolutionPolicy = useTime

	errNoUsefulPeer = errors.New("no useful peer to contact")
)

// syncInitiator contains the metadata and state for the initiator thread.
type syncInitiator struct {
	syncd *syncd

	// State to contact peers periodically and get deltas.
	// TODO(hpucha): This is an initial version with command line arguments.
	// Next steps are to tie this up into mount table and auto-discover neighbors.
	neighbors   []string
	neighborIDs []string

	updObjects map[storage.ID]*objConflictState
}

// objConflictState contains the conflict state for objects that are
// updated during an initiator run.
type objConflictState struct {
	isConflict bool
	newHead    storage.Version
	oldHead    storage.Version
	ancestor   storage.Version
	resolvVal  *LogValue
}

// newInitiator creates a new initiator instance attached to the given syncd instance.
func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string, syncTick time.Duration) *syncInitiator {
	i := &syncInitiator{
		syncd:      syncd,
		updObjects: make(map[storage.ID]*objConflictState),
	}

	// Bootstrap my peer list.
	if peerEndpoints != "" {
		i.neighbors = strings.Split(peerEndpoints, ",")
		i.neighborIDs = strings.Split(peerDeviceIDs, ",")
	}
	if len(i.neighbors) != len(i.neighborIDs) {
		vlog.Fatalf("newInitiator: Mismatch between number of endpoints and IDs")
	}

	// Override the default peerSyncInterval value if syncTick is specified.
	if syncTick > 0 {
		peerSyncInterval = syncTick
	}

	vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
	vlog.VI(1).Infof("newInitiator: Peer endpoints: %v", i.neighbors)
	vlog.VI(1).Infof("newInitiator: Peer IDs: %v", i.neighborIDs)
	vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)
	return i
}

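// Usage sketch: syncd is assumed to construct the initiator from its
// command-line arguments, passing equal-length, comma-separated endpoint
// and device-ID lists (the values below are hypothetical placeholders):
//
//	init := newInitiator(s, // s is the enclosing *syncd instance
//		"<ep1>,<ep2>",        // peer sync endpoints
//		"deviceA,deviceB",    // matching peer device IDs
//		500*time.Millisecond) // overrides the default peerSyncInterval
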
// contactPeers wakes up every peerSyncInterval to contact peers and get deltas from them.
func (i *syncInitiator) contactPeers() {
	ticker := time.NewTicker(peerSyncInterval)
	for {
		select {
		case <-i.syncd.closed:
			ticker.Stop()
			i.syncd.pending.Done()
			return
		case <-ticker.C:
		}

		id, ep, err := i.pickPeer()
		if err != nil {
			continue
		}

		// Freeze the most recent batch of local changes
		// before fetching remote changes from a peer.
		//
		// We only allow an initiator to create new local
		// generations (not responders or the watcher) in order
		// to maintain a static baseline for the duration of a
		// sync. This addresses the following race condition:
		// if we allowed responders to create new local
		// generations while the initiator is in progress,
		// they could beat the initiator and send these new
		// generations to remote devices. These remote
		// devices could in turn send these generations back to
		// the in-progress initiator, which was started with
		// older generation information.
		local, err := i.updateLocalGeneration()
		if err != nil {
			vlog.Fatalf("contactPeers:: error updating local generation: err %v", err)
		}

		i.getDeltasFromPeer(id, ep, local)
	}
}

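// A minimal sketch of the assumed wiring on the syncd side: the closed
// channel and pending WaitGroup used above live on syncd, and the
// initiator field name below is hypothetical:
//
//	s.pending.Add(1)
//	go s.initiator.contactPeers()
//	...
//	close(s.closed)  // signal shutdown to the initiator loop
//	s.pending.Wait() // wait for contactPeers to return
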
// pickPeer picks a sync endpoint in the neighborhood to sync with.
func (i *syncInitiator) pickPeer() (string, string, error) {
	switch peerSelectionPolicy {
	case selectRandom:
		// Pick a neighbor at random.
		if i.neighbors == nil {
			return "", "", errNoUsefulPeer
		}
		ind := rand.Intn(len(i.neighbors))
		return i.neighborIDs[ind], i.neighbors[ind], nil
	default:
		return "", "", fmt.Errorf("unknown peer selection policy")
	}
}

// updateLocalGeneration creates a new local generation if needed and
// returns the newest local generation vector.
func (i *syncInitiator) updateLocalGeneration() (GenVector, error) {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	// Create a new local generation if there are any local updates.
	gen, err := i.syncd.log.createLocalGeneration()
	if err == errNoUpdates {
		vlog.VI(1).Infof("updateLocalGeneration:: No new updates. Local at %d", gen)
		return i.syncd.devtab.getGenVec(i.syncd.id)
	}
	if err != nil {
		return GenVector{}, err
	}

	// Update local generation vector in devTable.
	if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
		return GenVector{}, err
	}
	return i.syncd.devtab.getGenVec(i.syncd.id)
}

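// Illustration (assumed values): if this device is A and its stored
// generation vector is {A: 3, B: 5}, a successful createLocalGeneration
// returning generation 4 advances the stored vector to {A: 4, B: 5}
// before it is returned; with no local updates, {A: 3, B: 5} is
// returned unchanged.
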
// getDeltasFromPeer contacts the specified endpoint to obtain deltas
// with respect to its current generation vector.
func (i *syncInitiator) getDeltasFromPeer(dID, ep string, local GenVector) {
	vlog.VI(1).Infof("GetDeltasFromPeer:: From server %s with DeviceID %s at %v", ep, dID, time.Now().UTC())

	// Construct a new stub that binds to the peer endpoint.
	c, err := BindSync(naming.JoinAddressName(ep, "sync"))
	if err != nil {
		vlog.Errorf("GetDeltasFromPeer:: error binding to server: err %v", err)
		return
	}
	vlog.VI(1).Infof("GetDeltasFromPeer:: Sending local information: %v", local)

	// Issue a GetDeltas() rpc.
	stream, err := c.GetDeltas(rt.R().TODOContext(), local, i.syncd.id)
	if err != nil {
		vlog.Errorf("GetDeltasFromPeer:: error getting deltas: err %v", err)
		return
	}

	minGens, err := i.processLogStream(stream)
	if err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: error processing logs: err %v", err)
	}

	remote, err := stream.Finish()
	if err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: finish failed with err %v", err)
	}

	if err := i.processUpdatedObjects(local, minGens, remote, DeviceID(dID)); err != nil {
		vlog.Fatalf("GetDeltasFromPeer:: error processing objects: err %v", err)
	}

	vlog.VI(1).Infof("GetDeltasFromPeer:: Local vector %v", local)
	vlog.VI(1).Infof("GetDeltasFromPeer:: Remote vector %v", remote)
}

// processLogStream replays an entire log stream spanning multiple
// generations from different devices received from a single GetDeltas
// call. It does not perform any conflict resolution during replay.
// This avoids resolving conflicts that have already been resolved by
// other devices.
func (i *syncInitiator) processLogStream(stream SyncGetDeltasStream) (GenVector, error) {
	// Map to track new generations received in the RPC reply.
	// TODO(hpucha): If needed, this can be optimized under the
	// assumption that an entire generation is received
	// sequentially. We can then parse a generation at a time.
	newGens := make(map[string]*genMetadata)
	// Array to track order of arrival for the generations.
	// We need to preserve this order.
	var orderGens []string
	// Compute the minimum generation for every device in this set.
	minGens := GenVector{}

	for {
		rec, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return GenVector{}, err
		}

		if err := i.insertRecInLogAndDag(&rec); err != nil {
			return GenVector{}, err
		}
		// Mark object dirty.
		i.updObjects[rec.ObjID] = &objConflictState{}

		// Populate the generation metadata.
		genKey := generationKey(rec.DevID, rec.GNum)
		if gen, ok := newGens[genKey]; !ok {
			// New generation in the stream.
			orderGens = append(orderGens, genKey)
			newGens[genKey] = &genMetadata{
				Count:  1,
				MaxLSN: rec.LSN,
			}
			g, ok := minGens[rec.DevID]
			if !ok || g > rec.GNum {
				minGens[rec.DevID] = rec.GNum
			}
		} else {
			gen.Count++
			if rec.LSN > gen.MaxLSN {
				gen.MaxLSN = rec.LSN
			}
		}
	}

	if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
		return GenVector{}, err
	}
	return minGens, nil
}

// insertRecInLogAndDag adds a new log record to the log and DAG data structures.
func (i *syncInitiator) insertRecInLogAndDag(rec *LogRec) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	logKey, err := i.syncd.log.putLogRec(rec)
	if err != nil {
		return err
	}
	if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
		return err
	}
	return nil
}

// createGenMetadataBatch inserts a batch of generations into the log.
func (i *syncInitiator) createGenMetadataBatch(newGens map[string]*genMetadata, orderGens []string) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	for _, key := range orderGens {
		gen := newGens[key]
		// Insert the generation metadata.
		dev, gnum, err := splitGenerationKey(key)
		if err != nil {
			return err
		}
		if err := i.syncd.log.createRemoteGeneration(dev, gnum, gen); err != nil {
			return err
		}
	}
	return nil
}

// processUpdatedObjects processes all the updates received by the
// initiator, one object at a time. For each updated object, we first
// check if the object has any conflicts. If there is a conflict, we
// resolve it and generate a new store mutation reflecting the
// resolution. If there is no conflict, we generate a store mutation
// to simply update the store to the latest value. We then put all
// these mutations in the store. If the put succeeds, we update the
// log and DAG state accordingly (move the object's head pointer in
// the DAG to the latest version, and create a new log record
// reflecting conflict resolution, if any).
//
// Puts to the store can fail since preconditions on the objects may
// have been violated. In that case, we wait to get the latest
// versions of the objects from the store, recheck for conflicts, and
// repeat the above steps until the put to the store succeeds.
func (i *syncInitiator) processUpdatedObjects(local, minGens, remote GenVector, dID DeviceID) error {
	for {
		if err := i.detectConflicts(); err != nil {
			return err
		}

		m, err := i.resolveConflicts()
		if err != nil {
			return err
		}

		err = i.updateStoreAndSync(m, local, minGens, remote, dID)
		if err == nil {
			break
		}

		vlog.Errorf("PutMutations failed %v. Will retry", err)
		// TODO(hpucha): Sleeping and retrying is a temporary
		// solution. The next iteration will coordinate with the
		// watch thread to intelligently retry. Hence this value
		// is not a config param.
		time.Sleep(10 * time.Second)
	}

	// Remove any pending state.
	i.updObjects = make(map[storage.ID]*objConflictState)
	i.syncd.dag.clearGraft()
	return nil
}

// detectConflicts iterates through all the updated objects to detect
// conflicts.
func (i *syncInitiator) detectConflicts() error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.RLock()
	defer i.syncd.lock.RUnlock()

	for obj, st := range i.updObjects {
		// Check if object has a conflict.
		var err error
		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
		if err != nil {
			return err
		}
		if !st.isConflict {
			rec, err := i.getLogRec(obj, st.newHead)
			if err != nil {
				return err
			}
			st.resolvVal = &rec.Value
			// Sanity check.
			if st.resolvVal.Mutation.Version != st.newHead {
				return fmt.Errorf("bad mutation %d %d",
					st.resolvVal.Mutation.Version, st.newHead)
			}
		}
	}
	return nil
}

// getLogRec returns the log record corresponding to a given object and its version.
func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
	logKey, err := i.syncd.dag.getLogrec(obj, vers)
	if err != nil {
		return nil, err
	}
	dev, gen, lsn, err := splitLogRecKey(logKey)
	if err != nil {
		return nil, err
	}
	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
	if err != nil {
		return nil, err
	}
	return rec, nil
}

// resolveConflicts resolves conflicts for updated objects.
func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
	switch conflictResolutionPolicy {
	case useTime:
		if err := i.resolveConflictsByTime(); err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unknown conflict resolution policy")
	}

	var m []raw.Mutation
	for _, st := range i.updObjects {
		// Append to mutations.
		st.resolvVal.Mutation.PriorVersion = st.oldHead
		m = append(m, st.resolvVal.Mutation)
	}
	return m, nil
}

// resolveConflictsByTime resolves conflicts using the timestamps
// of the conflicting mutations. It picks the mutation with the larger
// timestamp, i.e. the most recent update. If the timestamps are equal,
// it uses the mutation version numbers as a tie-breaker, picking the
// mutation with the larger version.
//
// TODO(hpucha): Based on a few more policies, reconsider nesting
// order of the conflict resolution loop and switch-on-policy.
func (i *syncInitiator) resolveConflictsByTime() error {
	for obj, st := range i.updObjects {
		if !st.isConflict {
			continue
		}

		versions := []storage.Version{st.oldHead, st.newHead, st.ancestor}
		lrecs, err := i.getLogRecsBatch(obj, versions)
		if err != nil {
			return err
		}

		res := 0
		switch {
		case lrecs[0].Value.SyncTime > lrecs[1].Value.SyncTime:
			res = 0
		case lrecs[0].Value.SyncTime < lrecs[1].Value.SyncTime:
			res = 1
		case lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version:
			res = 0
		case lrecs[0].Value.Mutation.Version < lrecs[1].Value.Mutation.Version:
			res = 1
		}

		m := lrecs[res].Value.Mutation
		m.Version = storage.NewVersion()

		// TODO(hpucha): handle continue and delete flags.
		st.resolvVal = &LogValue{Mutation: m}
	}
	return nil
}

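// Worked example (hypothetical values): suppose an object's oldHead and
// newHead log records both carry SyncTime 10, with mutation versions 7
// and 9 respectively. Both timestamp cases fall through, the version
// tie-breaker selects res = 1 (the newHead mutation), and that mutation
// is re-issued under a fresh version from storage.NewVersion().
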
// getLogRecsBatch gets the log records for an array of versions.
func (i *syncInitiator) getLogRecsBatch(obj storage.ID, versions []storage.Version) ([]*LogRec, error) {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.RLock()
	defer i.syncd.lock.RUnlock()

	lrecs := make([]*LogRec, len(versions))
	var err error
	for p, v := range versions {
		lrecs[p], err = i.getLogRec(obj, v)
		if err != nil {
			return nil, err
		}
	}
	return lrecs, nil
}

// updateStoreAndSync updates the store and, if that is successful,
// updates the log and DAG data structures.
func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
	// TODO(hpucha): Eliminate reaching into syncd's lock.
	i.syncd.lock.Lock()
	defer i.syncd.lock.Unlock()

	// TODO(hpucha): We will hold the lock across the PutMutations rpc
	// to prevent a race with the watcher. The next iteration will
	// clean up this coordination.
	if store := i.syncd.store; store != nil && len(m) > 0 {
		stream, err := store.PutMutations(rt.R().TODOContext())
		if err != nil {
			vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
			return err
		}
		for j := range m {
			if err := stream.Send(m[j]); err != nil {
				vlog.Errorf("updateStoreAndSync:: send err %v", err)
				return err
			}
		}
		if err := stream.CloseSend(); err != nil {
			vlog.Errorf("updateStoreAndSync:: closesend err %v", err)
			return err
		}
		if err := stream.Finish(); err != nil {
			vlog.Errorf("updateStoreAndSync:: finish err %v", err)
			return err
		}
	}
	vlog.VI(2).Infof("updateStoreAndSync:: putmutations succeeded")

	if err := i.updateLogAndDag(); err != nil {
		return err
	}
	if err := i.updateGenVecs(local, minGens, remote, dID); err != nil {
		return err
	}
	return nil
}

// updateLogAndDag updates the log and DAG data structures on a successful store put.
func (i *syncInitiator) updateLogAndDag() error {
	for obj, st := range i.updObjects {
		if st.isConflict {
			// The object had a conflict that was resolved successfully.
			// Since the put succeeded, create a log record.
			parents := []storage.Version{st.newHead, st.oldHead}
			rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
			if err != nil {
				return err
			}
			logKey, err := i.syncd.log.putLogRec(rec)
			if err != nil {
				return err
			}
			// Since the put succeeded, add a new DAG node.
			if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
				return err
			}
		}

		// Move the head.
		if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
			return err
		}
	}
	return nil
}

// updateGenVecs updates the local, reclaim, and remote vectors at the end of an initiator cycle.
func (i *syncInitiator) updateGenVecs(local, minGens, remote GenVector, dID DeviceID) error {
	// Update the local gen vector and put it in kvdb only if we have new updates.
	if len(i.updObjects) > 0 {
		if err := i.syncd.devtab.updateLocalGenVector(local, remote); err != nil {
			return err
		}
		if err := i.syncd.devtab.putGenVec(i.syncd.id, local); err != nil {
			return err
		}
		if err := i.syncd.devtab.updateReclaimVec(minGens); err != nil {
			return err
		}
	}

	// Cache the remote generation vector for space reclamation.
	if err := i.syncd.devtab.putGenVec(dID, remote); err != nil {
		return err
	}
	return nil
}