sensorlog_lite: Sampling script runner.
Method for a single sampling script run, with cancellation support.
VDL types for measured data.
Change-Id: I816ee3a5135a8fda54b3624fd7783332d10235c0
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 1ce9f9a..7451931 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -6,6 +6,8 @@
package config
import (
+ "time"
+
"v.io/v23/naming"
)
@@ -16,6 +18,8 @@
DBName = "sldb"
SyncPriority = 42
+
+ ScriptKillDelay = 100 * time.Millisecond
)
func SyncgroupName(publishService, devId string) string {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
new file mode 100644
index 0000000..ee54ec1
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sampler implements an asynchronous worker for a single run of the
+// sampling script.
+package sampler
+
+import (
+ "bytes"
+ "fmt"
+ "os/exec"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+type MeasureCallback func(result *MeasureResult) error
+
+type MeasureResult struct {
+ Time time.Time
+ Data sbmodel.VDataPoint
+}
+
+// Run synchronously runs the sampler script, producing a single data point or
+// error message and passing it to the callback (also run synchronously). Fatal
+// errors (including callback errors) are returned from Run.
+// The script is terminated using SIGINT (followed by SIGKILL) to the process
+// group if the context is cancelled. The callback will be called at most once
+// per Run call (may not be called if cancelled or on fatal error).
+func Run(ctx *context.T, samDef *sbmodel.SamplerDef, cb MeasureCallback) error {
+ var out bytes.Buffer
+ // Run sampling script using bash and capture output.
+ sh := exec.Command("setsid", "bash")
+ sh.Stdin = bytes.NewBufferString(samDef.Script)
+ sh.Stdout = &out
+ // TODO(ivanpi): Fail on any stderr output?
+ // TODO(ivanpi): Use end instead of begin timestamp?
+ timestamp := time.Now()
+ // Failure to start the script is a fatal error.
+ if err := sh.Start(); err != nil {
+ return fmt.Errorf("failed starting measuring script: %v", err)
+ }
+ waiter := make(chan error, 1)
+ // Start script asynchronously.
+ go func() {
+ waiter <- sh.Wait()
+ }()
+ // Wait for script exit or cancellation.
+ // TODO(ivanpi): Add script timeout.
+ select {
+ case <-ctx.Done():
+ // If cancelled, send SIGINT to script process group.
+ syscall.Kill(-sh.Process.Pid, syscall.SIGINT)
+ // Wait for script to exit. After a delay, if not exited, send SIGKILL
+ // and wait.
+ select {
+ case <-waiter:
+ case <-time.After(config.ScriptKillDelay):
+ syscall.Kill(-sh.Process.Pid, syscall.SIGKILL)
+ <-waiter
+ }
+ return nil
+ case err := <-waiter:
+ // If script has exited, parse data point or get error.
+ if err != nil {
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Script error: %v", err)}})
+ }
+ measured, err := strconv.ParseFloat(strings.TrimSpace(out.String()), 64)
+ if err != nil {
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Parse error: %v", err)}})
+ }
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointValue{Value: measured}})
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
new file mode 100644
index 0000000..04466c4
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
@@ -0,0 +1,146 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sampler_test
+
+import (
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ tu "v.io/x/ref/test/testutil"
+ "v.io/x/sensorlog_lite/internal/measure/sampler"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// Runs the sampler script and verifies that the callback was called with the
+// expected value or error.
+func runSamplerTest(t *testing.T, samDef *sbmodel.SamplerDef) sbmodel.VDataPoint {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ var gotRes *sampler.MeasureResult
+ start := time.Now()
+ if err := sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+ gotRes = res
+ return nil
+ }); err != nil {
+ t.Fatal(tu.FormatLogLine(3, "unexpected sampling error: %v", err))
+ }
+ end := time.Now()
+
+ if gotRes == nil {
+ t.Fatal(tu.FormatLogLine(3, "sampler callback was not called"))
+ }
+
+ if gotRes.Time.Before(start) || gotRes.Time.After(end) {
+ t.Error(tu.FormatLogLine(3, "invalid time: %v, expected between %v and %v", gotRes.Time, start, end))
+ }
+
+ return gotRes.Data
+}
+
+func runSamplerTestForValue(t *testing.T, samDef *sbmodel.SamplerDef, expectVal float64) {
+ res := runSamplerTest(t, samDef)
+ switch res := res.(type) {
+ case sbmodel.VDataPointValue:
+ if res.Value != expectVal {
+ t.Error(tu.FormatLogLine(2, "unexpected value, got: %v, want: %v", res.Value, expectVal))
+ }
+ default:
+ t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want value: %v", res, expectVal))
+ }
+}
+
+func runSamplerTestForError(t *testing.T, samDef *sbmodel.SamplerDef, expectErrPrefix string) {
+ res := runSamplerTest(t, samDef)
+ switch res := res.(type) {
+ case sbmodel.VDataPointError:
+ if !strings.HasPrefix(res.Value, expectErrPrefix) {
+ t.Error(tu.FormatLogLine(2, "unexpected error, got: %v, want prefix: %v", res.Value, expectErrPrefix))
+ }
+ default:
+ t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want error with prefix: %v", res, expectErrPrefix))
+ }
+}
+
+func TestSamplerRun(t *testing.T) {
+ runSamplerTestForValue(t, &sbmodel.SamplerDef{
+ Script: `echo 42`,
+ Start: time.Now(),
+ Interval: time.Second, // 7.5M years overflows time.Duration
+ }, 42.0)
+
+ runSamplerTestForValue(t, &sbmodel.SamplerDef{
+ Script: `
+for i in $(seq 10000); do :; done
+printf "%f\n" -12.73
+exit 0
+`,
+ Start: time.Now(),
+ Interval: time.Millisecond,
+ }, -12.73)
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42; exit 1`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Script error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `exit 0`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42foo`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42; echo 47`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+}
+
+func TestSamplerCancel(t *testing.T) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ // The script sleeps for 10 seconds, not interruptible by SIGINT/SIGTERM.
+ samDef := &sbmodel.SamplerDef{
+ Script: `
+trap 'sleep 10' INT TERM
+sleep 10
+echo 42
+`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }
+ // Asynchronously run sampler, cancelling shortly after.
+ done := make(chan error, 1)
+ go func() {
+ done <- sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+ t.Errorf("worker was cancelled, result callback should not have been called, got: %v", res.Data)
+ return nil
+ })
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ cancelTime := time.Now()
+ cancel()
+
+ if err := <-done; err != nil {
+ t.Errorf("unexpected sampling error: %v", err)
+ }
+ // Cancel should have killed the script using SIGKILL a short time after the
+ // script failed to exit on SIGINT.
+ if cancelTime.Add(1 * time.Second).Before(time.Now()) {
+ t.Errorf("sampling script took more than a second to stop")
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
index 408f8c4..4286bff 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
@@ -9,7 +9,9 @@
"crypto/rand"
"encoding/hex"
"fmt"
+ "strconv"
"strings"
+ "time"
)
const Sep = '$'
@@ -29,6 +31,18 @@
return parts, nil
}
+func StringifyTime(t time.Time) string {
+ return fmt.Sprintf("%016x", t.UnixNano())
+}
+
+func ParseTime(t string) (time.Time, error) {
+ nsec, err := strconv.ParseInt(t, 16, 64)
+ if err != nil {
+ return time.Time{}, fmt.Errorf("failed parsing time: %v", err)
+ }
+ return time.Unix(0, nsec), nil
+}
+
// ValidateId returns an error if the argument is not a valid identifier.
func ValidateId(id string) error {
if id == "" {
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
index a8579c8..32167f6 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -46,6 +46,26 @@
return nil
}
+// sdata : <DevId>/<StreamId>/<Timestamp>
+func (k KDataPoint) Table() string {
+ return "sdata"
+}
+func (k *KDataPoint) Key() string {
+ return keyutil.Join(k.DevId, k.StreamId, keyutil.StringifyTime(k.Timestamp))
+}
+func (k *KDataPoint) Parse(key string) error {
+ parts, err := keyutil.Split(key, 3)
+ if err != nil {
+ return err
+ }
+ timestamp, err := keyutil.ParseTime(parts[2])
+ if err != nil {
+ return err
+ }
+ k.DevId, k.StreamId, k.Timestamp = parts[0], parts[1], timestamp
+ return nil
+}
+
// TableSpec defines a Syncbase table, encapsulating a key prototype and
// permissions.
type TableSpec struct {
@@ -57,9 +77,11 @@
var MasterTables = []TableSpec{
{Prototype: &KDeviceCfg{}},
{Prototype: &KStreamDef{}},
+ {Prototype: &KDataPoint{}},
}
// All top-level types persisted to measured Syncbase.
var MeasuredTables = []TableSpec{
{Prototype: &KStreamDef{}, ReadOnly: true},
+ {Prototype: &KDataPoint{}},
}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
index 4ddee43..5f729bf 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
@@ -10,6 +10,10 @@
// conversion to and from the row key.
package sbmodel
+import (
+ "time"
+)
+
// devicecfg : <DevId>
// Measuring device handle. Master only.
type VDeviceCfg struct {
@@ -25,15 +29,40 @@
// streamdef : <DevId>/<StreamId>
// Configures a stream of data to be measured.
-// TODO(ivanpi): Add actual sampling script and parameters.
type VStreamDef struct {
// Human-readable, not necessarily unique description of the stream.
Desc string
+ // Sampling configuration.
+ Sampler SamplerDef
// Flag to start and stop sampling.
Enabled bool
}
type KStreamDef struct {
- DevId string
+ DevId string
StreamId string
}
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+ // Shell script executed after every Interval, starting from Start.
+ // It should output a single data point, if available. A non-zero exit
+ // status or failure to parse the value will produce an error instead.
+ Script string
+ Start time.Time
+ Interval time.Duration
+ // TODO(ivanpi): Add timeout.
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+ DevId string
+ StreamId string
+ Timestamp time.Time
+}
+
+type VDataPoint union {
+ Value float64
+ Error string
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
index 355db3c..3f1c289 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
@@ -16,6 +16,10 @@
import (
// VDL system imports
"v.io/v23/vdl"
+
+ // VDL user imports
+ "time"
+ _ "v.io/v23/vdlroot/time"
)
// devicecfg : <DevId>
@@ -43,10 +47,11 @@
// streamdef : <DevId>/<StreamId>
// Configures a stream of data to be measured.
-// TODO(ivanpi): Add actual sampling script and parameters.
type VStreamDef struct {
// Human-readable, not necessarily unique description of the stream.
Desc string
+ // Sampling configuration.
+ Sampler SamplerDef
// Flag to start and stop sampling.
Enabled bool
}
@@ -66,9 +71,77 @@
}) {
}
+// Sampling script and polling frequency.
+type SamplerDef struct {
+ // Shell script executed after every Interval, starting from Start.
+ // It should output a single data point, if available. A non-zero exit
+ // status or failure to parse the value will produce an error instead.
+ Script string
+ Start time.Time
+ Interval time.Duration
+}
+
+func (SamplerDef) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.SamplerDef"`
+}) {
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+ DevId string
+ StreamId string
+ Timestamp time.Time
+}
+
+func (KDataPoint) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDataPoint"`
+}) {
+}
+
+type (
+ // VDataPoint represents any single field of the VDataPoint union type.
+ VDataPoint interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the VDataPoint union type.
+ __VDLReflect(__VDataPointReflect)
+ }
+ // VDataPointValue represents field Value of the VDataPoint union type.
+ VDataPointValue struct{ Value float64 }
+ // VDataPointError represents field Error of the VDataPoint union type.
+ VDataPointError struct{ Value string }
+ // __VDataPointReflect describes the VDataPoint union type.
+ __VDataPointReflect struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDataPoint"`
+ Type VDataPoint
+ Union struct {
+ Value VDataPointValue
+ Error VDataPointError
+ }
+ }
+)
+
+func (x VDataPointValue) Index() int { return 0 }
+func (x VDataPointValue) Interface() interface{} { return x.Value }
+func (x VDataPointValue) Name() string { return "Value" }
+func (x VDataPointValue) __VDLReflect(__VDataPointReflect) {}
+
+func (x VDataPointError) Index() int { return 1 }
+func (x VDataPointError) Interface() interface{} { return x.Value }
+func (x VDataPointError) Name() string { return "Error" }
+func (x VDataPointError) __VDLReflect(__VDataPointReflect) {}
+
func init() {
vdl.Register((*VDeviceCfg)(nil))
vdl.Register((*KDeviceCfg)(nil))
vdl.Register((*VStreamDef)(nil))
vdl.Register((*KStreamDef)(nil))
+ vdl.Register((*SamplerDef)(nil))
+ vdl.Register((*KDataPoint)(nil))
+ vdl.Register((*VDataPoint)(nil))
}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
index 12453de..f166f99 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
@@ -7,6 +7,7 @@
import (
"reflect"
"testing"
+ "time"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
@@ -30,6 +31,9 @@
if got, want := keyPrototype, kt.keyParsed; !reflect.DeepEqual(got, want) {
t.Errorf("incorrect key parse: got %v, want %v", got, want)
}
+ if got, want := kt.keyParsed.Table(), keyPrototype.Table(); got != want {
+ t.Errorf("incorrect parsed key table: got %v, want %v", got, want)
+ }
if got, want := kt.keyParsed.Key(), kt.keyStr; got != want {
t.Errorf("incorrect key build: got %s, want %s", got, want)
}
@@ -80,3 +84,25 @@
kt.Run(t, &sbmodel.KStreamDef{})
}
}
+
+func TestDataPointKeys(t *testing.T) {
+ tests := []keyTest{
+ &validKeyTest{
+ keyStr: keyutil.Join("meter", "amps", "00000022fcde41b2"),
+ keyParsed: &sbmodel.KDataPoint{
+ DevId: "meter",
+ StreamId: "amps",
+ Timestamp: time.Unix(0, 0x22fcde41b2),
+ },
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("meter", "amps"),
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("meter", "amps", "2.17"),
+ },
+ }
+ for _, kt := range tests {
+ kt.Run(t, &sbmodel.KDataPoint{})
+ }
+}