| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package runloop implements an array of data measuring workers, one per |
| // stream, periodically running the configured sampling scripts. |
| package runloop |
| |
| import ( |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/x/sensorlog_lite/internal/measure/sampler" |
| "v.io/x/sensorlog_lite/internal/sbmodel" |
| "v.io/x/sensorlog_lite/internal/util" |
| ) |
| |
// DataCallback is called by a measuring worker for each measured data point,
// with key identifying the stream and measurement time and val holding the
// measured data. Returning a non-nil error makes the worker exit with that
// error.
type DataCallback func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
| |
// Loop is a pool of data measuring workers, at most one per stream.
// NOTE: Loop is not thread-safe. Register and WaitAll should not be called
// concurrently.
type Loop interface {
	// Register starts a measuring worker for the stream specified by streamKey
	// and samDef, cancelling and replacing any existing worker for the same
	// stream.
	// The worker executes the sampling script at samDef.Start and then at every
	// whole multiple of samDef.Interval after it, calling dataCb with the key
	// and the measured VDataPoint.
	// The sampling script and the callback are guaranteed to each have at most
	// one instance running in parallel.
	// The new worker will not start until any existing worker for the same
	// stream has exited.
	Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback)

	// Unregister cancels any existing measuring worker for the stream specified
	// by streamKey. It does not wait for the worker to exit.
	Unregister(streamKey *sbmodel.KStreamDef)

	// WaitAll waits for all currently running measuring workers to exit.
	WaitAll()
}
| |
// loopImpl is the Loop implementation returned by NewLoop.
type loopImpl struct {
	// tasks holds the handle of the most recently registered worker for each
	// stream, keyed by the stream's Key(). Handles of cancelled workers stay
	// in the map so that they can still be waited on.
	tasks map[string]measureTask
	// runner executes the sampling script once per measurement.
	runner sampleRunner
	// fatalCb is passed to util.AsyncRun for every worker started by Register.
	fatalCb util.ErrorCb
}
| |
| // NewLoop returns a new measuring worker pool. |
| func NewLoop(fatalCb util.ErrorCb) Loop { |
| return newLoopWithRunner(fatalCb, sampler.Run) |
| } |
| |
// sampleRunner should run the sampling script described by samDef once,
// producing a single VDataPoint and passing it to cb. NewLoop supplies
// sampler.Run; newLoopWithRunner allows substituting another implementation.
type sampleRunner func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error
| |
| // Constructor for testing with mock sampleRunner. |
| func newLoopWithRunner(fatalCb util.ErrorCb, runner sampleRunner) Loop { |
| return &loopImpl{ |
| tasks: make(map[string]measureTask, 16), |
| runner: runner, |
| fatalCb: fatalCb, |
| } |
| } |
| |
| // Register implements Loop.Register. |
| func (ml *loopImpl) Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) { |
| ctx, cancel := context.WithCancel(ctx) |
| |
| oldTask, hasOldTask := ml.tasks[streamKey.Key()] |
| if hasOldTask { |
| // Cancel existing worker, if any. |
| oldTask.cancel() |
| } |
| |
| // Start new worker that first waits for the existing worker to exit, then |
| // starts measuring. |
| waitLoop := util.AsyncRun(func() error { |
| if hasOldTask { |
| // Wait for existing worker, if any, to exit. |
| _ = oldTask.wait() |
| } |
| |
| return ml.measureWorker(ctx, streamKey, samDef, dataCb) |
| }, ml.fatalCb) |
| |
| // Register new worker handle. |
| ml.tasks[streamKey.Key()] = measureTask{ |
| cancel: cancel, |
| wait: waitLoop, |
| } |
| } |
| |
| // getDelay computes the delay from now to samDef.Start or, if it has passed, |
| // the next multiple of samDef.Interval since samDef.Start. |
| func getDelay(samDef *sbmodel.SamplerDef) time.Duration { |
| now := time.Now() |
| if now.Before(samDef.Start) { |
| // Start time has not yet passed, wait until start time. |
| return samDef.Start.Sub(now) |
| } else { |
| // Start time has passed, wait until next multiple of interval since the |
| // start time. |
| diff := now.Sub(samDef.Start).Nanoseconds() |
| intv := samDef.Interval.Nanoseconds() |
| return time.Duration(intv - (diff % intv)) |
| } |
| } |
| |
| // measureWorker runs the measuring script on every samDef.Interval, starting |
| // from samDef.Start, and passes the result to dataCb. Measure intervals are |
| // skipped if the measuring script or dataCb is still running. The worker exits |
| // when ctx is cancelled or the measuring script or dataCb return an error. |
| func (ml *loopImpl) measureWorker(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) error { |
| for { |
| delay := getDelay(samDef) |
| // Wait for delay. If not cancelled while waiting, proceed to measure. |
| select { |
| case <-ctx.Done(): |
| return nil |
| case <-time.After(delay): |
| } |
| |
| // Run the measuring script, passing the result to dataCb. |
| if err := ml.runner(ctx, samDef, func(res *sampler.MeasureResult) error { |
| return dataCb(ctx, &sbmodel.KDataPoint{ |
| DevId: streamKey.DevId, |
| StreamId: streamKey.StreamId, |
| Timestamp: res.Time, |
| }, res.Data) |
| }); err != nil { |
| return err |
| } |
| } |
| } |
| |
| // Unregister implements Loop.Unregister. |
| func (ml *loopImpl) Unregister(streamKey *sbmodel.KStreamDef) { |
| if task, hasTask := ml.tasks[streamKey.Key()]; hasTask { |
| task.cancel() |
| } |
| // task is left in ml.tasks to allow WaitAll or a new worker to wait on it. |
| } |
| |
| // WaitAll implements Loop.WaitAll. |
| func (ml *loopImpl) WaitAll() { |
| for _, t := range ml.tasks { |
| _ = t.wait() |
| } |
| } |
| |
// measureTask is the handle of a measuring worker, used to cancel the worker
// and to wait for it to exit.
type measureTask struct {
	// cancel cancels the context the worker runs under, asking it to exit.
	cancel func()
	// wait is the function returned by util.AsyncRun for the worker; callers
	// use it to block until the worker has exited.
	wait func() error
}