// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runloop

import (
	"fmt"
	"strings"
	"sync"
	"testing"
	"time"

	"v.io/v23/context"
	tu "v.io/x/ref/test/testutil"
	"v.io/x/sensorlog_lite/internal/measure/sampler"
	"v.io/x/sensorlog_lite/internal/sbmodel"
	"v.io/x/sensorlog_lite/internal/util"
)

// qdelay is the time delay quantum used by the tests in this file: schedule
// offsets, sampling intervals and simulated sample/write durations are all
// expressed as multiples of it. It is assumed that several goroutines can
// reliably execute trivial code and yield within this time.
// TODO(ivanpi): Replace with logical clock that counts active goroutines.
const qdelay = 100 * time.Millisecond

// delayWriter implements a DataCallback (its write method) that waits for a
// configurable delay before recording the received VDataPointError. It also
// allows asserting that the expected data has been written.
//
// delay is how long write sleeps before recording a data point. mu guards
// output, which accumulates the concatenated values of all recorded data
// points.
type delayWriter struct {
	delay  time.Duration
	mu     sync.Mutex
	output string
}

// write blocks for dw.delay and then records val. The sleep itself does not
// observe ctx; cancellation is only checked once the sleep is over, in which
// case the data point is dropped and nil is returned. Otherwise the value of
// a VDataPointError is appended to the accumulated output. Any other data
// point type is reported as an error. key is not used.
func (dw *delayWriter) write(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
	time.Sleep(dw.delay)

	select {
	case <-ctx.Done():
		// Cancelled while sleeping: discard the data point.
		return nil
	default:
	}

	point, isErrPoint := val.(sbmodel.VDataPointError)
	if !isErrPoint {
		return fmt.Errorf("writer expected VDataError, got: %v", val)
	}

	dw.mu.Lock()
	dw.output += string(point.Value)
	dw.mu.Unlock()
	return nil
}

// assert reports a test error on t unless the concatenation of everything
// written so far is exactly expected. The failure is formatted with
// tu.FormatLogLine using depth 2.
func (dw *delayWriter) assert(t *testing.T, expected string) {
	dw.mu.Lock()
	got := dw.output
	dw.mu.Unlock()

	if got == expected {
		return
	}
	t.Error(tu.FormatLogLine(2, "unexpected output: got %q, want %q", got, expected))
}

// newEchoDelaySampler returns a sampleRunner driven by mock scripts of the
// form '<delay>|<output>'. Each run records its own start time, waits for
// <delay> and then, unless ctx was cancelled during the wait, passes cb a
// VDataPointError whose value is '<output>@<time>;'. <time> is the run's start
// time measured from start in whole qdelay intervals. A <delay> that does not
// parse as a duration causes a panic.
func newEchoDelaySampler(start time.Time) sampleRunner {
	return func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error {
		began := time.Now()
		fields := strings.SplitN(samDef.Script, "|", 2)
		wait, err := time.ParseDuration(fields[0])
		if err != nil {
			panic(err)
		}

		select {
		case <-time.After(wait):
			// Simulated measurement finished; report it below.
		case <-ctx.Done():
			return nil
		}

		// Integer division keeps only whole qdelay intervals.
		quanta := int64(began.Sub(start) / qdelay)
		res := &sampler.MeasureResult{
			Time: began,
			Data: sbmodel.VDataPointError{Value: fmt.Sprintf("%s@%d;", fields[1], quanta)},
		}
		return cb(res)
	}
}

// newFatalErrorCb builds an ErrorCb for tests that do not expect any fatal
// error: every error it receives is reported as a test failure on t, after
// which ctxCancel is invoked.
func newFatalErrorCb(t *testing.T, ctxCancel func()) util.ErrorCb {
	failAndCancel := func(fatal error) {
		t.Errorf("measuring fatal error: %v", fatal)
		ctxCancel()
	}
	return failAndCancel
}

// streamKey builds a fresh KStreamDef for streamId under the fixed device id
// "test".
func streamKey(streamId string) *sbmodel.KStreamDef {
	key := new(sbmodel.KStreamDef)
	key.DevId = "test"
	key.StreamId = streamId
	return key
}

// samplerDef builds a test SamplerDef that starts at start and repeats every
// intv. Its script is the mock script '<delay>|<out>' understood by the
// sampler returned from newEchoDelaySampler.
func samplerDef(start time.Time, intv time.Duration, out string, delay time.Duration) *sbmodel.SamplerDef {
	def := &sbmodel.SamplerDef{Start: start, Interval: intv}
	// delay.String() yields the same text as formatting the duration with %v.
	def.Script = delay.String() + "|" + out
	return def
}

