// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"fmt"
"time"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/store/watchable"
)
// crSendStream is a named interface replicating the interface returned by
// wire.ConflictManagerStartConflictResolverServerStream.SendStream()
type crSendStream interface {
Send(item wire.ConflictInfo) error
}
// crRecvStream is a named interface replicating the interface returned by
// wire.ConflictManagerStartConflictResolverServerStream.RecvStream()
type crRecvStream interface {
Advance() bool
Value() wire.ResolutionInfo
Err() error
}
// resolveViaApp takes a groupedCrData object containing multiple disjoint
// closures of conflict batches and sends these closures for resolution.
// This method consists of the following steps:
// 1) Get CR connection stream object for this database. If none exists,
// resolution cannot proceed; return error.
// For each group,
// 2) Stream each batch info within the group to the app.
// 3) Stream each conflict info row within the group to the app with the last
// row containing continued=false.
// 4) Receive a batch of resolutions from app. This is a blocking step.
// 5) Verify if all conflicts sent within the group have a corresponding
// resolution.
// 6) Process each resolution info object by assigning an appropriate
// conflictResolution object to the Oid under conflict.
// TODO(jlodhia): Add a timeout for waiting on app's resolution.
func (iSt *initiationState) resolveViaApp(ctx *context.T, groupedConflicts *groupedCrData) error {
db := iSt.config.db
appConn := db.CrConnectionStream()
if appConn == nil {
// CR not possible now, delay conflict resolution
vlog.VI(2).Infof("sync: resolveViaApp: No ConflictResolution stream available for db. App based resolution cannot move forward.")
return interfaces.NewErrBrokenCrConnection(ctx)
}
sendStream := appConn.SendStream()
recvStream := appConn.RecvStream()
vlog.VI(2).Infof("cr: resolveViaApp: starting app based resolution on %d groups", len(groupedConflicts.groups))
for i, group := range groupedConflicts.groups {
vlog.VI(2).Infof("cr: resolveViaApp: sending conflict group %d to app", i)
// Send all batches first
if err := sendBatches(ctx, iSt, sendStream, db, group); err != nil {
return err
}
// Send all conflict rows.
if err := sendConflictRows(ctx, iSt, sendStream, db, group); err != nil {
return err
}
// Receive resolutions from the app.
results, err := receiveResolutions(ctx, recvStream)
if err != nil {
return err
}
for oid := range group.batchesByOid {
_, present := results[oid]
if !present {
// CR not possible now, delay conflict resolution
// TODO(jlodhia):[usability] send an error message to app.
errStr := fmt.Sprintf("cr: resolveViaApp: Resolution for oid %s expected but not received", oid)
vlog.VI(2).Infof("%s", errStr)
return verror.New(verror.ErrInternal, ctx, errStr)
}
}
vlog.VI(2).Infof("cr: resolveViaApp: all resolutions received")
// Process resolutions.
conf := iSt.config
// Use an empty syncgroup OID since these resolutions are for app data, not syncgroup metadata.
sgoid := ""
// Reserve one generation and log position per resolution; this may be more
// than needed since pickLocal/pickRemote resolutions create no new log records.
reserveCount := uint64(len(results))
gen, pos := conf.sync.reserveGenAndPosInDbLog(ctx, conf.dbId, sgoid, reserveCount)
processResInfos(ctx, iSt, results, gen, pos)
}
return nil
}
// sendBatches streams a batch info row for each batch present within a cr group.
// In case of error while sending a conflict via the stream, the stream is
// deemed to be dead and is reset.
func sendBatches(ctx *context.T, iSt *initiationState, sendStream crSendStream, db interfaces.Database, group *crGroup) error {
for batchId, source := range group.batchSource {
ci := createBatchConflictInfo(batchId, source)
if err := sendStream.Send(*ci); err != nil {
// CR not possible now, delay conflict resolution
// Remove the outdated cr connection object from database.
db.ResetCrConnectionStream()
vlog.VI(2).Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
return interfaces.NewErrBrokenCrConnection(ctx)
}
}
return nil
}
// sendConflictRows streams conflict info rows present within a cr group.
// The Continued field is set to false only for the last row to mark the end of the
// group. In case of error while sending a conflict via the stream, the stream
// is deemed to be dead and is reset.
func sendConflictRows(ctx *context.T, iSt *initiationState, sendStream crSendStream, db interfaces.Database, group *crGroup) error {
numRows, count := len(group.batchesByOid), 0
for oid, batches := range group.batchesByOid {
count++
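// Continued is true for all rows except the last one in the group; the final
// row (count == numRows) is sent with Continued set to false to mark the end
// of the group.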
ci := createRowConflictInfo(ctx, iSt, oid, batches, count < numRows)
if err := sendStream.Send(*ci); err != nil {
// CR not possible now, delay conflict resolution
// Remove the outdated cr connection object from database.
db.ResetCrConnectionStream()
vlog.VI(2).Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
return interfaces.NewErrBrokenCrConnection(ctx)
}
}
return nil
}
// receiveResolutions is a blocking function that waits for the app to return
// all resolutions for the cr group. The end of the group is marked by a
// resolution whose Continued field is false.
// TODO(jlodhia): Add a timeout to this function to protect against bad client
// code.
func receiveResolutions(ctx *context.T, recvStream crRecvStream) (map[string]*wire.ResolutionInfo, error) {
results := map[string]*wire.ResolutionInfo{}
for recvStream.Advance() {
resp := recvStream.Value()
// The key received as part of the resolution info has the form
// <collection>:<row>, while an oid is a complete row key of the form
// r:<collection>:<row>; convert before storing.
results[toRowKey(resp.Key)] = &resp
if !resp.Continued {
return results, nil
}
}
if err := recvStream.Err(); err != nil {
vlog.Errorf("sync: resolveViaApp: Error while receiving resolutions from app over stream: %v", err)
return nil, err
}
// Response stream ended midway. Advance returned false but the last
// resolution info object had Continued field true.
vlog.Errorf("sync: resolveViaApp: Resoponse stream ended midway")
return nil, verror.NewErrInternal(ctx)
}
// processResInfos assigns appropriate conflictResolution objects for Oids in
// iSt.updObjects map based on ResolutionInfos received for a conflict group.
// Following are the main steps in this function:
// 1) Create a new BatchId for this group. If there is a single Oid in the
// group then the batch id is NoBatchId.
// 2) For each resolution create a new conflictResolution object and assign
// appropriate resolution type (pickLocal, pickRemote, createNew)
// 3) In case of createNew, add the new value to the conflictResolution object
// and create a new localLogRecord for the new value. This record is not
// written to db yet.
func processResInfos(ctx *context.T, iSt *initiationState, results map[string]*wire.ResolutionInfo, gen, pos uint64) {
// Total count for batch includes new values and linked log records
// (i.e. len(results)).
batchCount := uint64(len(results))
resBatchId := newBatch(ctx, iSt, batchCount)
sId := iSt.config.sync.id
for oid, rInfo := range results {
conflictState := iSt.updObjects[oid]
var res conflictResolution
res.batchId = resBatchId
res.batchCount = batchCount
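// Map the app's ValueSelection for this row onto a resolution type
// (pickLocal, pickRemote, or createNew).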
switch {
case rInfo.Selection == wire.ValueSelectionLocal:
if createsCycle(conflictState.oldHead, conflictState) {
// The object has a remote version and, since it is not under
// conflict, the local version is expected to be its ancestor.
// To avoid a DAG cycle, we treat the resolution as a createNew.
res.ty = createNew
timestamp, _ := iSt.tx.St.Clock.Now()
dagNode := getNodeOrFail(ctx, iSt.tx, oid, conflictState.oldHead)
if !dagNode.Deleted {
res.val = getObjectAtVer(ctx, iSt, oid, conflictState.oldHead)
}
parents := []string{conflictState.newHead}
res.rec = createLocalLogRec(ctx, oid, parents, dagNode.Deleted, timestamp, sId, gen, pos, resBatchId, batchCount)
gen++
pos++
} else {
res.ty = pickLocal
}
case rInfo.Selection == wire.ValueSelectionRemote:
res.ty = pickRemote
if conflictState.oldHead == conflictState.newHead {
// pickRemote gives the same version as pickLocal.
// Converting to pickLocal for optimization.
res.ty = pickLocal
}
case rInfo.Selection == wire.ValueSelectionOther:
// TODO(jlodhia):[optimization] Do byte comparison to see if
// the new value is equal to the local or remote value. If so, don't use
// createNew.
res.ty = createNew
// TODO(m3b): What should this routine do if there's an encoding error?
res.val, _ = vom.Encode(rInfo.Result.Bytes)
// TODO(jlodhia):[correctness] Use vclock to create the write
// timestamp.
timestamp := time.Now()
parents := getResolutionParents(iSt, oid)
res.rec = createLocalLogRec(ctx, oid, parents, isDeleted(rInfo), timestamp, sId, gen, pos, resBatchId, batchCount)
gen++
pos++
}
conflictState.res = &res
}
}
// createsCycle detects whether selecting resVer as the resolution would create
// a cycle in the DAG.
func createsCycle(resVer string, conflictState *objConflictState) bool {
if !conflictState.isConflict &&
(conflictState.newHead != NoVersion) && // object pulled in by CR
(conflictState.oldHead != conflictState.newHead) {
// This means old head is a parent of new head. If the resolution is
// oldHead then it will create a cycle.
return resVer == conflictState.oldHead
}
return false
}
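// getNodeOrFail fetches the DAG node for the given oid and version, failing
// fatally if the node cannot be read.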
func getNodeOrFail(ctx *context.T, st store.StoreReader, oid, version string) *DagNode {
dagNode, err := getNode(ctx, st, oid, version)
if err != nil {
vlog.Fatalf("sync: resolveViaApp: error while fetching dag node: %v", err)
}
return dagNode
}
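// getResolutionParents returns the parent versions to record on a new
// resolution for oid: both heads when the object is under conflict, only the
// remote head when the local head is an ancestor of the remote version, and
// only the local head when the object was pulled in by conflict resolution.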
func getResolutionParents(iSt *initiationState, oid string) []string {
parents := []string{}
conflictState, present := iSt.updObjects[oid]
if !present {
vlog.Fatalf("sync: resolveViaApp: getResolutionParents was called for oid that was not a part of updObjects: %v", oid)
}
// For ease of readability and maintainability, the following code is kept
// verbose.
if conflictState.isConflict {
parents = append(parents, conflictState.oldHead)
parents = append(parents, conflictState.newHead)
return parents
}
if iSt.isDirty(oid) {
// There is no conflict. The object has a remote update and the local
// head is an ancestor of the remote version. Add only the remote
// version as parent.
parents = append(parents, conflictState.newHead)
return parents
}
// This object was added by conflict resolution and only has local version.
// The remote version is unknown. Add only the local version as parent.
parents = append(parents, conflictState.oldHead)
return parents
}
// A delete in a resolution is signified by a non-nil Result field with nil Bytes.
func isDeleted(rInfo *wire.ResolutionInfo) bool {
return rInfo.Result.Bytes == nil
}
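// newBatch creates a sync batch of the given size for this resolution group
// and returns its id. A group that resolves fewer than two objects needs no
// batch, so NoBatchId is returned in that case.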
func newBatch(ctx *context.T, iSt *initiationState, count uint64) uint64 {
if count < 2 {
return NoBatchId
}
resBatchId := iSt.config.sync.startBatch(ctx, iSt.tx, NoBatchId)
if resBatchId == NoBatchId {
vlog.Fatalf("sync: resolveViaApp: failed to generate batch ID")
}
if err := iSt.config.sync.endBatch(ctx, iSt.tx, resBatchId, count); err != nil {
vlog.Fatalf("sync: resolveViaApp: failed end batch for id %d with error: %v", resBatchId, err)
}
return resBatchId
}
// Each batch info row has its Continued field set to true, since the last
// batch info row is always followed by at least one conflict row.
func createBatchConflictInfo(batchId uint64, source wire.BatchSource) *wire.ConflictInfo {
// TODO(jlodhia): add the app's hint once the transaction API and sync handle hints.
batch := wire.BatchInfo{Id: batchId, Hint: "", Source: source}
return &wire.ConflictInfo{
Data: wire.ConflictDataBatch{Value: batch},
Continued: true,
}
}
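// createRowConflictInfo builds the ConflictInfo for a single row under
// conflict, packaging its ancestor, local, and remote values along with the
// ids of the batches that the row belongs to.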
func createRowConflictInfo(ctx *context.T, iSt *initiationState, oid string, batches []uint64, contd bool) *wire.ConflictInfo {
op := wire.RowOp{}
op.Key = common.StripFirstKeyPartOrDie(oid)
objSt := iSt.updObjects[oid]
op.AncestorValue = createValueObj(ctx, iSt, oid, objSt.ancestor, true)
op.LocalValue = createValueObj(ctx, iSt, oid, objSt.oldHead, false)
op.RemoteValue = createValueObj(ctx, iSt, oid, objSt.newHead, false)
row := wire.RowInfo{
Op: wire.OperationWrite{Value: op},
BatchIds: batches,
}
return &wire.ConflictInfo{
Data: wire.ConflictDataRow{Value: row},
Continued: contd,
}
}
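// createValueObj builds a wire.Value for the given version of oid. A missing
// version maps to ValueStateNoExists for the ancestor and ValueStateUnknown
// otherwise, while a deleted version maps to ValueStateDeleted.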
func createValueObj(ctx *context.T, iSt *initiationState, oid, ver string, isAncestor bool) *wire.Value {
if ver == NoVersion {
if isAncestor {
return &wire.Value{State: wire.ValueStateNoExists}
}
return &wire.Value{State: wire.ValueStateUnknown}
}
dagNode, err := getNode(ctx, iSt.tx, oid, ver)
if err != nil {
vlog.Fatalf("sync: resolveViaApp: error while fetching dag node: %v", err)
}
valueState := wire.ValueStateDeleted
var rawBytes *vom.RawBytes
if !dagNode.Deleted {
bytes := getObjectAtVer(ctx, iSt, oid, ver)
// TODO(m3b): What should this routine do if there's a decoding error?
vom.Decode(bytes, &rawBytes)
valueState = wire.ValueStateExists
}
return &wire.Value{
State: valueState,
Bytes: rawBytes,
WriteTs: getLocalLogRec(ctx, iSt, oid, ver).Metadata.UpdTime,
}
}
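// getLocalLogRec fetches the local log record for the given oid and version,
// failing fatally if no record is found.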
func getLocalLogRec(ctx *context.T, iSt *initiationState, oid, version string) *LocalLogRec {
lrecs, err := iSt.getLogRecsBatch(ctx, oid, []string{version})
if err != nil {
vlog.Fatalf("sync: resolveViaApp: error while fetching LocalLogRec: %v", err)
}
if lrecs == nil || lrecs[0] == nil {
vlog.Fatalf("sync: resolveViaApp: LocalLogRec found nil for oid %v, version: %v", oid, version)
}
return lrecs[0]
}
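// getObjectAtVer reads the stored value of oid at the given version from the
// watchable store.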
func getObjectAtVer(ctx *context.T, iSt *initiationState, oid, ver string) []byte {
bytes, err := watchable.GetAtVersion(ctx, iSt.tx, []byte(oid), nil, []byte(ver))
if err != nil {
vlog.Fatalf("sync: resolveViaApp: error while fetching object: %v", err)
}
return bytes
}
// createLocalLogRec creates a local sync log record given its information.
func createLocalLogRec(ctx *context.T, oid string, parents []string, deleted bool, timestamp time.Time, sId, gen, pos, batchId, count uint64) *LocalLogRec {
if len(parents) == 0 {
vlog.Fatalf("sync: resolveViaApp: there must be atleast one parent")
}
rec := LocalLogRec{}
rec.Metadata.ObjId = oid
rec.Metadata.CurVers = string(watchable.NewVersion())
rec.Metadata.Delete = deleted
rec.Metadata.Parents = parents
rec.Metadata.UpdTime = timestamp
rec.Metadata.Id = sId
rec.Metadata.Gen = gen
rec.Metadata.RecType = interfaces.NodeRec
rec.Metadata.BatchId = batchId
rec.Metadata.BatchCount = count
rec.Pos = pos
return &rec
}