| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package sampler implements an asynchronous worker for a single run of the |
| // sampling script. |
| package sampler |
| |
| import ( |
| "bytes" |
| "fmt" |
| "os/exec" |
| "strconv" |
| "strings" |
| "syscall" |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/x/sensorlog_lite/internal/config" |
| "v.io/x/sensorlog_lite/internal/sbmodel" |
| ) |
| |
| type MeasureCallback func(result *MeasureResult) error |
| |
| type MeasureResult struct { |
| Time time.Time |
| Data sbmodel.VDataPoint |
| } |
| |
| // Run synchronously runs the sampler script, producing a single data point or |
| // error message and passing it to the callback (also run synchronously). Fatal |
| // errors (including callback errors) are returned from Run. |
| // The script is terminated using SIGINT (followed by SIGKILL) to the process |
| // group if the context is cancelled. The callback will be called at most once |
| // per Run call (may not be called if cancelled or on fatal error). |
| func Run(ctx *context.T, samDef *sbmodel.SamplerDef, cb MeasureCallback) error { |
| var out bytes.Buffer |
| // Run sampling script using bash and capture output. |
| sh := exec.Command("setsid", "bash") |
| sh.Stdin = bytes.NewBufferString(samDef.Script) |
| sh.Stdout = &out |
| // TODO(ivanpi): Fail on any stderr output? |
| // TODO(ivanpi): Use end instead of begin timestamp? |
| timestamp := time.Now() |
| // Failure to start the script is a fatal error. |
| if err := sh.Start(); err != nil { |
| return fmt.Errorf("failed starting measuring script: %v", err) |
| } |
| waiter := make(chan error, 1) |
| // Start script asynchronously. |
| go func() { |
| waiter <- sh.Wait() |
| }() |
| // Wait for script exit or cancellation. |
| // TODO(ivanpi): Add script timeout. |
| select { |
| case <-ctx.Done(): |
| // If cancelled, send SIGINT to script process group. |
| syscall.Kill(-sh.Process.Pid, syscall.SIGINT) |
| // Wait for script to exit. After a delay, if not exited, send SIGKILL |
| // and wait. |
| select { |
| case <-waiter: |
| case <-time.After(config.ScriptKillDelay): |
| syscall.Kill(-sh.Process.Pid, syscall.SIGKILL) |
| <-waiter |
| } |
| return nil |
| case err := <-waiter: |
| // If script has exited, parse data point or get error. |
| if err != nil { |
| return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Script error: %v", err)}}) |
| } |
| measured, err := strconv.ParseFloat(strings.TrimSpace(out.String()), 64) |
| if err != nil { |
| return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Parse error: %v", err)}}) |
| } |
| return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointValue{Value: measured}}) |
| } |
| } |