// blob: b535f467c0115372b86a40f8ef5317fadd9aa131 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runloop
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"v.io/v23/context"
tu "v.io/x/ref/test/testutil"
"v.io/x/sensorlog/internal/measure/sampler"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/util"
)
// qdelay is the time delay quantum in which the timings in this file are
// expressed. It is assumed that several goroutines can reliably execute
// trivial code and yield within this time.
// TODO(ivanpi): Replace with logical clock that counts active goroutines.
const qdelay = 100 * time.Millisecond
// delayWriter implements a DataCallback that waits for a configurable delay
// before writing the received VDataPointError. It also allows asserting that
// the expected data has been written.
type delayWriter struct {
	// delay is how long write sleeps before recording the value.
	delay time.Duration
	// mu guards output.
	mu sync.Mutex
	// output is the concatenation of all values written so far.
	output string
}
// write sleeps for dw.delay and then, unless ctx has been cancelled in the
// meantime, appends the value carried by val to dw.output. The sleep itself is
// not interrupted by cancellation; cancellation is only observed afterwards.
// An error is returned if val is not a VDataPointError.
func (dw *delayWriter) write(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
	time.Sleep(dw.delay)

	// Non-blocking cancellation check: a cancelled write is silently dropped.
	select {
	case <-ctx.Done():
		return nil
	default:
	}

	dw.mu.Lock()
	defer dw.mu.Unlock()

	errPoint, isErrPoint := val.(sbmodel.VDataPointError)
	if !isErrPoint {
		return fmt.Errorf("writer expected VDataError, got: %v", val)
	}
	dw.output += string(errPoint.Value)
	return nil
}
// assert reports a test error on t unless the concatenation of everything
// written so far equals expected.
func (dw *delayWriter) assert(t *testing.T, expected string) {
	dw.mu.Lock()
	defer dw.mu.Unlock()

	actual := dw.output
	if actual == expected {
		return
	}
	t.Error(tu.FormatLogLine(2, "unexpected output: got %q, want %q", actual, expected))
}
// newEchoDelaySampler returns a sampleRunner that expects mock scripts of the
// form '<delay>|<output>', where <delay> is accepted by time.ParseDuration.
// The sampleRunner waits for <delay> and, if not cancelled, calls cb with a
// VDataPointError of the form '<output>@<time>;', where <time> is the time when
// the sampleRunner was started, expressed as the number of whole qdelay
// intervals since start.
//
// A script that does not match the expected form is a bug in the test, so the
// sampleRunner panics immediately with a descriptive message. If ctx is
// cancelled before the delay elapses, the sampleRunner returns nil without
// calling cb.
func newEchoDelaySampler(start time.Time) sampleRunner {
	return func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error {
		// Record the start time before waiting so that the reported offset
		// reflects when the run began rather than when it finished.
		timestamp := time.Now()

		parts := strings.SplitN(samDef.Script, "|", 2)
		if len(parts) != 2 {
			// Fail fast instead of hitting an index-out-of-range on parts[1]
			// only after the delay has elapsed.
			panic(fmt.Sprintf("malformed mock script %q: want '<delay>|<output>'", samDef.Script))
		}
		delay, err := time.ParseDuration(parts[0])
		if err != nil {
			panic(err)
		}

		// Use an explicit timer so it is released promptly when the context is
		// cancelled before the delay elapses.
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
		}

		// Whole qdelay intervals between start and the beginning of this run.
		timeDelta := int64(timestamp.Sub(start) / qdelay)
		result := fmt.Sprintf("%s@%d;", parts[1], timeDelta)
		return cb(&sampler.MeasureResult{
			Time: timestamp,
			Data: sbmodel.VDataPointError{Value: result},
		})
	}
}
// newFatalErrorCb builds an ErrorCb that, for any error it receives, records a
// test failure on t and then invokes ctxCancel.
func newFatalErrorCb(t *testing.T, ctxCancel func()) util.ErrorCb {
	onFatal := func(err error) {
		t.Errorf("measuring fatal error: %v", err)
		ctxCancel()
	}
	return onFatal
}
// streamKey returns a KStreamDef for the given streamId under the fixed
// device id "test".
func streamKey(streamId string) *sbmodel.KStreamDef {
	key := sbmodel.KStreamDef{DevId: "test", StreamId: streamId}
	return &key
}
// samplerDef builds a test SamplerDef that starts at start and repeats every
// intv. Its script is the mock script '<delay>|<out>' in the format expected by
// an echoDelaySampler.
func samplerDef(start time.Time, intv time.Duration, out string, delay time.Duration) *sbmodel.SamplerDef {
	script := fmt.Sprintf("%v|%s", delay, out)
	def := sbmodel.SamplerDef{
		Script:   script,
		Start:    start,
		Interval: intv,
	}
	return &def
}
// scheduler allows scheduling functions with delays and waiting for all
// scheduled functions to finish (using wg). Cancel ctx to cancel pending
// functions. Functions are run under mutex (not concurrently).
type scheduler struct {
	// ctx, once cancelled, causes functions whose delay has not yet elapsed
	// to be skipped.
	ctx *context.T
	// wg counts one goroutine per schedule call; callers wait on it directly.
	wg sync.WaitGroup
	// mu is held while a scheduled function runs, serializing the functions.
	mu sync.Mutex
}
// schedule runs the specified function after the specified delay, in a new
// goroutine tracked by sc.wg. The function runs with sc.mu held, so scheduled
// functions never run concurrently with each other. If sc.ctx is cancelled
// before the delay elapses, the function is not run.
func (sc *scheduler) schedule(delay time.Duration, run func()) {
	sc.wg.Add(1)
	go func() {
		defer sc.wg.Done()
		// Use an explicit timer rather than time.After so that the timer is
		// released as soon as the goroutine exits due to cancellation.
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-timer.C:
			sc.mu.Lock()
			defer sc.mu.Unlock()
			run()
		case <-sc.ctx.Done():
		}
	}()
}
// TestLoopUnregister checks that samplers fire at the expected time offsets
// (in qdelay units) and stop producing output once their stream has been
// unregistered.
func TestLoopUnregister(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// s1: start at -3, measuring every 2, active 0 to 10 -> 1, 3, 5, 7, 9
	pastWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(-3)), ticks(2), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, pastWriter.write)
	})
	sched.schedule(ticks(10), func() { loop.Unregister(streamKey("s1")) })

	// s2: start at 3, measuring every 2, active 0 to 6 -> 3, 5
	futureWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(3)), ticks(2), "47", 0)
		loop.Register(ctx, streamKey("s2"), def, futureWriter.write)
	})
	sched.schedule(ticks(6), func() { loop.Unregister(streamKey("s2")) })

	// Let every scheduled action run, then call WaitAll on the loop before
	// inspecting what was written.
	sched.wg.Wait()
	loop.WaitAll()
	pastWriter.assert(t, "42@1;42@3;42@5;42@7;42@9;") // s1: time 1, 3, 5, 7, 9
	futureWriter.assert(t, "47@3;47@5;")              // s2: time 3, 5
}
// TestLoopCancel checks that samplers fire at the expected time offsets (in
// qdelay units) and stop producing output once the context is cancelled.
func TestLoopCancel(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// s1: start at -3, measuring every 2, active 2 to 8 (cancel) -> 3, 5, 7
	pastWriter := &delayWriter{}
	sched.schedule(ticks(2), func() {
		def := samplerDef(start.Add(ticks(-3)), ticks(2), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, pastWriter.write)
	})

	// s2: start at 1, measuring every 2, active 0 to 8 (cancel) -> 1, 3, 5, 7
	futureWriter := &delayWriter{}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(2), "47", 0)
		loop.Register(ctx, streamKey("s2"), def, futureWriter.write)
	})

	// Cancelling the shared context at 8 ends both streams.
	sched.schedule(ticks(8), ctxCancel)

	sched.wg.Wait()
	loop.WaitAll()
	pastWriter.assert(t, "42@3;42@5;42@7;")        // s1: time 3, 5, 7
	futureWriter.assert(t, "47@1;47@3;47@5;47@7;") // s2: time 1, 3, 5, 7
}
// TestLoopSlow checks that a stream does not begin a new measurement until the
// previous one has been written, skipping intervals where necessary, and that
// no data points are written after the sampler is unregistered.
func TestLoopSlow(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := scheduler{ctx: ctx}
	start := time.Now()
	loop := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// s1: start at 1, measuring every 3, active 0 to 14, sample takes 3,
	// write takes 1:
	//    1 -> finish at 5
	//    4 skipped
	//    7 -> finish at 11
	//   10 skipped
	//   13 -> would finish at 17, cancelled during sample
	slowSampleWriter := &delayWriter{delay: ticks(1)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "42", ticks(3))
		loop.Register(ctx, streamKey("s1"), def, slowSampleWriter.write)
	})
	sched.schedule(ticks(14), func() { loop.Unregister(streamKey("s1")) })

	// s2: start at 1, measuring every 3, active 0 to 18, sample takes 1,
	// write takes 3:
	//    1 -> finish at 5
	//    4 skipped
	//    7 -> finish at 11
	//   10 skipped
	//   13 -> finish at 17
	//   16 -> would finish at 20, cancelled during write
	slowWriteWriter := &delayWriter{delay: ticks(3)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "47", ticks(1))
		loop.Register(ctx, streamKey("s2"), def, slowWriteWriter.write)
	})
	sched.schedule(ticks(18), func() { loop.Unregister(streamKey("s2")) })

	sched.wg.Wait()
	loop.WaitAll()
	slowSampleWriter.assert(t, "42@1;42@7;")      // s1: time 1, 7
	slowWriteWriter.assert(t, "47@1;47@7;47@13;") // s2: time 1, 7, 13
}
// TestLoopReregister tests that re-registering an already registered stream
// cancels and waits for the existing workers to stop before starting updated
// ones. Three streams are exercised: s1 with no delays, s2 with a slow
// (interruptible) sample, and s3 with a slow (non-interruptible) write. All
// times in the comments below are in qdelay units relative to start.
func TestLoopReregister(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sc := scheduler{ctx: ctx}
	start := time.Now()
	ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
	// s1 (no sample or write delay):
	// 42: Start at 1, measuring every 2, active 0 to 6 -> 1, 3, 5
	// 84: Start at 9, measuring every 3, active 6 to 14 -> 9, 12
	dwNoDelay := &delayWriter{delay: 0}
	sc.schedule(0*qdelay, func() {
		// Unregister is called here before s1 has ever been registered.
		ml.Unregister(streamKey("s1"))
		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 2*qdelay, "42", 0), dwNoDelay.write)
	})
	// Re-register s1 directly, without an explicit Unregister first.
	sc.schedule(6*qdelay, func() {
		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(9*qdelay), 3*qdelay, "84", 0), dwNoDelay.write)
	})
	sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) })
	// s2 (slow sample, immediate write):
	// 1 -> sample until 6
	// 3 skipped
	// 5 skipped
	// 7 -> sample would take until 12, cancelled
	// re-Register at 8, effective at 8 (sample wait is interruptible)
	// 9 -> sample until 13
	// 12 skipped
	// 15 -> sample until 19
	// 18 skipped
	dwSlowSample := &delayWriter{delay: 0}
	sc.schedule(0*qdelay, func() {
		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 5*qdelay), dwSlowSample.write)
	})
	// Re-register s2 via an explicit Unregister followed by Register.
	sc.schedule(8*qdelay, func() {
		ml.Unregister(streamKey("s2"))
		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 3*qdelay, "94", 4*qdelay), dwSlowSample.write)
	})
	sc.schedule(20*qdelay, func() { ml.Unregister(streamKey("s2")) })
	// s3 (immediate sample, slow write):
	// 1 -> write until 6
	// 4 skipped
	// 7 -> write would take until 12, cancelled
	// re-Register starts at 8, effective at 12 (write wait is non-interruptible)
	// 13 -> write until 18
	// 15, 17 skipped
	// 19 -> would write until 24, cancelled by Unregister at 21
	// NOTE(review): this comment previously listed 18 and 20 after 15, which
	// are not on the -1+2k grid of the re-registered sampler. The timeline
	// above assumes samples stay aligned to that grid, as in the other tests
	// in this file — confirm against the loop implementation. The asserted
	// output is the same either way.
	dwSlowWrite := &delayWriter{delay: 5 * qdelay}
	sc.schedule(0*qdelay, func() {
		ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(1*qdelay), 3*qdelay, "foo", 0), dwSlowWrite.write)
	})
	// Re-register s3 directly while a slow write is still in progress.
	sc.schedule(8*qdelay, func() {
		ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(-1*qdelay), 2*qdelay, "bar", 0), dwSlowWrite.write)
	})
	sc.schedule(21*qdelay, func() { ml.Unregister(streamKey("s3")) })
	// Let every scheduled action run, then call WaitAll on the loop before
	// inspecting what was written.
	sc.wg.Wait()
	ml.WaitAll()
	dwNoDelay.assert(t, "42@1;42@3;42@5;"+"84@9;84@12;") // 42: time 1, 3, 5; 84: time 9, 12
	dwSlowSample.assert(t, "47@1;"+"94@9;94@15;") // 47: time 1; 94: time 9, 15
	dwSlowWrite.assert(t, "foo@1;"+"bar@13;") // foo: time 1; bar: time 13
}
// TestWriteError checks that an error returned from a writer is passed to the
// fatal error callback, and that cancelling the context from that callback
// stops further writes on other streams.
func TestWriteError(t *testing.T) {
	ctx, ctxCancel := context.RootContext()
	defer ctxCancel()
	sched := scheduler{ctx: ctx}
	start := time.Now()
	errSentinel := fmt.Errorf("errSentinel")

	// onFatal accepts only errSentinel; in every case it cancels the context.
	onFatal := func(err error) {
		if err != errSentinel {
			// Elapsed time in whole qdelay quanta, included to aid debugging.
			elapsed := int64(time.Since(start) / qdelay)
			t.Errorf("%v unexpected fatal error: got %v, want %v", elapsed, err, errSentinel)
		}
		ctxCancel()
	}
	loop := newLoopWithRunner(onFatal, newEchoDelaySampler(start))
	// ticks converts a number of qdelay quanta into a duration.
	ticks := func(n int) time.Duration { return time.Duration(n) * qdelay }

	// s1: start at 1, measuring every 3, write takes 2:
	//   1 -> write start at 1, finish at 3
	//   4 -> write start at 4, finish at 6
	//   7 -> write start at 7, cancelled at 8 (error)
	slowWriter := &delayWriter{delay: ticks(2)}
	sched.schedule(ticks(0), func() {
		def := samplerDef(start.Add(ticks(1)), ticks(3), "42", 0)
		loop.Register(ctx, streamKey("s1"), def, slowWriter.write)
	})

	// s2: start at 6, measure delay 2 -> write error at 8
	failingWrite := func(_ *context.T, _ *sbmodel.KDataPoint, _ sbmodel.VDataPoint) error {
		return errSentinel
	}
	sched.schedule(ticks(2), func() {
		def := samplerDef(start.Add(ticks(6)), ticks(4), "47", ticks(2))
		loop.Register(ctx, streamKey("s2"), def, failingWrite)
	})

	sched.wg.Wait()
	loop.WaitAll()
	slowWriter.assert(t, "42@1;42@4;") // s1: time 1, 4
}