// scheduler allows scheduling functions with delays and waiting for all
// scheduled functions to finish (using wg). Cancel ctx to cancel pending
// functions. Functions are run under mutex (not concurrently).
//
// ctx, once done, causes functions whose delay has not yet elapsed to be
// dropped. wg counts the goroutines started by schedule. mu is held for the
// duration of each scheduled function.
type scheduler struct {
	ctx *context.T
	wg  sync.WaitGroup
	mu  sync.Mutex
}

// schedule starts a goroutine, tracked by sc.wg, that calls run once delay has
// elapsed. If sc.ctx is done before then, run is never called. run executes
// while sc.mu is held, so scheduled functions do not overlap.
func (sc *scheduler) schedule(delay time.Duration, run func()) {
	sc.wg.Add(1)
	go func() {
		defer sc.wg.Done()

		select {
		case <-sc.ctx.Done():
			// Cancelled before the delay elapsed; drop the function.
			return
		case <-time.After(delay):
		}

		sc.mu.Lock()
		defer sc.mu.Unlock()
		run()
	}()
}

// TestLoopUnregister checks that samplers fire at the expected qdelay offsets
// for as long as their stream is registered and stop once it is unregistered.
func TestLoopUnregister(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := &scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// Stream s1: sampling origin at -3, period 2, registered from 0 to 10.
	// Expected samples at 1, 3, 5, 7, 9.
	pastWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(-3)), ticks(2), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, pastWriter.write)
	})
	sched.schedule(ticks(10), func() {
		loop.Unregister(streamKey("s1"))
	})

	// Stream s2: sampling origin at 3, period 2, registered from 0 to 6.
	// Expected samples at 3, 5.
	futureWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(3)), ticks(2), "47", 0)
		loop.Register(ctx, streamKey("s2"), def, futureWriter.write)
	})
	sched.schedule(ticks(6), func() {
		loop.Unregister(streamKey("s2"))
	})

	sched.wg.Wait()
	loop.WaitAll()

	pastWriter.assert(t, "42@1;42@3;42@5;42@7;42@9;")
	futureWriter.assert(t, "47@3;47@5;")
}

// TestLoopCancel checks that samplers fire at the expected qdelay offsets up
// to the moment the context is cancelled.
func TestLoopCancel(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := &scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// Stream s1: sampling origin at -3, period 2, registered at 2 and active
	// until the cancellation at 8. Expected samples at 3, 5, 7.
	pastWriter := &delayWriter{}
	sched.schedule(ticks(2), func() {
		def := samplerDef(start.Add(ticks(-3)), ticks(2), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, pastWriter.write)
	})

	// Stream s2: sampling origin at 1, period 2, registered at 0 and active
	// until the cancellation at 8. Expected samples at 1, 3, 5, 7.
	futureWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(2), "47", 0)
		loop.Register(ctx, streamKey("s2"), def, futureWriter.write)
	})

	// Cancel the shared context at 8.
	sched.schedule(ticks(8), ctxCancel)

	sched.wg.Wait()
	loop.WaitAll()

	pastWriter.assert(t, "42@3;42@5;42@7;")
	futureWriter.assert(t, "47@1;47@3;47@5;47@7;")
}

// TestLoopSlow checks that a stream never starts a new measurement before its
// previous one has been written, skipping intervals where necessary, and that
// nothing is written once the sampler has been unregistered.
func TestLoopSlow(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := &scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// Stream s1: origin 1, period 3, registered from 0 to 14. Sampling takes
	// 3 and writing takes 1, so each measurement occupies 4 quanta:
	//   1  -> done at 5
	//   4  -> skipped
	//   7  -> done at 11
	//   10 -> skipped
	//   13 -> would be done at 17, but is cancelled while sampling
	slowSampleWriter := &delayWriter{delay: ticks(1)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "42", ticks(3))
		loop.Register(ctx, streamKey("s1"), def, slowSampleWriter.write)
	})
	sched.schedule(ticks(14), func() {
		loop.Unregister(streamKey("s1"))
	})

	// Stream s2: origin 1, period 3, registered from 0 to 18. Sampling takes
	// 1 and writing takes 3, so each measurement occupies 4 quanta:
	//   1  -> done at 5
	//   4  -> skipped
	//   7  -> done at 11
	//   10 -> skipped
	//   13 -> done at 17
	//   16 -> would be done at 20, but is cancelled while writing
	slowWriteWriter := &delayWriter{delay: ticks(3)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "47", ticks(1))
		loop.Register(ctx, streamKey("s2"), def, slowWriteWriter.write)
	})
	sched.schedule(ticks(18), func() {
		loop.Unregister(streamKey("s2"))
	})

	sched.wg.Wait()
	loop.WaitAll()

	slowSampleWriter.assert(t, "42@1;42@7;")
	slowWriteWriter.assert(t, "47@1;47@7;47@13;")
}

