blob: 618655e6236ffcea4ace7fe6ff186cbf4aa564a2 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncbase
import (
"testing"
"time"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase/crtestutil"
)
// resolverImpl1 is a field-less ConflictResolver stub; the tests in this file
// pass it to getSchema so that a schema with a resolver can be built.
type resolverImpl1 struct{}
// OnConflict ignores both of its arguments and hands back the zero-value
// Resolution.
func (*resolverImpl1) OnConflict(ctx *context.T, conflict *Conflict) (r Resolution) {
	return r
}
// TestCrConnectionClose tests if db.Close() correctly shuts down the goroutine
// that runs conflict resolution code in an infinite loop.
// Setup:
// - A mutex lock is used to cause Advance() on the receive stream to block.
//   This lock is held by the test before starting off the CR thread.
// - ctx.Cancel() is simulated by releasing the mutex lock. This is done right
//   after calling db.Close().
// - The mutex is held again by the test to cause the CR thread to block on
//   Advance() in case the CR thread did not shut down. This will lead to
//   failure of the test.
// Expected result:
// - Advance() blocks once the CR thread has been started.
// - The CR thread should not get/remain blocked on Advance() after db.Close().
func TestCrConnectionClose(t *testing.T) {
	// Reconnect delay configured on the database under test; also used below
	// to size the final settle period.
	const reconnectWait = 10 * time.Millisecond

	db := newDatabase("parentFullName", wire.Id{"a1", "db1"}, getSchema(&resolverImpl1{}))
	// advance flags itself as blocked, parks on st.Mu until the test releases
	// it, clears the flag and then returns false.
	advance := func(st *crtestutil.State) bool {
		st.SetIsBlocked(true)
		st.Mu.Lock()
		defer st.Mu.Unlock()
		st.SetIsBlocked(false)
		return false
	}
	st := &crtestutil.State{}
	crStream := &crtestutil.CrStreamImpl{
		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
		R: &crtestutil.ResolutionStreamImpl{St: st},
	}
	db.c = crtestutil.MockDbClient(db.c, crStream)
	db.crState.reconnectWaitTime = reconnectWait

	ctx, cancel := context.RootContext()
	// Deferred so the context is released on every exit path.
	defer cancel()

	st.Mu.Lock()          // causes Advance() to block
	db.EnforceSchema(ctx) // kicks off the CR thread
	for i := 0; i < 100 && !st.GetIsBlocked(); i++ { // wait till Advance() call is blocked
		time.Sleep(time.Millisecond)
	}
	// Without this check the test would pass vacuously if the CR thread never
	// reached Advance().
	if !st.GetIsBlocked() {
		t.Error("Error: Advance() did not block")
	}

	db.Close()
	st.Mu.Unlock() // simulate ctx cancel()
	for i := 0; i < 100 && st.GetIsBlocked(); i++ {
		time.Sleep(time.Millisecond)
	}

	st.Mu.Lock() // causes Advance() to block if called again
	// Let the conflict resolution routine complete its cleanup.
	// If the code does not work as intended then it will end up reconnecting
	// the stream and blocking on the mutex. Wait longer than the configured
	// reconnect delay so that such a reconnect has time to show up.
	// NOTE(review): presumably the CR routine sleeps reconnectWaitTime before
	// reconnecting; confirm against the CR implementation.
	time.Sleep(2 * reconnectWait)
	if st.GetIsBlocked() {
		t.Error("Error: The conflict resolution routine did not die")
	}
}
// TestCrConnectionReestablish tests if the CR thread reestablishes a broken
// connection stream with syncbase.
// Setup:
// - Advance() is allowed to return immediately 2 times with return value
//   false, signifying 2 consecutive breaks in the connection.
// - A mutex lock is used to cause Advance() on the receive stream to block
//   on the 3rd call. This lock is held by the test before starting off the
//   CR thread. This simulates a stable connection between CR and syncbase.
// Expected result:
// - Advance() should get blocked after some time.
// - The advance count recorded in st should be equal to 3 (two immediate
//   returns plus the call that is blocked).
func TestCrConnectionReestablish(t *testing.T) {
	db := newDatabase("parentFullName", wire.Id{"a1", "db1"}, getSchema(&resolverImpl1{}))
	// advance counts every call. The first two calls return false right away;
	// from the third call on it parks on st.Mu until the test releases it.
	advance := func(st *crtestutil.State) bool {
		st.IncrementAdvanceCount()
		if st.GetAdvanceCount() > 2 {
			// Connection stays stable from the 3rd attempt on.
			st.SetIsBlocked(true)
			st.Mu.Lock()
			defer st.Mu.Unlock()
		}
		st.SetIsBlocked(false)
		return false
	}
	st := &crtestutil.State{}
	crStream := &crtestutil.CrStreamImpl{
		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
		R: &crtestutil.ResolutionStreamImpl{St: st},
	}
	db.c = crtestutil.MockDbClient(db.c, crStream)
	db.crState.reconnectWaitTime = 10 * time.Millisecond

	ctx, cancel := context.RootContext()
	// Deferred so the context is released on every exit path; it still runs
	// after db.Close() and the unlock below, as before.
	defer cancel()

	st.Mu.Lock() // causes Advance() to block
	db.EnforceSchema(ctx)
	for i := 0; i < 100 && !st.GetIsBlocked(); i++ {
		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
	}
	if !st.GetIsBlocked() {
		t.Error("Error: Advance() did not block")
	}
	if st.GetAdvanceCount() != 3 {
		t.Errorf("Error: Advance() expected to be called 3 times, instead called %d times", st.GetAdvanceCount())
	}

	// Shutdown the cr routine
	db.Close()
	st.Mu.Unlock()
}
// getSchema builds a Schema whose Metadata is an empty wire.SchemaMetadata and
// whose Resolver is cr.
func getSchema(cr ConflictResolver) *Schema {
	schema := new(Schema)
	schema.Metadata = wire.SchemaMetadata{}
	schema.Resolver = cr
	return schema
}