sensorlog_lite: Sampling script runner.

Method for a single sampling script run, with cancellation support.
VDL types for measured data.

Change-Id: I816ee3a5135a8fda54b3624fd7783332d10235c0
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 1ce9f9a..7451931 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -6,6 +6,8 @@
 package config
 
 import (
+	"time"
+
 	"v.io/v23/naming"
 )
 
@@ -16,6 +18,8 @@
 	DBName  = "sldb"
 
 	SyncPriority = 42
+
+	ScriptKillDelay = 100 * time.Millisecond
 )
 
 func SyncgroupName(publishService, devId string) string {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
new file mode 100644
index 0000000..ee54ec1
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sampler implements an asynchronous worker for a single run of the
+// sampling script.
+package sampler
+
+import (
+	"bytes"
+	"fmt"
+	"os/exec"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+type MeasureCallback func(result *MeasureResult) error
+
+type MeasureResult struct {
+	Time time.Time
+	Data sbmodel.VDataPoint
+}
+
+// Run synchronously runs the sampler script, producing a single data point or
+// error message and passing it to the callback (also run synchronously). Fatal
+// errors (including callback errors) are returned from Run.
+// The script is terminated using SIGINT (followed by SIGKILL) to the process
+// group if the context is cancelled. The callback will be called at most once
+// per Run call (may not be called if cancelled or on fatal error).
+func Run(ctx *context.T, samDef *sbmodel.SamplerDef, cb MeasureCallback) error {
+	var out bytes.Buffer
+	// Run sampling script using bash and capture output.
+	sh := exec.Command("setsid", "bash")
+	sh.Stdin = bytes.NewBufferString(samDef.Script)
+	sh.Stdout = &out
+	// TODO(ivanpi): Fail on any stderr output?
+	// TODO(ivanpi): Use end instead of begin timestamp?
+	timestamp := time.Now()
+	// Failure to start the script is a fatal error.
+	if err := sh.Start(); err != nil {
+		return fmt.Errorf("failed starting measuring script: %v", err)
+	}
+	waiter := make(chan error, 1)
+	// Start script asynchronously.
+	go func() {
+		waiter <- sh.Wait()
+	}()
+	// Wait for script exit or cancellation.
+	// TODO(ivanpi): Add script timeout.
+	select {
+	case <-ctx.Done():
+		// If cancelled, send SIGINT to script process group.
+		syscall.Kill(-sh.Process.Pid, syscall.SIGINT)
+		// Wait for script to exit. After a delay, if not exited, send SIGKILL
+		// and wait.
+		select {
+		case <-waiter:
+		case <-time.After(config.ScriptKillDelay):
+			syscall.Kill(-sh.Process.Pid, syscall.SIGKILL)
+			<-waiter
+		}
+		return nil
+	case err := <-waiter:
+		// If script has exited, parse data point or get error.
+		if err != nil {
+			return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Script error: %v", err)}})
+		}
+		measured, err := strconv.ParseFloat(strings.TrimSpace(out.String()), 64)
+		if err != nil {
+			return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Parse error: %v", err)}})
+		}
+		return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointValue{Value: measured}})
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
new file mode 100644
index 0000000..04466c4
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
@@ -0,0 +1,146 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sampler_test
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	tu "v.io/x/ref/test/testutil"
+	"v.io/x/sensorlog_lite/internal/measure/sampler"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// Runs the sampler script and verifies that the callback was called with the
+// expected value or error.
+func runSamplerTest(t *testing.T, samDef *sbmodel.SamplerDef) sbmodel.VDataPoint {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	var gotRes *sampler.MeasureResult
+	start := time.Now()
+	if err := sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+		gotRes = res
+		return nil
+	}); err != nil {
+		t.Fatal(tu.FormatLogLine(3, "unexpected sampling error: %v", err))
+	}
+	end := time.Now()
+
+	if gotRes == nil {
+		t.Fatal(tu.FormatLogLine(3, "sampler callback was not called"))
+	}
+
+	if gotRes.Time.Before(start) || gotRes.Time.After(end) {
+		t.Error(tu.FormatLogLine(3, "invalid time: %v, expected between %v and %v", gotRes.Time, start, end))
+	}
+
+	return gotRes.Data
+}
+
+func runSamplerTestForValue(t *testing.T, samDef *sbmodel.SamplerDef, expectVal float64) {
+	res := runSamplerTest(t, samDef)
+	switch res := res.(type) {
+	case sbmodel.VDataPointValue:
+		if res.Value != expectVal {
+			t.Error(tu.FormatLogLine(2, "unexpected value, got: %v, want: %v", res.Value, expectVal))
+		}
+	default:
+		t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want value: %v", res, expectVal))
+	}
+}
+
+func runSamplerTestForError(t *testing.T, samDef *sbmodel.SamplerDef, expectErrPrefix string) {
+	res := runSamplerTest(t, samDef)
+	switch res := res.(type) {
+	case sbmodel.VDataPointError:
+		if !strings.HasPrefix(res.Value, expectErrPrefix) {
+			t.Error(tu.FormatLogLine(2, "unexpected error, got: %v, want prefix: %v", res.Value, expectErrPrefix))
+		}
+	default:
+		t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want error with prefix: %v", res, expectErrPrefix))
+	}
+}
+
+func TestSamplerRun(t *testing.T) {
+	runSamplerTestForValue(t, &sbmodel.SamplerDef{
+		Script:   `echo 42`,
+		Start:    time.Now(),
+		Interval: time.Second, // 7.5M years overflows time.Duration
+	}, 42.0)
+
+	runSamplerTestForValue(t, &sbmodel.SamplerDef{
+		Script: `
+for i in $(seq 10000); do :; done
+printf "%f\n" -12.73
+exit 0
+`,
+		Start:    time.Now(),
+		Interval: time.Millisecond,
+	}, -12.73)
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42; exit 1`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Script error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `exit 0`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42foo`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42; echo 47`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+}
+
+func TestSamplerCancel(t *testing.T) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	// The script sleeps for 10 seconds, not interruptible by SIGINT/SIGTERM.
+	samDef := &sbmodel.SamplerDef{
+		Script: `
+trap 'sleep 10' INT TERM
+sleep 10
+echo 42
+`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}
+	// Asynchronously run sampler, cancelling shortly after.
+	done := make(chan error, 1)
+	go func() {
+		done <- sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+			t.Errorf("worker was cancelled, result callback should not have been called, got: %v", res.Data)
+			return nil
+		})
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+	cancelTime := time.Now()
+	cancel()
+
+	if err := <-done; err != nil {
+		t.Errorf("unexpected sampling error: %v", err)
+	}
+	// Cancel should have killed the script using SIGKILL a short time after the
+	// script failed to exit on SIGINT.
+	if cancelTime.Add(1 * time.Second).Before(time.Now()) {
+		t.Errorf("sampling script took more than a second to stop")
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
index 408f8c4..4286bff 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
@@ -9,7 +9,9 @@
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
+	"strconv"
 	"strings"
+	"time"
 )
 
 const Sep = '$'
@@ -29,6 +31,18 @@
 	return parts, nil
 }
 
+func StringifyTime(t time.Time) string {
+	return fmt.Sprintf("%016x", t.UnixNano())
+}
+
+func ParseTime(t string) (time.Time, error) {
+	nsec, err := strconv.ParseInt(t, 16, 64)
+	if err != nil {
+		return time.Time{}, fmt.Errorf("failed parsing time: %v", err)
+	}
+	return time.Unix(0, nsec), nil
+}
+
 // ValidateId returns an error if the argument is not a valid identifier.
 func ValidateId(id string) error {
 	if id == "" {
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
index a8579c8..32167f6 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -46,6 +46,26 @@
 	return nil
 }
 
+// sdata : <DevId>/<StreamId>/<Timestamp>
+func (k KDataPoint) Table() string {
+	return "sdata"
+}
+func (k *KDataPoint) Key() string {
+	return keyutil.Join(k.DevId, k.StreamId, keyutil.StringifyTime(k.Timestamp))
+}
+func (k *KDataPoint) Parse(key string) error {
+	parts, err := keyutil.Split(key, 3)
+	if err != nil {
+		return err
+	}
+	timestamp, err := keyutil.ParseTime(parts[2])
+	if err != nil {
+		return err
+	}
+	k.DevId, k.StreamId, k.Timestamp = parts[0], parts[1], timestamp
+	return nil
+}
+
 // TableSpec defines a Syncbase table, encapsulating a key prototype and
 // permissions.
 type TableSpec struct {
@@ -57,9 +77,11 @@
 var MasterTables = []TableSpec{
 	{Prototype: &KDeviceCfg{}},
 	{Prototype: &KStreamDef{}},
+	{Prototype: &KDataPoint{}},
 }
 
 // All top-level types persisted to measured Syncbase.
 var MeasuredTables = []TableSpec{
 	{Prototype: &KStreamDef{}, ReadOnly: true},
+	{Prototype: &KDataPoint{}},
 }
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
index 4ddee43..5f729bf 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
@@ -10,6 +10,10 @@
 // conversion to and from the row key.
 package sbmodel
 
+import (
+  "time"
+)
+
 // devicecfg : <DevId>
 // Measuring device handle. Master only.
 type VDeviceCfg struct {
@@ -25,15 +29,40 @@
 
 // streamdef : <DevId>/<StreamId>
 // Configures a stream of data to be measured.
-// TODO(ivanpi): Add actual sampling script and parameters.
 type VStreamDef struct {
   // Human-readable, not necessarily unique description of the stream.
   Desc string
+  // Sampling configuration.
+  Sampler SamplerDef
   // Flag to start and stop sampling.
   Enabled bool
 }
 
 type KStreamDef struct {
-  DevId string
+  DevId    string
   StreamId string
 }
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+  // Shell script executed after every Interval, starting from Start.
+  // It should output a single data point, if available. A non-zero exit
+  // status or failure to parse the value will produce an error instead.
+  Script   string
+  Start    time.Time
+  Interval time.Duration
+  // TODO(ivanpi): Add timeout.
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+  DevId     string
+  StreamId  string
+  Timestamp time.Time
+}
+
+type VDataPoint union {
+  Value float64
+  Error string
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
index 355db3c..3f1c289 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
@@ -16,6 +16,10 @@
 import (
 	// VDL system imports
 	"v.io/v23/vdl"
+
+	// VDL user imports
+	"time"
+	_ "v.io/v23/vdlroot/time"
 )
 
 // devicecfg : <DevId>
@@ -43,10 +47,11 @@
 
 // streamdef : <DevId>/<StreamId>
 // Configures a stream of data to be measured.
-// TODO(ivanpi): Add actual sampling script and parameters.
 type VStreamDef struct {
 	// Human-readable, not necessarily unique description of the stream.
 	Desc string
+	// Sampling configuration.
+	Sampler SamplerDef
 	// Flag to start and stop sampling.
 	Enabled bool
 }
@@ -66,9 +71,77 @@
 }) {
 }
 
+// Sampling script and polling frequency.
+type SamplerDef struct {
+	// Shell script executed after every Interval, starting from Start.
+	// It should output a single data point, if available. A non-zero exit
+	// status or failure to parse the value will produce an error instead.
+	Script   string
+	Start    time.Time
+	Interval time.Duration
+}
+
+func (SamplerDef) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.SamplerDef"`
+}) {
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+	DevId     string
+	StreamId  string
+	Timestamp time.Time
+}
+
+func (KDataPoint) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDataPoint"`
+}) {
+}
+
+type (
+	// VDataPoint represents any single field of the VDataPoint union type.
+	VDataPoint interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the VDataPoint union type.
+		__VDLReflect(__VDataPointReflect)
+	}
+	// VDataPointValue represents field Value of the VDataPoint union type.
+	VDataPointValue struct{ Value float64 }
+	// VDataPointError represents field Error of the VDataPoint union type.
+	VDataPointError struct{ Value string }
+	// __VDataPointReflect describes the VDataPoint union type.
+	__VDataPointReflect struct {
+		Name  string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDataPoint"`
+		Type  VDataPoint
+		Union struct {
+			Value VDataPointValue
+			Error VDataPointError
+		}
+	}
+)
+
+func (x VDataPointValue) Index() int                       { return 0 }
+func (x VDataPointValue) Interface() interface{}           { return x.Value }
+func (x VDataPointValue) Name() string                     { return "Value" }
+func (x VDataPointValue) __VDLReflect(__VDataPointReflect) {}
+
+func (x VDataPointError) Index() int                       { return 1 }
+func (x VDataPointError) Interface() interface{}           { return x.Value }
+func (x VDataPointError) Name() string                     { return "Error" }
+func (x VDataPointError) __VDLReflect(__VDataPointReflect) {}
+
 func init() {
 	vdl.Register((*VDeviceCfg)(nil))
 	vdl.Register((*KDeviceCfg)(nil))
 	vdl.Register((*VStreamDef)(nil))
 	vdl.Register((*KStreamDef)(nil))
+	vdl.Register((*SamplerDef)(nil))
+	vdl.Register((*KDataPoint)(nil))
+	vdl.Register((*VDataPoint)(nil))
 }
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
index 12453de..f166f99 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
@@ -7,6 +7,7 @@
 import (
 	"reflect"
 	"testing"
+	"time"
 
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
@@ -30,6 +31,9 @@
 	if got, want := keyPrototype, kt.keyParsed; !reflect.DeepEqual(got, want) {
 		t.Errorf("incorrect key parse: got %v, want %v", got, want)
 	}
+	if got, want := kt.keyParsed.Table(), keyPrototype.Table(); got != want {
+		t.Errorf("incorrect parsed key table: got %v, want %v", got, want)
+	}
 	if got, want := kt.keyParsed.Key(), kt.keyStr; got != want {
 		t.Errorf("incorrect key build: got %s, want %s", got, want)
 	}
@@ -80,3 +84,25 @@
 		kt.Run(t, &sbmodel.KStreamDef{})
 	}
 }
+
+func TestDataPointKeys(t *testing.T) {
+	tests := []keyTest{
+		&validKeyTest{
+			keyStr: keyutil.Join("meter", "amps", "00000022fcde41b2"),
+			keyParsed: &sbmodel.KDataPoint{
+				DevId:     "meter",
+				StreamId:  "amps",
+				Timestamp: time.Unix(0, 0x22fcde41b2),
+			},
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("meter", "amps"),
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("meter", "amps", "2.17"),
+		},
+	}
+	for _, kt := range tests {
+		kt.Run(t, &sbmodel.KDataPoint{})
+	}
+}