blob: 39d63f183adf486e1e623199abf64ee1358d1055 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"v.io/v23/context"
"v.io/v23/verror"
"v.io/x/lib/vlog"
)
// Policies for conflict resolution.
// TODO(hpucha): Move relevant parts to client-facing vdl.
const (
// Resolves conflicts by picking the update with the most recent timestamp.
useTime = iota
// TODO(hpucha): implement other policies.
// Resolves conflicts by using the app conflict resolver callbacks via store.
useCallback
)
var (
// conflictResolutionPolicy is the policy used to resolve conflicts.
conflictResolutionPolicy = useTime
)
// resolutionType represents how a conflict is resolved.
type resolutionType byte
const (
pickLocal resolutionType = iota // local update was chosen as the resolution.
pickRemote // remote update was chosen as the resolution.
createNew // new update was created as the resolution.
)
// conflictResolution represents the state of a conflict resolution.
type conflictResolution struct {
ty resolutionType
rec *localLogRec // Valid only if ty == createNew.
val []byte // Valid only if ty == createNew.
}
// resolveConflicts resolves conflicts for updated objects. Conflicts may be
// resolved by adding new versions or picking either the local or the remote
// version.
func (iSt *initiationState) resolveConflicts(ctx *context.T) error {
for obj, st := range iSt.updObjects {
if !st.isConflict {
continue
}
// TODO(hpucha): Look up policy from the schema. Currently,
// hardcoded to time.
var err error
st.res, err = iSt.resolveObjConflict(ctx, obj, st.oldHead, st.newHead, st.ancestor)
if err != nil {
return err
}
}
return nil
}
// resolveObjConflict resolves a conflict for an object given its ID and the 3
// versions that express the conflict: the object's local version, its remote
// version (from the device contacted), and the closest common ancestor (see
// dag.go on how the ancestor is chosen). The function returns the new object
// value according to the conflict resolution policy.
func (iSt *initiationState) resolveObjConflict(ctx *context.T, oid, local, remote, ancestor string) (*conflictResolution, error) {
// Fetch the log records of the 3 object versions.
versions := []string{local, remote, ancestor}
lrecs, err := iSt.getLogRecsBatch(ctx, oid, versions)
if err != nil {
return nil, err
}
// Resolve the conflict according to the resolution policy.
switch conflictResolutionPolicy {
case useTime:
return iSt.resolveObjConflictByTime(ctx, oid, lrecs[0], lrecs[1], lrecs[2])
default:
return nil, verror.New(verror.ErrInternal, ctx, "unknown conflict resolution policy", conflictResolutionPolicy)
}
}
// resolveObjConflictByTime resolves conflicts using the timestamps of the
// conflicting mutations. It picks a mutation with the larger timestamp,
// i.e. the most recent update. If the timestamps are equal, it uses the
// mutation version numbers as a tie-breaker, picking the mutation with the
// larger version. Instead of creating a new version that resolves the
// conflict, we pick an existing version as the conflict resolution.
func (iSt *initiationState) resolveObjConflictByTime(ctx *context.T, oid string, local, remote, ancestor *localLogRec) (*conflictResolution, error) {
var res conflictResolution
switch {
case local.Metadata.UpdTime.After(remote.Metadata.UpdTime):
res.ty = pickLocal
case local.Metadata.UpdTime.Before(remote.Metadata.UpdTime):
res.ty = pickRemote
case local.Metadata.CurVers > remote.Metadata.CurVers:
res.ty = pickLocal
case local.Metadata.CurVers < remote.Metadata.CurVers:
res.ty = pickRemote
default:
vlog.Fatalf("sync: resolveObjConflictByTime: local and remote update times and versions are the same, local %v remote %v", local, remote)
}
return &res, nil
}
// getLogRecsBatch gets the log records for an array of versions for a given object.
func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
lrecs := make([]*localLogRec, len(versions))
for p, v := range versions {
logKey, err := getLogRecKey(ctx, iSt.tx, obj, v)
if err != nil {
return nil, err
}
dev, gen, err := splitLogRecKey(ctx, logKey)
if err != nil {
return nil, err
}
lrecs[p], err = getLogRec(ctx, iSt.tx, dev, gen)
if err != nil {
return nil, err
}
}
return lrecs, nil
}