// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package featuretests_test

import (
"fmt"
"strings"
"testing"
"time"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/test/testutil"
"v.io/x/ref/test/v23test"
)

// Tests the conflict resolution configuration rules.
// Setup:
// S0 and S1 both have data for keys foo0 to foo9 prepopulated.
// CR Rules are defined as:
// keys with prefix foo0 -> app based resolution
// remaining -> default
// S0 and S1 update all rows concurrently causing a conflict for each key.
// Result: foo0 is sent for app based resolution while the rest are resolved
// by timestamps.
//
// TODO(jlodhia): Add more rules based on value type and combination of key
// prefix and value type once it's implemented.
func TestV23CRRuleConfig(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 10, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
ok(t, updateData(client0Ctx, "s0", 0, 10, "concurrentUpdate"))
ok(t, updateData(client1Ctx, "s1", 0, 10, "concurrentUpdate"))
schemaKeyPrefix := "foo0"
expectedOnConflictCallCount := 1
runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, expectedOnConflictCallCount, func() {
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that the resolved data looks correct.
ok(t, waitForValue(client0Ctx, "s0", "foo0", "AppResolvedVal", schemaKeyPrefix))
ok(t, waitForValue(client0Ctx, "s0", "foo1", "concurrentUpdate"+"s1", schemaKeyPrefix))
ok(t, waitForValue(client1Ctx, "s1", "foo0", "AppResolvedVal", schemaKeyPrefix))
ok(t, waitForValue(client1Ctx, "s1", "foo1", "concurrentUpdate"+"s1", schemaKeyPrefix))
})
}

// Tests the default behavior of conflict resolution, last-timestamp-wins,
// when no schema is specified.
// Setup:
// S0 and S1 both have rows for key foo0. No schema is specified. Both update
// value for foo0 concurrently where S1's write has a newer timestamp.
// Result:
// After sync, the value of foo0 on both syncbases settles on what S1 wrote.
func TestV23CRDefault(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 1, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
ok(t, updateData(client0Ctx, "s0", 0, 1, "concurrentUpdate"))
time.Sleep(5 * time.Millisecond) // make sure that the clock moves forward between the two updates.
ok(t, updateData(client1Ctx, "s1", 0, 1, "concurrentUpdate"))
// Add new separate keys to each syncbase so that we can verify if sync
// has happened between the two syncbases by waiting on the other's key.
ok(t, populateData(client0Ctx, "s0", testCx.Name, "foo", 22, 23))
ok(t, populateData(client1Ctx, "s1", testCx.Name, "foo", 44, 45))
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that both sides have synced with the other.
ok(t, waitForValue(client0Ctx, "s0", "foo44", "testkey", "")) // 44 is written by S1
ok(t, waitForValue(client1Ctx, "s1", "foo22", "testkey", "")) // 22 is written by S0
// Verify that the resolved data looks correct.
ok(t, waitForValue(client0Ctx, "s0", "foo0", "concurrentUpdate"+"s1", ""))
ok(t, waitForValue(client1Ctx, "s1", "foo0", "concurrentUpdate"+"s1", ""))
}
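
// Tests that GenVector (version vector) ordering wins over vclock timestamps:
// a new version written with an older wall-clock time is still accepted as
// the latest, while truly concurrent updates fall back to timestamp-based
// resolution.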
func TestV23CRGenVectorWinsOverVClock(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
// Create S0 and S1, populate S0 with foo0 and foo1, and verify that the data
// synced to S1.
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 2, true)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Set S1's local clock back in time to the beginning of Jan 2015.
ok(t, sc("s1").DevModeUpdateVClock(client1Ctx, wire.DevModeUpdateVClockOpts{
Now: jan2015,
ElapsedTime: 0,
DoLocalUpdate: true,
}))
// Check if S1's clock is correctly set.
t0, err := sc("s0").DevModeGetTime(client0Ctx)
ok(t, err)
t1, err := sc("s1").DevModeGetTime(client1Ctx)
ok(t, err)
if !t1.Before(t0) {
t.Fatalf("expected timestamp of S1 < timestamp of S0: %v, %v", t1, t0)
}
// Update foo0 on S1 creating a new version for it with an older timestamp.
ok(t, updateData(client1Ctx, "s1", 0, 1, "newUpdateWithOldTimestamp"))
// Update foo1 on S0 followed, after a second, by an update to foo1 on S1.
// Since sync is paused, the two updates are concurrent from the GenVector
// perspective.
ok(t, updateData(client0Ctx, "s0", 1, 2, "concurrentUpdate"))
time.Sleep(1 * time.Second)
ok(t, updateData(client1Ctx, "s1", 1, 2, "concurrentUpdate"))
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that S1's write for foo0 is accepted as the latest version after
// sync.
ok(t, waitForValue(client0Ctx, "s0", "foo0", "newUpdateWithOldTimestamp"+"s1", ""))
ok(t, waitForValue(client1Ctx, "s1", "foo0", "newUpdateWithOldTimestamp"+"s1", ""))
// foo1 was updated concurrently from the GenVector point of view, and S1's
// clock is far behind S0's. So even though S1's write actually happened
// after S0's, S0's timestamp is later and its write wins.
ok(t, waitForValue(client0Ctx, "s0", "foo1", "concurrentUpdate"+"s0", ""))
ok(t, waitForValue(client1Ctx, "s1", "foo1", "concurrentUpdate"+"s0", ""))
}

// Tests last timestamp wins for batches under conflict.
// Setup:
// S0 and S1 have prepopulated values for rows foo0 to foo99.
// Conflict resolution type used is LastWins.
// S0 and S1 update all rows in parallel using goroutines so that the actual
// writes are interleaved.
// Result:
// After conflict resolution, final values for all rows within the batch must
// come from either S0 or S1 but not a mixture of the two.
func TestV23CRWithAtomicBatch(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 100, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
// Wrap each update in a closure: `go ok(t, f(...))` would evaluate f(...)
// synchronously in this goroutine, serializing the two batch updates instead
// of running them in parallel.
go func() { ok(t, updateDataInBatch(client0Ctx, "s0", 0, 100, "concurrentBatchUpdate", "batchDoneKey1")) }()
go func() { ok(t, updateDataInBatch(client1Ctx, "s1", 0, 100, "concurrentBatchUpdate", "batchDoneKey2")) }()
time.Sleep(1 * time.Second) // give the above goroutines time to run.
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Make sure that the sync has completed by injecting a row on s0 and
// reading it on s1.
ok(t, populateData(client0Ctx, "s0", testCx.Name, "foo", 200, 201))
ok(t, waitForValue(client1Ctx, "s1", "foo200", "testkey", ""))
ok(t, populateData(client1Ctx, "s1", testCx.Name, "foo", 400, 401))
ok(t, waitForValue(client0Ctx, "s0", "foo400", "testkey", ""))
ok(t, verifyConflictResolvedBatch(client0Ctx, "s0", "foo", 0, 100, "concurrentBatchUpdate"))
ok(t, verifyConflictResolvedBatch(client1Ctx, "s1", "foo", 0, 100, "concurrentBatchUpdate"))
}

// Tests AppResolves resolution policy by creating conflicts for rows that will
// be resolved by the application. This test covers the following scenarios:
// 1) 5 independent rows under conflict, resulting in 5 conflict resolution
// calls to the app.
// 2) 5 rows written as a single batch on both syncbases, resulting in a
// single conflict for the batch.
func TestV23CRAppResolved(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 10, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
ok(t, updateData(client0Ctx, "s0", 0, 5, "concurrentUpdate"))
ok(t, updateData(client1Ctx, "s1", 0, 5, "concurrentUpdate"))
ok(t, updateDataInBatch(client0Ctx, "s0", 5, 10, "concurrentBatchUpdate", ""))
ok(t, updateDataInBatch(client1Ctx, "s1", 5, 10, "concurrentBatchUpdate", ""))
schemaPrefix := "foo"
keyPrefix := "foo"
expectedOnConflictCallCount := 6
runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that the resolved data looks correct.
keyUnderConflict := "foo8" // one of the keys under conflict
ok(t, waitForValue(client0Ctx, "s0", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client0Ctx, "s0", keyPrefix, schemaPrefix, 0, 5, "AppResolvedVal"))
ok(t, verifyConflictResolvedData(client0Ctx, "s0", keyPrefix, schemaPrefix, 5, 10, "AppResolvedVal"))
ok(t, waitForValue(client1Ctx, "s1", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client1Ctx, "s1", keyPrefix, schemaPrefix, 0, 5, "AppResolvedVal"))
ok(t, verifyConflictResolvedData(client1Ctx, "s1", keyPrefix, schemaPrefix, 5, 10, "AppResolvedVal"))
})
}

// Tests that a row which would otherwise be resolved by the LastWins policy
// is instead resolved by AppResolves because it is associated with a row
// that has AppResolves.
// Setup:
// S0 and S1 have prepopulated values for rows foo0 to foo19.
// Rows with key prefix foo1 are resolved using AppResolves while others are
// LastWins.
// S0 and S1 update all rows concurrently as a single batch where S1's writes
// are newer than S0's.
// Result:
// All rows are resolved via AppResolves.
func TestV23CRAppBasedResolutionOverridesOthers(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 20, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
ok(t, updateDataInBatch(client0Ctx, "s0", 0, 20, "concurrentBatchUpdate", ""))
ok(t, updateDataInBatch(client1Ctx, "s1", 0, 20, "concurrentBatchUpdate", ""))
schemaPrefix := "foo1"
keyPrefix := "foo"
expectedOnConflictCallCount := 1
runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that the resolved data looks correct.
keyUnderConflict := "foo11" // one of the keys under conflict
ok(t, waitForValue(client0Ctx, "s0", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client0Ctx, "s0", keyPrefix, schemaPrefix, 0, 20, "AppResolvedVal"))
ok(t, waitForValue(client1Ctx, "s1", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client1Ctx, "s1", keyPrefix, schemaPrefix, 0, 20, "AppResolvedVal"))
})
}

// Tests that 3 different batches (B1, B2 & B3) that conflict with each other,
// due to a subset of rows shared between (B1, B2) and another subset shared
// between (B2, B3), result in a single conflict call to the app.
// Setup:
// S0 and S1 have prepopulated values for rows foo0 to foo9.
// Rows with key prefix foo are resolved using AppResolves.
// S0 writes two batches: B1{foo0 to foo3} and B3{foo6 to foo9}.
// S1 concurrently writes batch B2{foo3 to foo6}.
// Result:
// All rows are resolved via AppResolves as a single conflict call.
func TestV23CRMultipleBatchesAsSingleConflict(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
client0Ctx, client1Ctx, _, _ := setupCRTest(t, sh, 10, false)
ok(t, pauseSync(client0Ctx, "s0"))
ok(t, pauseSync(client1Ctx, "s1"))
// Since sync is paused, the following updates are concurrent and not
// racy as long as Put() is sufficiently synchronous.
// B1 has 0, 1, 2, 3 on S0.
ok(t, updateDataInBatch(client0Ctx, "s0", 0, 4, "concurrentBatchUpdate", ""))
// B3 has 6, 7, 8, 9 on S0.
ok(t, updateDataInBatch(client0Ctx, "s0", 6, 10, "concurrentBatchUpdate", ""))
// B2 has 3, 4, 5, 6 on S1.
ok(t, updateDataInBatch(client1Ctx, "s1", 3, 7, "concurrentBatchUpdate", ""))
schemaPrefix := "foo"
keyPrefix := "foo"
expectedOnConflictCallCount := 1
runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
ok(t, resumeSync(client0Ctx, "s0"))
ok(t, resumeSync(client1Ctx, "s1"))
// Verify that the resolved data looks correct.
keyUnderConflict := "foo8" // one of the keys under conflict
ok(t, waitForValue(client0Ctx, "s0", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client0Ctx, "s0", keyPrefix, schemaPrefix, 0, 10, "AppResolvedVal"))
ok(t, waitForValue(client1Ctx, "s1", keyUnderConflict, "AppResolvedVal", schemaPrefix))
ok(t, verifyConflictResolvedData(client1Ctx, "s1", keyPrefix, schemaPrefix, 0, 10, "AppResolvedVal"))
})
}
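
// setupCRTest starts a root mount table and two syncbases (s0, s1), creates
// a syncgroup on s0 with numInitRows prepopulated rows, and has s1 join the
// syncgroup and verify the synced data. devMode is passed through to
// setupSyncbases so that tests can use the DevMode* virtual clock APIs.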
func setupCRTest(t *testing.T, sh *v23test.Shell, numInitRows int, devMode bool) (client0, client1 *context.T, sbName string, sgId wire.Id) {
sh.StartRootMountTable()
sbs := setupSyncbases(t, sh, 2, devMode)
sgId = wire.Id{Name: "SG1", Blessing: sbBlessings(sbs)}
// Create syncgroup and populate data on s0.
ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, "c", "", sbBlessings(sbs), nil, clBlessings(sbs)))
ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, numInitRows))
// Join syncgroup and verify data on s1.
ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId))
ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, numInitRows))
return sbs[0].clientCtx, sbs[1].clientCtx, sbs[0].sbName, sgId
}

// TODO(sadovsky): This pattern is not ideal in that it makes the test code
// harder to follow. It would be better to define two helper functions: one
// to do the stuff before fn(), and another to do the stuff after fn(). (Also,
// switching to channel-based signalling should simplify things substantially.)
func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, expectedCallCount int, fn func()) {
// 1) During tests, S0 and S1 may sync with each other at the same time. They
// then end up doing conflict resolution for the same objects on both sides,
// independently creating new resolved objects. These resolved versions get
// synced again, creating another set of conflicts and causing 2X the number
// of OnConflict calls. TODO(jlodhia): Once Sync handles duplicate
// resolutions internally, this will no longer be true.
//
// 2) When CPU load goes high, the watcher can fall behind in updating the
// sync dag with the latest updates. This leads to scenarios where S0's dag
// may catch up with all N updates and send them to S1 while S1's watcher
// has only caught up to N - k updates. S1 will then recognize only N - k
// conflicts instead of N, resulting in up to N - k OnConflict calls. When
// S1 tries to commit the resolved updates, the store rejects the commit on
// account of the k objects it has not yet caught up on. The initiator then
// sleeps for 1 sec and retries processing the N objects received from S0,
// causing up to N more OnConflict calls and finally succeeding in
// committing the result.
//
// Due to (1) and (2), the total number of calls to OnConflict can be up to
// 3X the expected call count.
maxCallCount := 3 * expectedCallCount
// Create and hold a conflict resolution connection on s0 and s1 to receive
// future conflicts. The expected call count is 2 * the number of batches
// because each batch is resolved concurrently on s0 and s1, creating new
// values on each side. When the next round of sync happens, these new values
// cause another conflict. Since the conflict resolver does not create a new
// value for a duplicate conflict, no further conflict ping-pong happens.
// TODO(jlodhia): change the expected num conflicts from 12 to 6 once sync's
// cr code handles duplicate resolutions internally.
go func() {
ok(t, runConflictResolver(client0Ctx, "s0", schemaPrefix, "endKey", maxCallCount))
}()
go func() {
ok(t, runConflictResolver(client1Ctx, "s1", schemaPrefix, "endKey", maxCallCount))
}()
time.Sleep(1 * time.Millisecond) // let the above goroutines start up
fn()
// endTest signals the conflict resolver goroutines to exit.
// TODO(sadovsky): Use channels for signaling now that everything's in the
// same process.
ok(t, endTest(client0Ctx, "s0", schemaPrefix, "endKey"))
ok(t, endTest(client1Ctx, "s1", schemaPrefix, "endKey"))
// Wait for the conflict resolver goroutines to exit.
ok(t, waitForSignal(client0Ctx, "s0", schemaPrefix, "endKeyAck"))
ok(t, waitForSignal(client1Ctx, "s1", schemaPrefix, "endKeyAck"))
}

//////////////////////////////////////////////
// Helpers specific to ConflictResolution
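
// runConflictResolver registers an app-based conflict resolver for keys under
// prefix via EnforceSchema and holds the connection open until the test
// writes signalKey. It then verifies that OnConflict was called at most
// maxCallCount times and acknowledges with signalKey + "Ack".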
func runConflictResolver(ctx *context.T, syncbaseName, prefix, signalKey string, maxCallCount int) error {
resolver := &CRImpl{syncbaseName: syncbaseName}
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, makeSchema(prefix, resolver))
defer d.Close()
if err := d.EnforceSchema(ctx); err != nil {
return err
}
// Wait till the end of the test is signaled. EnforceSchema starts a
// goroutine with a cr connection to the server which needs to stay alive
// for the life of the test in order to receive conflicts.
if err := waitSignal(ctx, d, signalKey); err != nil {
return err
}
// Check that OnConflict() was called at most 'maxCallCount' times.
var onConflictErr error
if resolver.onConflictCallCount > maxCallCount {
onConflictErr = fmt.Errorf("Unexpected OnConflict call count. Max: %d, Actual: %d", maxCallCount, resolver.onConflictCallCount)
}
// Reply to the test with a signal to notify it that it may end.
if err := sendSignal(ctx, d, signalKey+"Ack"); err != nil {
return err
}
return onConflictErr
}
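
// verifyConflictResolvedData checks that every row in [start, end) carries
// the app-resolved value valuePrefix + key.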
func verifyConflictResolvedData(ctx *context.T, syncbaseName, keyPrefix, schemaPrefix string, start, end int, valuePrefix string) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, makeSchema(schemaPrefix, &CRImpl{syncbaseName: syncbaseName}))
c := d.CollectionForId(testCx)
for i := start; i < end; i++ {
var got string
key := fmt.Sprintf("%s%d", keyPrefix, i)
r := c.Row(key)
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v", err)
}
if got != valuePrefix+key {
return fmt.Errorf("unexpected value: got %v, want %v", got, valuePrefix)
}
}
return nil
}
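
// verifyConflictResolvedBatch checks that the batch was resolved atomically:
// it extracts the winning side's service suffix from the first row's value
// and verifies that every row in [start, end) carries the same suffix, i.e.
// all values came from either s0 or s1 but not a mixture of the two.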
func verifyConflictResolvedBatch(ctx *context.T, syncbaseName, keyPrefix string, start, end int, valuePrefix string) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
c := d.CollectionForId(testCx)
var got string
// Read the first row to learn which side's batch won.
firstKey := fmt.Sprintf("%s%d", keyPrefix, start)
r := c.Row(firstKey)
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v", err)
}
valueServiceStr := strings.TrimSuffix(strings.TrimPrefix(got, valuePrefix), firstKey)
for i := start; i < end; i++ {
key := fmt.Sprintf("%s%d", keyPrefix, i)
r := c.Row(key)
if err := r.Get(ctx, &got); err != nil {
return fmt.Errorf("r.Get() failed: %v", err)
}
if got != valuePrefix+valueServiceStr+key {
return fmt.Errorf("unexpected value: got %v, want %v", got, valuePrefix+valueServiceStr+key)
}
}
return nil
}
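
// waitForValue polls the given key for up to 10 seconds until its value
// equals valuePrefix + key. If schemaPrefix is non-empty, the database is
// opened with an app-based conflict resolution schema for that prefix.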
func waitForValue(ctx *context.T, syncbaseName, key, valuePrefix, schemaPrefix string) error {
var schema *syncbase.Schema
if schemaPrefix != "" {
schema = makeSchema(schemaPrefix, &CRImpl{syncbaseName: syncbaseName})
}
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, schema)
c := d.CollectionForId(testCx)
r := c.Row(key)
want := valuePrefix + key
var value string
return testutil.RetryFor(10*time.Second, func() error {
if err := r.Get(ctx, &value); (err == nil) && (value == want) {
return nil // Value found.
} else if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("Syncbase Error while fetching key %v: %v", key, err)
}
return testutil.TryAgain(fmt.Errorf("Value expected: %v, found: %v", want, value))
})
}
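
// endTest writes signalKey to tell the conflict resolver goroutine to exit.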
func endTest(ctx *context.T, syncbaseName, prefix, signalKey string) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, makeSchema(prefix, &CRImpl{syncbaseName: syncbaseName}))
// signal end of test so that conflict resolution can clean up its stream.
return sendSignal(ctx, d, signalKey)
}
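
// waitForSignal blocks until signalKey is observed on the given syncbase;
// tests use it to wait for the conflict resolver's exit acknowledgement.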
func waitForSignal(ctx *context.T, syncbaseName, prefix, signalKey string) error {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, makeSchema(prefix, &CRImpl{syncbaseName: syncbaseName}))
// wait for signal.
return waitSignal(ctx, d, signalKey)
}
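
// waitSignal polls signalKey for up to 10 seconds until it reads true.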
func waitSignal(ctx *context.T, d syncbase.Database, signalKey string) error {
c := d.CollectionForId(testCx)
r := c.Row(signalKey)
var end bool
return testutil.RetryFor(10*time.Second, func() error {
if err := r.Get(ctx, &end); err != nil {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("r.Get() for endkey failed: %v", err)
}
}
if end {
return nil
}
return testutil.TryAgain(fmt.Errorf("Signal %v not received", signalKey))
})
}

////////////////////////////////////////////////////////
// Conflict Resolution related code.
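
// makeSchema returns a schema whose policy routes conflicts on keys under
// keyPrefix in the test collection to the given app resolver; other keys
// fall back to the default last-timestamp-wins resolution.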
func makeSchema(keyPrefix string, resolver *CRImpl) *syncbase.Schema {
metadata := wire.SchemaMetadata{
Version: 1,
Policy: wire.CrPolicy{
Rules: []wire.CrRule{
wire.CrRule{
CollectionId: testCx,
KeyPrefix: keyPrefix,
Resolver: wire.ResolverTypeAppResolves,
},
},
},
}
return &syncbase.Schema{
Metadata: metadata,
Resolver: resolver,
}
}

// Client conflict resolution impl.
type CRImpl struct {
syncbaseName string
onConflictCallCount int
}
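
// OnConflict resolves every row in the conflict's write set: rows without a
// real conflict (pulled in only through batch association) and rows whose
// local and remote values differ get a new "AppResolvedVal"-prefixed value;
// rows whose values happen to be equal keep the newer of the two writes.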
func (ri *CRImpl) OnConflict(ctx *context.T, conflict *syncbase.Conflict) syncbase.Resolution {
resolvedPrefix := "AppResolvedVal"
ri.onConflictCallCount++
res := syncbase.Resolution{ResultSet: map[string]syncbase.ResolvedRow{}}
for rowKey, row := range conflict.WriteSet.ByKey {
resolvedRow := syncbase.ResolvedRow{}
resolvedRow.Key = row.Key
// Handle objects that don't have a conflict themselves but were pulled in
// because of other conflicts in the same batch.
// For ease of testing, these are resolved with a new value prefixed by
// "AppResolvedVal".
if row.LocalValue.State == wire.ValueStateUnknown || row.RemoteValue.State == wire.ValueStateUnknown {
resolvedRow.Result, _ = syncbase.NewValue(ctx, resolvedPrefix+keyPart(rowKey))
res.ResultSet[row.Key] = resolvedRow
continue
}
var localVal, remoteVal string
row.LocalValue.Get(&localVal)
row.RemoteValue.Get(&remoteVal)
if localVal == remoteVal {
if row.RemoteValue.WriteTs.After(row.LocalValue.WriteTs) {
resolvedRow.Result = &row.RemoteValue
} else {
resolvedRow.Result = &row.LocalValue
}
} else {
resolvedRow.Result, _ = syncbase.NewValue(ctx, resolvedPrefix+keyPart(rowKey))
}
res.ResultSet[row.Key] = resolvedRow
}
return res
}
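
// keyPart extracts the row key from a full storage key by splitting on the
// key separator.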
func keyPart(rowKey string) string {
return common.SplitKeyParts(rowKey)[1]
}