// TestLoopReregister checks that registering a stream that is already
// registered cancels the existing workers and waits for them to stop before
// the updated ones start.
func TestLoopReregister(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := &scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// Stream s1, no sample or write delay:
	//   "42": origin 1, period 2, active from 0 to 6  -> 1, 3, 5
	//   "84": origin 9, period 3, active from 6 to 14 -> 9, 12
	// The initial Unregister targets a stream that was never registered.
	plainWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		loop.Unregister(streamKey("s1"))
		def := samplerDef(start.Add(ticks(1)), ticks(2), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, plainWriter.write)
	})
	sched.schedule(ticks(6), func() {
		def := samplerDef(start.Add(ticks(9)), ticks(3), "84", 0)
		loop.Register(ctx, streamKey("s1"), def, plainWriter.write)
	})
	sched.schedule(ticks(14), func() {
		loop.Unregister(streamKey("s1"))
	})

	// Stream s2, slow sampling ("47" takes 5, "94" takes 4), instant write:
	//   1  -> sampling until 6
	//   3  -> skipped
	//   5  -> skipped
	//   7  -> sampling would last until 12, cancelled
	//   8  -> Unregister + Register, effective at once because the sample
	//         wait is interruptible
	//   9  -> sampling until 13
	//   12 -> skipped
	//   15 -> sampling until 19
	//   18 -> skipped
	slowSampleWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(2), "47", ticks(5))
		loop.Register(ctx, streamKey("s2"), def, slowSampleWriter.write)
	})
	sched.schedule(ticks(8), func() {
		loop.Unregister(streamKey("s2"))
		def := samplerDef(start.Add(ticks(6)), ticks(3), "94", ticks(4))
		loop.Register(ctx, streamKey("s2"), def, slowSampleWriter.write)
	})
	sched.schedule(ticks(20), func() {
		loop.Unregister(streamKey("s2"))
	})

	// Stream s3, instant sampling, write takes 5:
	//   1  -> writing until 6
	//   4  -> skipped
	//   7  -> writing would last until 12, cancelled
	//   8  -> Register again; effective only at 12 because the write wait is
	//         not interruptible
	//   13 -> writing until 18
	// "bar" has origin -1 and period 2, so its later ticks fall on odd
	// offsets; presumably 15 and 17 are skipped and the measurement started
	// at 19 is cancelled by the Unregister at 21 (verify against the run
	// loop implementation).
	slowWriteWriter := &delayWriter{delay: ticks(5)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "foo", 0)
		loop.Register(ctx, streamKey("s3"), def, slowWriteWriter.write)
	})
	sched.schedule(ticks(8), func() {
		def := samplerDef(start.Add(ticks(-1)), ticks(2), "bar", 0)
		loop.Register(ctx, streamKey("s3"), def, slowWriteWriter.write)
	})
	sched.schedule(ticks(21), func() {
		loop.Unregister(streamKey("s3"))
	})

	sched.wg.Wait()
	loop.WaitAll()

	plainWriter.assert(t, "42@1;42@3;42@5;"+"84@9;84@12;")
	slowSampleWriter.assert(t, "47@1;"+"94@9;94@15;")
	slowWriteWriter.assert(t, "foo@1;"+"bar@13;")
}

// TestWriteError tests that returning an error from the writer calls the
// fatal error callback. The callback used here verifies that the error it
// receives is errSentinel and then cancels the context, which is expected to
// stop the other registered stream as well.
func TestWriteError(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sc := scheduler{ctx: ctx}
	start := time.Now()
	errSentinel := fmt.Errorf("errSentinel")
	ml := newLoopWithRunner(func(err error) {
		if got, want := err, errSentinel; got != want {
			// Prefix the failure with the elapsed time in whole qdelay
			// intervals to make it easy to place on the timeline below.
			elapsed := int64(time.Since(start) / qdelay)
			t.Errorf("%v unexpected fatal error: got %v, want %v", elapsed, got, want)
		}
		ctxCancel()
	}, newEchoDelaySampler(start))

	// Stream s1: origin 1, period 3, instant sampling, write takes 2:
	// 1 -> write start at 1, finish at 3
	// 4 -> write start at 4, finish at 6
	// 7 -> write start at 7, cancelled at 8 (error)
	dwSlowWrite := &delayWriter{delay: 2 * qdelay}
	sc.schedule(0*qdelay, func() {
		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 0), dwSlowWrite.write)
	})

	// Stream s2 (registered at 2): origin 6, sampling takes 2, and its writer
	// always fails -> write error at 8.
	sc.schedule(2*qdelay, func() {
		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 4*qdelay, "47", 2*qdelay),
			func(_ *context.T, _ *sbmodel.KDataPoint, _ sbmodel.VDataPoint) error {
				return errSentinel
			})
	})

	sc.wg.Wait()
	ml.WaitAll()

	dwSlowWrite.assert(t, "42@1;42@4;") // 42: time 1, 4
}
