| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package runloop |
| |
| import ( |
| "fmt" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "v.io/v23/context" |
| tu "v.io/x/ref/test/testutil" |
| "v.io/x/sensorlog/internal/measure/sampler" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/util" |
| ) |
| |
// Time delay quantum. It is assumed that several goroutines can reliably
// execute trivial code and yield within this time. All schedule offsets,
// sampler start times, and mock delays in these tests are expressed as
// integer multiples of qdelay.
// TODO(ivanpi): Replace with logical clock that counts active goroutines.
const qdelay = 100 * time.Millisecond
| |
// delayWriter implements a DataCallback that waits for a configurable delay
// before recording the received VDataPointError value. It also allows
// asserting that the expected data has been written.
type delayWriter struct {
	// delay is how long write sleeps before recording a value.
	delay time.Duration
	// mu guards output.
	mu sync.Mutex
	// output is the concatenation of all values written so far.
	output string
}
| |
| // write sleeps for delay and, if not cancelled, writes the VDataError. |
| // Sleep is not interrupted by cancellation. |
| func (dw *delayWriter) write(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error { |
| time.Sleep(dw.delay) |
| select { |
| case <-ctx.Done(): |
| return nil |
| default: |
| } |
| dw.mu.Lock() |
| defer dw.mu.Unlock() |
| if msg, ok := val.(sbmodel.VDataPointError); ok { |
| dw.output += string(msg.Value) |
| } else { |
| return fmt.Errorf("writer expected VDataError, got: %v", val) |
| } |
| return nil |
| } |
| |
| // assert asserts that the written VDataErrors, concatenated, equal expected. |
| func (dw *delayWriter) assert(t *testing.T, expected string) { |
| dw.mu.Lock() |
| defer dw.mu.Unlock() |
| if got, want := dw.output, expected; got != want { |
| t.Error(tu.FormatLogLine(2, "unexpected output: got %q, want %q", got, want)) |
| } |
| } |
| |
| // newEchoDelaySampler returns a sampleRunner that expects mock scripts of the |
| // form '<delay>|<output>'. The sampleRunner sleeps for <delay> and, if not |
| // cancelled, calls cb with the VDataError of the form '<output>@<time>;', |
| // where <time> is the time when the sampleRunner was started, expressed as the |
| // number of whole qdelay intervals since start. |
| func newEchoDelaySampler(start time.Time) sampleRunner { |
| return func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error { |
| timestamp := time.Now() |
| parts := strings.SplitN(samDef.Script, "|", 2) |
| delay, err := time.ParseDuration(parts[0]) |
| if err != nil { |
| panic(err) |
| } |
| select { |
| case <-ctx.Done(): |
| return nil |
| case <-time.After(delay): |
| } |
| timeDelta := timestamp.Sub(start).Nanoseconds() / time.Duration(qdelay).Nanoseconds() |
| result := fmt.Sprintf("%s@%d;", parts[1], timeDelta) |
| return cb(&sampler.MeasureResult{ |
| Time: timestamp, |
| Data: sbmodel.VDataPointError{Value: result}, |
| }) |
| } |
| } |
| |
| // newFatalErrorCb returns an ErrorCb that fails the test and cancels the |
| // context on any error. |
| func newFatalErrorCb(t *testing.T, ctxCancel func()) util.ErrorCb { |
| return func(err error) { |
| t.Errorf("measuring fatal error: %v", err) |
| ctxCancel() |
| } |
| } |
| |
// streamKey returns a test KStreamDef key for the given streamId, using a
// fixed device id shared by all test streams.
func streamKey(streamId string) *sbmodel.KStreamDef {
	return &sbmodel.KStreamDef{
		DevId: "test",
		StreamId: streamId,
	}
}
| |
| // samplerDef returns a test SamplerDef with a mock script as expected by an |
| // echoDelaySampler. |
| func samplerDef(start time.Time, intv time.Duration, out string, delay time.Duration) *sbmodel.SamplerDef { |
| return &sbmodel.SamplerDef{ |
| Script: fmt.Sprintf("%v|%s", delay, out), |
| Start: start, |
| Interval: intv, |
| } |
| } |
| |
// scheduler allows scheduling functions with delays and waiting for all
// scheduled functions to finish (using wg). Cancel ctx to cancel pending
// functions. Functions are run under mutex (not concurrently).
type scheduler struct {
	// ctx, when cancelled, causes not-yet-started scheduled functions to be skipped.
	ctx *context.T
	// wg counts outstanding scheduled functions; Wait blocks until all finish or are skipped.
	wg sync.WaitGroup
	// mu serializes execution of scheduled functions.
	mu sync.Mutex
}
| |
| // schedule runs the specified function after the specified delay. |
| func (sc *scheduler) schedule(delay time.Duration, run func()) { |
| sc.wg.Add(1) |
| go func() { |
| defer sc.wg.Done() |
| select { |
| case <-time.After(delay): |
| sc.mu.Lock() |
| defer sc.mu.Unlock() |
| run() |
| case <-sc.ctx.Done(): |
| } |
| }() |
| } |
| |
| // Tests that samplers run with correct time offsets until unregistered. |
| func TestLoopUnregister(t *testing.T) { |
| ctx, ctxCancel := context.RootContext() |
| defer ctxCancel() |
| sc := scheduler{ctx: ctx} |
| start := time.Now() |
| ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start)) |
| |
| // Start at -3, measuring every 2, active 0 to 10 -> 1, 3, 5, 7, 9 |
| dwPast := &delayWriter{delay: 0} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write) |
| }) |
| sc.schedule(10*qdelay, func() { ml.Unregister(streamKey("s1")) }) |
| |
| // Start at 3, measuring every 2, active 0 to 6 -> 3, 5 |
| dwFuture := &delayWriter{delay: 0} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(3*qdelay), 2*qdelay, "47", 0), dwFuture.write) |
| }) |
| sc.schedule(6*qdelay, func() { ml.Unregister(streamKey("s2")) }) |
| |
| sc.wg.Wait() |
| ml.WaitAll() |
| |
| dwPast.assert(t, "42@1;42@3;42@5;42@7;42@9;") // time 1, 3, 5, 7, 9 |
| dwFuture.assert(t, "47@3;47@5;") // time 3, 5 |
| } |
| |
| // Tests that samplers run with correct time offsets until the context is |
| // cancelled. |
| func TestLoopCancel(t *testing.T) { |
| ctx, ctxCancel := context.RootContext() |
| defer ctxCancel() |
| sc := scheduler{ctx: ctx} |
| start := time.Now() |
| ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start)) |
| |
| // Start at -3, measuring every 2, active 2 to 8 (cancel) -> 3, 5, 7 |
| dwPast := &delayWriter{delay: 0} |
| sc.schedule(2*qdelay, func() { |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write) |
| }) |
| |
| // Start at 1, measuring every 2, active 0 to 8 (cancel) -> 1, 3, 5, 7 |
| dwFuture := &delayWriter{delay: 0} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 0), dwFuture.write) |
| }) |
| |
| sc.schedule(8*qdelay, ctxCancel) |
| |
| sc.wg.Wait() |
| ml.WaitAll() |
| |
| dwPast.assert(t, "42@3;42@5;42@7;") // time 3, 5, 7 |
| dwFuture.assert(t, "47@1;47@3;47@5;47@7;") // time 1, 3, 5, 7 |
| } |
| |
| // Tests that no new measurement for a stream is started until the previous |
| // measurement has been written, skipping intervals if necessary. No data |
| // points should be written after the sampler is unregistered. |
| func TestLoopSlow(t *testing.T) { |
| ctx, ctxCancel := context.RootContext() |
| defer ctxCancel() |
| sc := scheduler{ctx: ctx} |
| start := time.Now() |
| ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start)) |
| |
| // Start at 1, measuring every 3, active 0 to 14, sample takes 3, write takes 1: |
| // 1 -> finish at 5 |
| // 4 skipped |
| // 7 -> finish at 11 |
| // 10 skipped |
| // 13 -> would finish at 17, cancelled during sample |
| dwSlowSample := &delayWriter{delay: 1 * qdelay} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 3*qdelay), dwSlowSample.write) |
| }) |
| sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) }) |
| |
| // Start at 1, measuring every 3, active 0 to 18, sample takes 1, write takes 3: |
| // 1 -> finish at 5 |
| // 4 skipped |
| // 7 -> finish at 11 |
| // 10 skipped |
| // 13 -> finish at 17 |
| // 16 -> would finish at 20, cancelled during write |
| dwSlowWrite := &delayWriter{delay: 3 * qdelay} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 3*qdelay, "47", 1*qdelay), dwSlowWrite.write) |
| }) |
| sc.schedule(18*qdelay, func() { ml.Unregister(streamKey("s2")) }) |
| |
| sc.wg.Wait() |
| ml.WaitAll() |
| |
| dwSlowSample.assert(t, "42@1;42@7;") // time 1, 7 |
| dwSlowWrite.assert(t, "47@1;47@7;47@13;") // time 1, 7, 13 |
| } |
| |
| // Tests that re-registering an already registered stream cancels and waits for |
| // the existing workers to stop before starting updated ones. |
| func TestLoopReregister(t *testing.T) { |
| ctx, ctxCancel := context.RootContext() |
| defer ctxCancel() |
| sc := scheduler{ctx: ctx} |
| start := time.Now() |
| ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start)) |
| |
| // 42: Start at 1, measuring every 2, active 0 to 6 -> 1, 3, 5 |
| // 84: Start at 9, measuring every 3, active 6 to 14 -> 9, 12 |
| dwNoDelay := &delayWriter{delay: 0} |
| sc.schedule(0*qdelay, func() { |
| ml.Unregister(streamKey("s1")) |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 2*qdelay, "42", 0), dwNoDelay.write) |
| }) |
| sc.schedule(6*qdelay, func() { |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(9*qdelay), 3*qdelay, "84", 0), dwNoDelay.write) |
| }) |
| sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) }) |
| |
| // 1 -> sample until 6 |
| // 3 skipped |
| // 5 skipped |
| // 7 -> sample would take until 12, cancelled |
| // re-Register at 8, effective at 8 (sample wait is interruptible) |
| // 9 -> sample until 13 |
| // 12 skipped |
| // 15 -> sample until 19 |
| // 18 skipped |
| dwSlowSample := &delayWriter{delay: 0} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 5*qdelay), dwSlowSample.write) |
| }) |
| sc.schedule(8*qdelay, func() { |
| ml.Unregister(streamKey("s2")) |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 3*qdelay, "94", 4*qdelay), dwSlowSample.write) |
| }) |
| sc.schedule(20*qdelay, func() { ml.Unregister(streamKey("s2")) }) |
| |
| // 1 -> write until 6 |
| // 4 skipped |
| // 7 -> write would take until 12, cancelled |
| // re-Register starts at 8, effective at 12 (write wait is non-interruptible) |
| // 13 -> write until 18 |
| // 15 skipped |
| // 18 -> would write until 23, cancelled |
| // 20 skipped |
| dwSlowWrite := &delayWriter{delay: 5 * qdelay} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(1*qdelay), 3*qdelay, "foo", 0), dwSlowWrite.write) |
| }) |
| sc.schedule(8*qdelay, func() { |
| ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(-1*qdelay), 2*qdelay, "bar", 0), dwSlowWrite.write) |
| }) |
| sc.schedule(21*qdelay, func() { ml.Unregister(streamKey("s3")) }) |
| |
| sc.wg.Wait() |
| ml.WaitAll() |
| |
| dwNoDelay.assert(t, "42@1;42@3;42@5;"+"84@9;84@12;") // 42: time 1, 3, 5; 84: time 9, 12 |
| dwSlowSample.assert(t, "47@1;"+"94@9;94@15;") // 47: time 1; 94: time 9, 15 |
| dwSlowWrite.assert(t, "foo@1;"+"bar@13;") // foo: time 1; bar: time 13 |
| } |
| |
| // Tests that returning an error from the writer calls the fatal eror callback. |
| func TestWriteError(t *testing.T) { |
| ctx, ctxCancel := context.RootContext() |
| defer ctxCancel() |
| sc := scheduler{ctx: ctx} |
| start := time.Now() |
| errSentinel := fmt.Errorf("errSentinel") |
| ml := newLoopWithRunner(func(err error) { |
| if got, want := err, errSentinel; got != want { |
| t.Errorf("%v unexpected fatal error: got %v, want %v", time.Now().Sub(start).Nanoseconds()/time.Duration(qdelay).Nanoseconds(), got, want) |
| } |
| ctxCancel() |
| }, newEchoDelaySampler(start)) |
| |
| // 1 -> write start at 1, finish at 3 |
| // 4 -> write start at 4, finish at 6 |
| // 7 -> write start at 7, cancelled at 8 (error) |
| dwNoDelay := &delayWriter{delay: 2 * qdelay} |
| sc.schedule(0*qdelay, func() { |
| ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 0), dwNoDelay.write) |
| }) |
| |
| // error: Start at 6, measure delay 2 -> write error at 8 |
| sc.schedule(2*qdelay, func() { |
| ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 4*qdelay, "47", 2*qdelay), |
| func(_ *context.T, _ *sbmodel.KDataPoint, _ sbmodel.VDataPoint) error { |
| return errSentinel |
| }) |
| }) |
| |
| sc.wg.Wait() |
| ml.WaitAll() |
| |
| dwNoDelay.assert(t, "42@1;42@4;") // 42: time 1, 4 |
| } |