blob: 1f4ae19f8b4c4a45bb1678eaed42d7f07bd8477a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package runloop implements an array of data measuring workers, one per
// stream, periodically running the configured sampling scripts.
package runloop
import (
"time"
"v.io/v23/context"
"v.io/x/sensorlog/internal/measure/sampler"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/util"
)
// DataCallback is the function type a measuring worker calls with the key and
// value of each measured data point. The worker builds the key from the
// stream's DevId and StreamId and the timestamp of the measurement.
type DataCallback func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
// Loop is a pool of data measuring workers, at most one per stream.
// NOTE: Loop is not thread-safe. Register, Unregister and WaitAll should not
// be called concurrently.
type Loop interface {
// Register starts a measuring worker for the stream specified by streamKey
// and samDef, cancelling and replacing any existing worker for the same
// stream.
// The worker executes the sampling script at samDef.Start and then at every
// whole multiple of samDef.Interval after it, calling dataCb with each
// measured VDataPoint.
// The sampling script and the callback are guaranteed to each have at most
// one instance running in parallel.
// The new worker will not start until any existing worker for the same
// stream has exited.
Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback)
// Unregister cancels any existing measuring worker for the stream specified
// by streamKey. It does not wait for the worker to exit.
Unregister(streamKey *sbmodel.KStreamDef)
// WaitAll waits for all currently running measuring workers to exit. It does
// not cancel them itself.
WaitAll()
}
// loopImpl is the Loop implementation returned by NewLoop.
type loopImpl struct {
// tasks holds the handle of the most recently registered worker for each
// stream, keyed by streamKey.Key(). Unregister leaves entries in place so
// that they can still be waited on.
tasks map[string]measureTask
// runner executes the sampling script once per measurement.
runner sampleRunner
// fatalCb is handed to util.AsyncRun together with each worker function.
fatalCb util.ErrorCb
}
// NewLoop creates an empty measuring worker pool whose workers execute
// sampling scripts through sampler.Run. fatalCb is forwarded unchanged to the
// pool for use when workers are started.
func NewLoop(fatalCb util.ErrorCb) Loop {
	loop := newLoopWithRunner(fatalCb, sampler.Run)
	return loop
}
// sampleRunner is the signature of the function a worker uses to take one
// measurement. It should run the sampling script described by samDef once,
// producing a single VDataPoint and passing it to cb. NewLoop supplies
// sampler.Run; newLoopWithRunner accepts a substitute for testing.
type sampleRunner func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error
// newLoopWithRunner builds a worker pool that takes measurements through the
// given runner instead of the default one, which allows tests to inject a mock
// sampleRunner. The task table starts empty with room for 16 streams.
func newLoopWithRunner(fatalCb util.ErrorCb, runner sampleRunner) Loop {
	ml := new(loopImpl)
	ml.tasks = make(map[string]measureTask, 16)
	ml.runner = runner
	ml.fatalCb = fatalCb
	return ml
}
// Register implements Loop.Register.
//
// Any worker already registered under the same stream key is cancelled right
// away. The replacement worker is launched asynchronously; it first waits for
// the cancelled worker to exit and only then begins measuring, so the two
// never overlap. The new handle overwrites the old one in ml.tasks.
func (ml *loopImpl) Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) {
	workerCtx, cancel := context.WithCancel(ctx)
	key := streamKey.Key()

	// Signal the previous worker for this stream, if there is one, to stop.
	prev, replacing := ml.tasks[key]
	if replacing {
		prev.cancel()
	}

	// Body of the new worker: drain the previous worker before measuring.
	run := func() error {
		if replacing {
			// The previous worker's result is deliberately ignored here.
			_ = prev.wait()
		}
		return ml.measureWorker(workerCtx, streamKey, samDef, dataCb)
	}

	// Launch the worker and record its handle, replacing any previous one.
	ml.tasks[key] = measureTask{
		cancel: cancel,
		wait:   util.AsyncRun(run, ml.fatalCb),
	}
}
// getDelay returns how long to wait, measured from the current time, before
// the next scheduled measurement. Before samDef.Start that is the time left
// until Start; from Start onwards it is the time left until the next whole
// multiple of samDef.Interval counted from Start.
func getDelay(samDef *sbmodel.SamplerDef) time.Duration {
	current := time.Now()
	if !current.Before(samDef.Start) {
		// Start has been reached: find the position inside the current
		// interval and wait out the remainder of it.
		elapsed := current.Sub(samDef.Start).Nanoseconds()
		period := samDef.Interval.Nanoseconds()
		remaining := period - elapsed%period
		return time.Duration(remaining)
	}
	// Start is still in the future: wait until it arrives.
	return samDef.Start.Sub(current)
}
// measureWorker runs the measuring script on every samDef.Interval, starting
// from samDef.Start, and passes the result to dataCb. The delay until the next
// run is computed only after the previous run has returned, so measure
// intervals that elapse while the measuring script or dataCb is still running
// are skipped.
//
// The worker returns nil when ctx is cancelled while it is waiting for the
// next run, and returns the runner's error as soon as a run fails.
func (ml *loopImpl) measureWorker(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) error {
	// Forwards one measurement to dataCb, keyed by stream and timestamp. It
	// depends only on loop-invariant values, so it is built once.
	onResult := func(res *sampler.MeasureResult) error {
		return dataCb(ctx, &sbmodel.KDataPoint{
			DevId:     streamKey.DevId,
			StreamId:  streamKey.StreamId,
			Timestamp: res.Time,
		}, res.Data)
	}
	for {
		// Wait for the next scheduled run. An explicit timer is used instead of
		// time.After so that it can be stopped on cancellation rather than
		// lingering until the (possibly long) delay expires.
		timer := time.NewTimer(getDelay(samDef))
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil
		case <-timer.C:
		}
		// Run the measuring script, passing the result to dataCb.
		if err := ml.runner(ctx, samDef, onResult); err != nil {
			return err
		}
	}
}
// Unregister implements Loop.Unregister.
//
// It only requests cancellation and does not wait for the worker to exit.
// Calling it for a stream with no registered worker is a no-op.
func (ml *loopImpl) Unregister(streamKey *sbmodel.KStreamDef) {
	task, found := ml.tasks[streamKey.Key()]
	if !found {
		return
	}
	// The handle is intentionally kept in ml.tasks so that WaitAll, or a worker
	// registered later for the same stream, can still wait on it.
	task.cancel()
}
// WaitAll implements Loop.WaitAll.
//
// It calls wait on every handle recorded in ml.tasks, one after another, and
// discards the results. It does not cancel any worker.
func (ml *loopImpl) WaitAll() {
	for key := range ml.tasks {
		_ = ml.tasks[key].wait()
	}
}
// measureTask is the handle to a measuring worker, kept so the worker can be
// cancelled and waited on.
type measureTask struct {
// cancel cancels the context the worker runs under.
cancel func()
// wait is the function returned by util.AsyncRun for the worker. Callers in
// this file invoke it to wait for the worker and ignore the returned error.
// NOTE(review): presumably blocks until the worker function has returned;
// confirm against util.AsyncRun.
wait func() error
}