blob: c076754799317c8084f3fcb14acd76a6de62f584 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/store"
)
// crGroup is a closure of oids under conflict that got rubberbanded together
// due to conflicting batches that they belong to.
// NOTE: A closure can pull in an oid that was not under conflict originally.
type crGroup struct {
// batchSource maps batchids to the source from which the batch comes
// (Local/Remote).
batchSource map[uint64]wire.BatchSource
// batchesByOid maps oids to an array of batchIds to which the oids belong.
batchesByOid map[string][]uint64
}
// groupedCrData contains an array of crGroups and a map of oids that are
// present in at least one of the crGroups.
type groupedCrData struct {
groups []*crGroup
oids map[string]bool
}
// groupConflicts takes a map of oids to objConflictState objects and creates
// disjoint closures of oids. For example, consider input map containing oids
// a, b, c, d, e, f and g, with these objects belonging to batches as follows:
// {B1: a, b}, {B2: b, c}, {B3: d, e}, {B4: d, x, y}, f, g where x and y were
// not a part of the input conflict map but got pulled in due to B4. The result
// can be logically represented as: {a, b, c}, {d, e, x, y}, {f}, {g}.
func (iSt *initiationState) groupConflicts(ctx *context.T, conflictsForApp map[string]*objConflictState) *groupedCrData {
crData := &groupedCrData{}
crData.oids = map[string]bool{}
for oid := range conflictsForApp {
if crData.oids[oid] {
// The oid has already been added to a previous group.
continue
}
group := initCrGroup()
iSt.groupFor(ctx, group, oid)
crData.groups = append(crData.groups, group)
for k := range group.batchesByOid {
crData.oids[k] = true
}
}
return crData
}
// groupFor is a recursive method (in conjunction with processBatchId()) that
// takes an oid and creates a closure for the oid by following the local and
// remote heads for the oid as present in iSt.updObjects and recursively
// including the batches that these heads belong to.
// If the given oid is not dirty (i.e. a part of iSt.updObjects) then it was
// rubberbanded in. A new conflict object for this oid is added to
// iSt.updObjects to mark inclusion of the oid into the conflict closure but
// its local and remote head batches are not followed.
func (iSt *initiationState) groupFor(ctx *context.T, group *crGroup, oid string) {
if !iSt.isDirty(oid) {
// This means that this object was rubberbanded into the conflict by
// another object and we dont need to process its local & remote heads.
// NOTE: we never expect to see this object in the groupConflicts()
// loop without already having added it to crData.oids array which makes
// sure that we never try to create a closure over this object resulting
// in an empty group.
iSt.addNewConflictObj(ctx, oid)
return
}
// Handle dirty objects that are not under conflict. Since the object is not
// under conflict the local version will in reality be the ancestor of the
// remote version. Hence we dont need to include the batch related to this
// version in our closure. This makes sure that the closure is identical to
// what the other device would see if it pulled updates from this device
// instead.
localBatchId := NoBatchId
if iSt.updObjects[oid].isConflict {
localBatchId = getBatchId(ctx, iSt.tx, oid, iSt.getLocalVersion(oid))
}
remoteBatchId := getBatchId(ctx, iSt.tx, oid, iSt.getRemoteVersion(oid))
if (localBatchId == NoBatchId) && (remoteBatchId == NoBatchId) {
// This oid is not a part of any batch hence it was a root dirty node
// and will form a group of single oid.
group.batchesByOid[oid] = []uint64{}
return
}
// Process localBatchId.
iSt.processBatchId(ctx, group, localBatchId, wire.BatchSourceLocal)
// Process remoteBatchId.
iSt.processBatchId(ctx, group, remoteBatchId, wire.BatchSourceRemote)
}
// processBatchId takes a batch id and recursively adds a closure for each of
// the objects within this batch. If a batch has already been seen before then
// the method returns making it the termination condition for the recursion.
func (iSt *initiationState) processBatchId(ctx *context.T, group *crGroup, batchId uint64, source wire.BatchSource) {
_, seen := group.batchSource[batchId]
if (batchId == NoBatchId) || seen {
// Don't process batchId which has already been processed earlier.
return
}
elements := getBatchElements(ctx, iSt.tx, batchId)
for k := range elements {
group.batchesByOid[k] = append(group.batchesByOid[k], batchId)
}
group.batchSource[batchId] = source
// Warning: the batchId processed in this call of the method must be added
// to the batchSource map before a recursive call to groupFor() method is
// made to avoid infinite recursion.
for bOid := range elements {
iSt.groupFor(ctx, group, bOid)
}
}
func (iSt *initiationState) isDirty(oid string) bool {
state, ok := iSt.updObjects[oid]
return ok && !state.isAddedByCr
}
func (iSt *initiationState) getLocalVersion(oid string) string {
return iSt.updObjects[oid].oldHead
}
func (iSt *initiationState) getRemoteVersion(oid string) string {
return iSt.updObjects[oid].newHead
}
func (iSt *initiationState) addNewConflictObj(ctx *context.T, oid string) {
headVer, err := getHead(ctx, iSt.tx, oid)
if err != nil {
vlog.Fatalf("sync: ConflictClosure: error while fetching head for oid %v : %v", oid, err)
}
iSt.updObjects[oid] = &objConflictState{
isConflict: false,
isAddedByCr: true,
oldHead: headVer,
newHead: NoVersion, // TODO(jlodhia): figure out the remote version using gen vectors
ancestor: NoVersion,
}
}
func getBatchId(ctx *context.T, st store.StoreReader, oid, version string) uint64 {
if version == NoVersion {
return NoBatchId
}
dagNode, err := getNode(ctx, st, oid, version)
if err != nil {
vlog.Fatalf("sync: ConflictClosure: error while fetching dag node: %v", err)
}
return dagNode.BatchId
}
func getBatchElements(ctx *context.T, st store.StoreReader, batchId uint64) map[string]string {
bInfo, err := getBatch(ctx, st, batchId)
if err != nil {
vlog.Fatalf("sync: ConflictClosure: error while fetching batch info: %v", err)
}
// Add linked objects to the objects that belong to this batch.
for k, v := range bInfo.LinkedObjects {
bInfo.Objects[k] = v
}
return bInfo.Objects
}
func initCrGroup() *crGroup {
return &crGroup{
batchSource: map[uint64]wire.BatchSource{},
batchesByOid: map[string][]uint64{},
}
}