blob: 4042f004d58574ce47b6f45908ab4ff27a15a801 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncbase
import (
"sync"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/services/watch"
"v.io/v23/syncbase/util"
"v.io/v23/verror"
"v.io/x/lib/vlog"
)
const (
// Wait time before we try to reconnect a broken conflict resolution stream.
waitBeforeReconnectInMillis = 2 * time.Second
reconnectionCount = "rcc"
)
// TODO(sadovsky): Make this private. For some reason,
// v.io/x/jni/v23/syncbase/jni.go calls it directly.
func NewDatabase(parentFullName string, id wire.Id, schema *Schema) *database {
return &database{
databaseBatch: *newDatabaseBatch(parentFullName, id, ""),
schema: schema,
crState: conflictResolutionState{
reconnectWaitTime: waitBeforeReconnectInMillis,
},
}
}
type database struct {
databaseBatch
schema *Schema
crState conflictResolutionState
}
// conflictResolutionState maintains data about the connection of a conflict
// resolution stream with syncbase. It provides a way to disconnect an existing
// open stream.
type conflictResolutionState struct {
mu sync.Mutex // guards access to all fields in this struct
crContext *context.T
cancelFn context.CancelFunc
isClosed bool
reconnectWaitTime time.Duration
}
func (crs *conflictResolutionState) disconnect() {
crs.mu.Lock()
defer crs.mu.Unlock()
crs.isClosed = true
crs.cancelFn()
}
func (crs *conflictResolutionState) isDisconnected() bool {
crs.mu.Lock()
defer crs.mu.Unlock()
return crs.isClosed
}
var _ Database = (*database)(nil)
// Create implements Database.Create.
func (d *database) Create(ctx *context.T, perms access.Permissions) error {
var schemaMetadata *wire.SchemaMetadata = nil
if d.schema != nil {
schemaMetadata = &d.schema.Metadata
}
return d.c.Create(ctx, schemaMetadata, perms)
}
// Destroy implements Database.Destroy.
func (d *database) Destroy(ctx *context.T) error {
return d.c.Destroy(ctx)
}
// Exists implements Database.Exists.
func (d *database) Exists(ctx *context.T) (bool, error) {
return d.c.Exists(ctx)
}
// BeginBatch implements Database.BeginBatch.
func (d *database) BeginBatch(ctx *context.T, opts wire.BatchOptions) (BatchDatabase, error) {
batchHandle, err := d.c.BeginBatch(ctx, opts)
if err != nil {
return nil, err
}
return newBatch(d.parentFullName, d.id, batchHandle), nil
}
// SetPermissions implements Database.SetPermissions.
func (d *database) SetPermissions(ctx *context.T, perms access.Permissions, version string) error {
return d.c.SetPermissions(ctx, perms, version)
}
// GetPermissions implements Database.GetPermissions.
func (d *database) GetPermissions(ctx *context.T) (perms access.Permissions, version string, err error) {
return d.c.GetPermissions(ctx)
}
// Watch implements Database.Watch.
func (d *database) Watch(ctx *context.T, collection, prefix string, resumeMarker watch.ResumeMarker) (WatchStream, error) {
if !util.ValidCollectionName(collection) {
return nil, verror.New(wire.ErrInvalidName, ctx, collection)
}
ctx, cancel := context.WithCancel(ctx)
call, err := d.c.WatchGlob(ctx, watch.GlobRequest{
Pattern: naming.Join(collection, prefix+"*"),
ResumeMarker: resumeMarker,
})
if err != nil {
return nil, err
}
return newWatchStream(cancel, call), nil
}
// Syncgroup implements Database.Syncgroup.
func (d *database) Syncgroup(sgName string) Syncgroup {
return newSyncgroup(d.fullName, sgName)
}
// GetSyncgroupNames implements Database.GetSyncgroupNames.
func (d *database) GetSyncgroupNames(ctx *context.T) ([]string, error) {
return d.c.GetSyncgroupNames(ctx)
}
// Blob implements Database.Blob.
func (d *database) Blob(br wire.BlobRef) Blob {
return newBlob(d.fullName, br)
}
// CreateBlob implements Database.CreateBlob.
func (d *database) CreateBlob(ctx *context.T) (Blob, error) {
return createBlob(ctx, d.fullName)
}
// PauseSync implements Database.PauseSync.
func (d *database) PauseSync(ctx *context.T) error {
return d.c.PauseSync(ctx)
}
// ResumeSync implements Database.ResumeSync.
func (d *database) ResumeSync(ctx *context.T) error {
return d.c.ResumeSync(ctx)
}
// EnforceSchema implements Database.EnforceSchema.
func (d *database) EnforceSchema(ctx *context.T) error {
var schema *Schema = d.schema
if schema == nil {
return verror.New(verror.ErrBadState, ctx, "EnforceSchema cannot be used since a nil *Schema was provided at Database handle creation time.")
}
if schema.Metadata.Version < 0 {
return verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
}
if needsResolver(d.schema.Metadata) && d.schema.Resolver == nil {
return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
}
if _, err := d.updateSchemaMetadata(ctx); err != nil {
return err
}
if d.schema.Resolver == nil {
return nil
}
childCtx, cancelFn := context.WithCancel(ctx)
d.crState.crContext = childCtx
d.crState.cancelFn = cancelFn
go d.establishConflictResolution(childCtx)
return nil
}
// Close implements Database.Close.
func (d *database) Close() {
d.crState.disconnect()
// TODO(ivanpi): Abort all batches.
}
// updateSchemaMetadata reads the current SchemaMetadata from db and checks
// if the SchemaMetadata provided by the app is newer. If so, it updates the
// db with the new SchemaMetadata.
func (d *database) updateSchemaMetadata(ctx *context.T) (bool, error) {
var schema *Schema = d.schema
schemaMgr := d.getSchemaManager()
currMeta, err := schemaMgr.getSchemaMetadata(ctx)
if err != nil {
// If the client app did not set a schema as part of database creation,
// getSchemaMetadata() will return ErrNoExist. In this case, we set the
// schema here.
if verror.ErrorID(err) == verror.ErrNoExist.ID {
err := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
// The database may not yet exist. If so, the above call will return
// ErrNoExist, and here we return (false, nil). If the error is anything
// other than ErrNoExist, here we return (false, err).
if (err != nil) && (verror.ErrorID(err) != verror.ErrNoExist.ID) {
return false, err
}
return false, nil
}
return false, err
}
if currMeta.Version >= schema.Metadata.Version {
return false, nil
}
// Update the schema metadata in db to the latest version.
if metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata); metadataErr != nil {
vlog.Error(metadataErr)
return false, metadataErr
}
return true, nil
}
func (d *database) establishConflictResolution(ctx *context.T) {
count := 0
for {
count++
vlog.Infof("Starting a new conflict resolution connection. Reconnection count: %d", count)
childCtx := context.WithValue(ctx, reconnectionCount, count)
// listenForConflicts is a blocking method that returns only when the
// conflict stream is broken.
if err := d.listenForConflicts(childCtx); err != nil && !d.crState.isDisconnected() {
vlog.Errorf("Conflict resolution connection ended with error: %v", err)
}
// Check if database is closed and if we need to shutdown conflict
// resolution.
if d.crState.isDisconnected() {
vlog.Infof("Shutting down conflict resolution connection.")
break
}
// The connection might have broken because the syncbase server went down.
// Sleep for a few seconds to allow syncbase to come back up.
time.Sleep(d.crState.reconnectWaitTime)
}
}
func (d *database) listenForConflicts(ctx *context.T) error {
resolver, err := d.c.StartConflictResolver(ctx)
if err != nil {
return err
}
conflictStream := resolver.RecvStream()
resolutionStream := resolver.SendStream()
var c *Conflict = &Conflict{}
for conflictStream.Advance() {
row := conflictStream.Value()
addRowToConflict(c, &row)
if !row.Continued {
resolution := d.schema.Resolver.OnConflict(ctx, c)
if err := sendResolution(resolutionStream, resolution); err != nil {
return err
}
c = &Conflict{} // create a new conflict object for the next batch
}
}
if err := conflictStream.Err(); err != nil {
return err
}
return resolver.Finish()
}
// TODO(jlodhia): Should we check that the Resolution provided by the
// application resolves all conflicts in the write set?
func sendResolution(stream interface {
Send(item wire.ResolutionInfo) error
}, resolution Resolution) error {
size := len(resolution.ResultSet)
count := 0
for _, v := range resolution.ResultSet {
count++
ri := toResolutionInfo(v, count != size)
if err := stream.Send(ri); err != nil {
vlog.Errorf("Error while sending resolution: %v", err)
return err
}
}
return nil
}
func addRowToConflict(c *Conflict, ci *wire.ConflictInfo) {
switch v := ci.Data.(type) {
case wire.ConflictDataBatch:
if c.Batches == nil {
c.Batches = map[uint64]wire.BatchInfo{}
}
c.Batches[v.Value.Id] = v.Value
case wire.ConflictDataRow:
rowInfo := v.Value
switch op := rowInfo.Op.(type) {
case wire.OperationWrite:
if c.WriteSet == nil {
c.WriteSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint64][]ConflictRow{}}
}
cr := toConflictRow(op.Value, rowInfo.BatchIds)
c.WriteSet.ByKey[cr.Key] = cr
for _, bid := range rowInfo.BatchIds {
c.WriteSet.ByBatch[bid] = append(c.WriteSet.ByBatch[bid], cr)
}
case wire.OperationRead:
if c.ReadSet == nil {
c.ReadSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint64][]ConflictRow{}}
}
cr := toConflictRow(op.Value, rowInfo.BatchIds)
c.ReadSet.ByKey[cr.Key] = cr
for _, bid := range rowInfo.BatchIds {
c.ReadSet.ByBatch[bid] = append(c.ReadSet.ByBatch[bid], cr)
}
case wire.OperationScan:
if c.ScanSet == nil {
c.ScanSet = &ConflictScanSet{map[uint64][]wire.ScanOp{}}
}
for _, bid := range rowInfo.BatchIds {
c.ScanSet.ByBatch[bid] = append(c.ScanSet.ByBatch[bid], op.Value)
}
}
}
}
func toConflictRow(op wire.RowOp, batchIds []uint64) ConflictRow {
local := Value{
State: op.LocalValue.State,
Val: op.LocalValue.Bytes,
WriteTs: op.LocalValue.WriteTs,
Selection: wire.ValueSelectionLocal,
}
remote := Value{
State: op.RemoteValue.State,
Val: op.RemoteValue.Bytes,
WriteTs: op.RemoteValue.WriteTs,
Selection: wire.ValueSelectionRemote,
}
ancestor := Value{
State: op.AncestorValue.State,
Val: op.AncestorValue.Bytes,
WriteTs: op.AncestorValue.WriteTs,
Selection: wire.ValueSelectionOther,
}
return ConflictRow{
Key: op.Key,
LocalValue: local,
RemoteValue: remote,
AncestorValue: ancestor,
BatchIds: batchIds,
}
}
func toResolutionInfo(r ResolvedRow, lastRow bool) wire.ResolutionInfo {
sel := wire.ValueSelectionOther
resVal := (*wire.Value)(nil)
if r.Result != nil {
sel = r.Result.Selection
resVal = &wire.Value{
Bytes: r.Result.Val,
WriteTs: r.Result.WriteTs, // ignored by syncbase
}
}
return wire.ResolutionInfo{
Key: r.Key,
Selection: sel,
Result: resVal,
Continued: lastRow,
}
}
func needsResolver(metadata wire.SchemaMetadata) bool {
for _, rule := range metadata.Policy.Rules {
if rule.Resolver == wire.ResolverTypeAppResolves {
return true
}
}
return false
}
func (d *database) getSchemaManager() schemaManagerImpl {
return newSchemaManager(d.c)
}