// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package syncbase

import (
	"testing"
	"time"

	"v.io/v23/context"
	wire "v.io/v23/services/syncbase"
	"v.io/v23/syncbase/crtestutil"
)

// resolverImpl1 is a stateless ConflictResolver used to build the schema for
// the tests in this file.
type resolverImpl1 struct{}

// OnConflict ignores both the context and the conflict and hands back the
// zero-valued Resolution.
func (ri *resolverImpl1) OnConflict(ctx *context.T, conflict *Conflict) (r Resolution) {
	return r
}

// TestCrConnectionClose tests if db.Close() correctly shuts down the goroutine
// that runs conflict resolution code in an infinite loop.
// Setup:
//   - A mutex lock is used to cause Advance() on the receive stream to block.
//     This lock is held by the test before starting off the CR thread.
//   - The test checks that the CR thread actually reached the blocked
//     Advance() call; otherwise the remaining checks would pass vacuously.
//   - ctx.Cancel() is simulated by releasing the mutex lock. This is done right
//     after calling db.Close()
//   - The mutex is held again by the test to cause the CR thread to block on
//     Advance() in case the CR thread did not shutdown. This will lead to
//     failure of the test.
// Expected result:
//   - The CR thread should not get/remain blocked on Advance() after db.Close()
func TestCrConnectionClose(t *testing.T) {
	db := newDatabase("parentFullName", wire.Id{"a1", "db1"}, getSchema(&resolverImpl1{}))
	// advance flags the stream as blocked, waits until the test releases
	// st.Mu, clears the flag while still holding the lock, and then returns
	// false.
	advance := func(st *crtestutil.State) bool {
		st.SetIsBlocked(true)
		st.Mu.Lock()
		defer st.Mu.Unlock()
		st.SetIsBlocked(false)
		return false
	}

	st := &crtestutil.State{}
	crStream := &crtestutil.CrStreamImpl{
		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
		R: &crtestutil.ResolutionStreamImpl{St: st},
	}
	db.c = crtestutil.MockDbClient(db.c, crStream)
	db.crState.reconnectWaitTime = 10 * time.Millisecond

	ctx, cancel := context.RootContext()
	// Release the root context once the test is over instead of discarding
	// its cancel function.
	defer cancel()

	// Holding the lock causes Advance() to block.
	st.Mu.Lock()
	// Kicks off the CR thread.
	db.EnforceSchema(ctx)
	// Wait till the Advance() call is blocked.
	for i := 0; i < 100 && !st.GetIsBlocked(); i++ {
		time.Sleep(time.Millisecond)
	}
	// If the CR thread never got as far as Advance(), everything below
	// would succeed without exercising db.Close() at all.
	if !st.GetIsBlocked() {
		t.Error("Error: Advance() did not block")
	}

	db.Close()
	st.Mu.Unlock() // simulate ctx cancel()
	for i := 0; i < 100 && st.GetIsBlocked(); i++ {
		time.Sleep(time.Millisecond)
	}

	// Take the lock again so that Advance() blocks if it is called again, and
	// release it on exit so that a CR thread that failed to shut down is not
	// left blocked on the mutex forever.
	st.Mu.Lock()
	defer st.Mu.Unlock()

	// Let conflict resolution routine complete its cleanup.
	// If the code does not work as intended then it will end up reconnecting
	// the stream and blocking on the mutex.
	// NOTE(review): this 1ms window is shorter than the 10ms reconnectWaitTime
	// configured above; confirm that a routine which failed to die would
	// re-enter Advance() within this window.
	time.Sleep(time.Millisecond)
	if st.GetIsBlocked() {
		t.Error("Error: The conflict resolution routine did not die")
	}
}

// TestCrConnectionReestablish tests if the CR thread reestablishes a broken
// connection stream with syncbase.
// Setup:
//   - Advance() is allowed to return immediately 2 times with return value
//     false signifying 2 consecutive breaks in the connection.
//   - A mutex lock is used to cause Advance() on the receive stream to block
//     on the 3rd attempt. This lock is held by the test before starting off the
//     CR thread. This simulates a stable connection between CR and syncbase.
// Expected result:
//   - Advance() should get blocked after some time.
//   - The advance count recorded in st should be equal to 3
func TestCrConnectionReestablish(t *testing.T) {
	db := newDatabase("parentFullName", wire.Id{"a1", "db1"}, getSchema(&resolverImpl1{}))
	// advance counts every call. While the count is at most 2 it returns
	// false right away; once the count exceeds 2 it blocks on st.Mu until
	// the test releases the lock.
	advance := func(st *crtestutil.State) bool {
		st.IncrementAdvanceCount()
		if st.GetAdvanceCount() > 2 {
			// Connection stays stable from the 3rd attempt onwards
			st.SetIsBlocked(true)
			st.Mu.Lock()
			defer st.Mu.Unlock()
		}
		st.SetIsBlocked(false)
		return false
	}

	st := &crtestutil.State{}
	crStream := &crtestutil.CrStreamImpl{
		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
		R: &crtestutil.ResolutionStreamImpl{St: st},
	}
	db.c = crtestutil.MockDbClient(db.c, crStream)
	db.crState.reconnectWaitTime = 10 * time.Millisecond

	ctx, cancel := context.RootContext()
	// Release the root context once the test is over instead of discarding
	// its cancel function.
	defer cancel()

	st.Mu.Lock() // causes Advance() to block
	db.EnforceSchema(ctx)
	for i := 0; i < 100 && !st.GetIsBlocked(); i++ {
		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
	}

	if !st.GetIsBlocked() {
		t.Error("Error: Advance() did not block")
	}
	if st.GetAdvanceCount() != 3 {
		t.Errorf("Error: Advance() expected to be called 3 times, instead called %d times", st.GetAdvanceCount())
	}

	// Shutdown the cr routine: close the database first, then release the
	// lock so that the blocked Advance() call can return.
	db.Close()
	st.Mu.Unlock()
}

// getSchema builds a Schema that carries empty schema metadata and uses cr as
// its conflict resolver.
func getSchema(cr ConflictResolver) *Schema {
	schema := new(Schema)
	schema.Metadata = wire.SchemaMetadata{}
	schema.Resolver = cr
	return schema
}
