Merge branch 'master' of /tmp/experimental into ivanpi-sensorlog-merge
diff --git a/go/src/v.io/x/sensorlog/Makefile b/go/src/v.io/x/sensorlog/Makefile
new file mode 100644
index 0000000..d880055
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/Makefile
@@ -0,0 +1,30 @@
+PATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite/bin:$(JIRI_ROOT)/release/go/bin:$(PATH)
+SHELL := /bin/bash -euo pipefail
+export GOPATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite:$(GOPATH)
+export VDLPATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite/src:$(VDLPATH)
+
+.DELETE_ON_ERROR:
+
+.PHONY: all
+all: measured slcli
+
+.PHONY: measured
+measured:
+ jiri go install v.io/x/sensorlog_lite/measured
+
+.PHONY: slcli
+slcli:
+ jiri go install v.io/x/sensorlog_lite/slcli
+
+.PHONY: run-deps
+run-deps:
+ jiri go install v.io/x/ref/services/mounttable/mounttabled
+ jiri go install v.io/x/ref/services/syncbase/syncbased
+ jiri go install v.io/x/ref/services/agent/vbecome
+ jiri go install v.io/x/ref/cmd/principal
+
+.PHONY: test
+test:
+ jiri go test v.io/x/sensorlog_lite/...
+ # TODO(ivanpi): Add jiri command to run integration tests.
+ jiri go test v.io/x/sensorlog_lite/internal/client -run=TestV23* -v23.tests
diff --git a/go/src/v.io/x/sensorlog/README.md b/go/src/v.io/x/sensorlog/README.md
new file mode 100644
index 0000000..734d822
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/README.md
@@ -0,0 +1,88 @@
+# Sensor Log Lite
+
+Sensor Log Lite is an example Syncbase application for measuring streams of
+time series data on a group of devices.
+
+A Sensor Log Lite system consists of a master and any number of measuring
+devices. The master device runs Syncbase and the command line client
+(`slcli`), while each measuring device runs Syncbase and the measuring
+daemon (`measured`). The client controls the measuring daemon through
+Syncbase.
+
+# Measuring data
+
+Each measuring device runs an instance of `measured`, which samples data
+points for one or more measuring streams. A stream is an ordered sequence
+of data points that are sampled by running a shell script at a set
+frequency. The `slcli` tool can be used to write the sampling configuration
+to the master Syncbase instance, which is then synced to the appropriate
+measuring device Syncbase. Measured data points are synced back to the
+master Syncbase, where they can be examined using `slcli`.
+
+### Starting `measured`
+
+In the instructions below, replace `<creds>` with the path to
+Vanadium credentials obtained using `principal seekblessings`.
+
+`measured`, along with the required `mounttabled` and `syncbased` services,
+can be started by:
+
+ $ V23_CREDENTIALS=<creds> SL_DEVID=dev1 ./scripts/run_measured.sh
+
+By default, it starts a local mounttable at port 8707 (override using
+`$SL_IPADDR_PORT`), mounts it at
+`/ns.dev.v.io:8101/users/<email-from-blessing>/sl/measured/<devid>`,
+starts a Syncbase instance mounted in the local mounttable at
+`sl/measured/<devid>`, and a measuring daemon against the Syncbase instance.
+
+### Using the client
+
+Before using the client, the master `syncbased` must be started by:
+
+ $ V23_CREDENTIALS=<creds> ./scripts/run_cli_syncbased.sh
+
+By default, it starts a local mounttable at port 8202 (override using
+`$SL_IPADDR_PORT`) and starts a Syncbase instance mounted in it at
+`sl/client/main` (override using `$SL_DEVID`).
+
+Running
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh <args>
+
+will invoke the `slcli` tool with blessings and flags set appropriately
+for a master stack run with the same environment.
+
+### Adding a device
+
+Each measuring device creates a syncgroup which can be joined by a master
+device. Using the above default configuration, run:
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh device add /ns.dev.v.io:8101/users/<email-from-blessing>/sl/measured/dev1/sl/measured/dev1/syncbased dev1
+
+### Creating a stream
+
+Once a device has been added, streams can be configured on it to start
+sampling data. The sampling script is expected to output a single floating
+point value and exit with a zero status code on every invocation, otherwise
+an error is logged instead of data.
+
+For example, to sample the Answer to Life, the Universe, and Everything
+every two seconds:
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh stream create dev1 stream42 2s <<< "echo 42;"
+
+### Listing data
+
+To list all data sampled on the stream, run
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list dev1 stream42
+
+To keep listing newly measured data until `slcli` is killed, add `-follow`:
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list -follow dev1 stream42
+
+### Debugging
+
+The internal state of the master Syncbase can be examined using `sb`:
+
+ $ V23_CREDENTIALS=<creds> ${JIRI_ROOT}/release/go/bin/vbecome -name sl:client:main ${JIRI_ROOT}/release/go/bin/sb -service /$(dig $(hostname) +short):8202/sl/client/main/syncbased sh sensorlog_lite sldb
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
new file mode 100644
index 0000000..ee266e3
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for measuring device configuration.
+
+package client
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// AddDevice joins the syncgroup of the measuring device identified by devId,
+// expected to be published at sgPublishSb, and makes it available for stream
+// configuration.
+func AddDevice(ctx *context.T, db nosql.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
+ if err := keyutil.ValidateId(devId); err != nil {
+ return nil, fmt.Errorf("invalid devId: %v", err)
+ }
+ devKey := sbmodel.KDeviceCfg{
+ DevId: devId,
+ }
+ if desc == "" {
+ desc = "device:" + devKey.Key()
+ }
+ devVal := sbmodel.VDeviceCfg{
+ Desc: desc,
+ SgPublishSb: sgPublishSb,
+ }
+ devSgName := config.SyncgroupName(sgPublishSb, devKey.Key())
+ devRow := db.Table(devKey.Table()).Row(devKey.Key())
+
+ if exists, err := devRow.Exists(ctx); err != nil {
+ return nil, err
+ } else if exists {
+ return nil, verror.New(verror.ErrExist, ctx, "Device '"+devKey.Key()+"' was already added")
+ }
+
+ // TODO(ivanpi): Lack of atomicity here can result in a duplicate join, not
+ // really relevant until syncgroup Leave is implemented.
+ sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+ if _, err := db.Syncgroup(devSgName).Join(ctx, sgMemberInfo); err != nil {
+ return nil, err
+ }
+
+ return &devKey, devRow.Put(ctx, &devVal)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
new file mode 100644
index 0000000..98a9182
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
@@ -0,0 +1,187 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+ "bytes"
+ "fmt"
+ "reflect"
+ "sort"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/x/ref/lib/signals"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test/modules"
+ "v.io/x/ref/test/v23tests"
+ slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+ "v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+//go:generate jiri test generate
+
+const (
+ dummyScript = "echo 42;"
+ dummyInterval = "1s"
+)
+
+func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
+ // Start a 'global' mounttable.
+ globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
+ // Mount the local mounttable in the global one.
+ localMT := naming.Join(globalMT, "localmt")
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0", "--name="+localMT)
+
+ clientSb := "sb/client"
+ clientCreds, _ := t.Shell().NewChildCredentials("client")
+ clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+ cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+ `{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ measuredSb := "sb/measured"
+ measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+ measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+ cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+ `{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ time.Sleep(1 * time.Second)
+
+ // Initialize measuring device syncgroup using the global mounttable alias
+ // of measured Syncbase for publishing.
+ devId := "measured1"
+ publishSb := naming.Join(localMT, measuredSb)
+ sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root/client", publishSb, globalMT)
+
+ // Add measuring device to client, joining its syncgroup.
+ sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+ // Allow time for syncgroup metadata sync.
+ time.Sleep(3 * time.Second)
+
+ // After syncgroup has been joined, sync can use local mounttable. Kill the
+ // global one.
+ if err := globalMTHandle.Kill(syscall.SIGINT); err != nil {
+ t.Fatalf("failed to kill global mounttable: %v", err)
+ }
+ if err := globalMTHandle.Shutdown(nil, nil); err != nil {
+ t.Fatalf("failed to shutdown global mounttable: %v", err)
+ }
+
+ streamIds := []string{"str1", "str2", "str3"}
+
+ // Create stream before starting measured watcher.
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)
+
+ time.Sleep(1 * time.Second)
+
+ measureWatcher, err := t.Shell().StartWithOpts(
+ t.Shell().DefaultStartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 60*time.Second), nil,
+ runWatchForStreams, append([]string{measuredSb, devId}, streamIds...)...)
+ if err != nil {
+ t.Fatalf("failed to start measureWatcher: %v", err)
+ }
+
+ // Create more streams.
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)
+
+ time.Sleep(1 * time.Second)
+
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)
+
+ // Allow time for sync.
+ time.Sleep(3 * time.Second)
+
+ // Kill will stop the watcher and cause it to verify that seen streams match
+ // expectations.
+ if err := syscall.Kill(measureWatcher.Pid(), syscall.SIGINT); err != nil {
+ t.Fatalf("failed to kill measureWatcher: %v", err)
+ }
+ var stdout, stderr bytes.Buffer
+ if err := measureWatcher.Shutdown(&stdout, &stderr); err != nil {
+ t.Errorf("failed to shutdown measureWatcher: %v")
+ }
+ t.Logf("measureWatcher stdout:\n%s", stdout.String())
+ t.Logf("measureWatcher stderr:\n%s", stderr.String())
+}
+
+var runInitSyncgroup = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, admin, publishSb, sgMT := args[0], args[1], args[2], args[3], args[4]
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+ if err != nil {
+ return fmt.Errorf("failed initializing measured database: %v", err)
+ }
+
+ sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT)
+ if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
+ return fmt.Errorf("failed initializing measured syncgroup: %v", err)
+ }
+ if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
+ return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err)
+ }
+
+ return nil
+}, "runInitSyncgroup")
+
+var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, expectStreams := args[0], args[1], args[2:]
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+ if err != nil {
+ return fmt.Errorf("failed opening measured database: %v", err)
+ }
+
+ gotStreams := make([]string, 0, len(expectStreams))
+
+ ctx, stop := context.WithCancel(ctx)
+ wait := util.AsyncRun(func() error {
+ return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+ if got, want := key.DevId, devId; got != want {
+ return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+ }
+ gotStreams = append(gotStreams, key.StreamId)
+ return nil
+ })
+ }, func(err error) {
+ stop()
+ })
+
+ // Wait for kill, then stop watcher.
+ <-signals.ShutdownOnSignals(nil)
+ stop()
+
+ if err := wait(); err != nil {
+ return err
+ }
+
+ // Compare streams seen by watcher to expectations.
+ sort.Strings(gotStreams)
+ sort.Strings(expectStreams)
+ if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+ return fmt.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+ }
+
+ return nil
+}, "runWatchForStreams")
+
+func startAdditionalMT(t *v23tests.T, args ...string) (string, *v23tests.Invocation) {
+ bin := t.BuildV23Pkg("v.io/x/ref/services/mounttable/mounttabled")
+ inv := bin.Start(args...)
+ return inv.ExpectVar("NAME"), inv
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/doc.go b/go/src/v.io/x/sensorlog/internal/client/doc.go
new file mode 100644
index 0000000..2f09780
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/doc.go
@@ -0,0 +1,8 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package client implements Sensor Log client methods, intended to run
+// against the master device Syncbase. It supports adding measuring devices,
+// configuring streams, and listing measured data.
+package client
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
new file mode 100644
index 0000000..e2aedb0
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for listing stream data.
+
+package client
+
+import (
+ "fmt"
+ "sort"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/services/watch"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// ListStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each.
+// TODO(ivanpi): Allow specifying time interval.
+func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+ _, err := listStreamData(ctx, db, streamKey, listCb)
+ return err
+}
+
+// FollowStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each. It keeps listing new data
+// points until ctx is cancelled.
+// TODO(ivanpi): Allow specifying time interval.
+func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+ tableName := sbmodel.KDataPoint{}.Table()
+ dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+ resMark, err := listStreamData(ctx, db, streamKey, listCb)
+ if err != nil {
+ return err
+ }
+
+ // Watch for new DataPoints.
+ ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+ if err != nil {
+ return err
+ }
+ defer ws.Cancel()
+
+ trans := make([]*dataPoint, 0, 16)
+ for ws.Advance() {
+ c := ws.Change()
+ var elem dataPoint
+ if err := elem.Key.Parse(c.Row); err != nil {
+ return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ switch c.ChangeType {
+ case nosql.PutChange:
+ if err := c.Value(&elem.Val); err != nil {
+ return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
+ }
+ // In the absence of overlapping syncgroups or partial ACLs, the order of
+ // writes is preserved by sync. However, several writes may be grouped
+ // into a single batch, and the order is not preserved within a batch.
+ // Each batch is manually sorted before being emitted to the callback.
+ trans = append(trans, &elem)
+ if !c.Continued {
+ sort.Stable(dataPointSort(trans))
+ for _, elem := range trans {
+ if err := listCb(&elem.Key, elem.Val); err != nil {
+ return err
+ }
+ }
+ trans = trans[:0]
+ }
+ case nosql.DeleteChange:
+ // no-op
+ }
+ }
+ return ws.Err()
+}
+
+// listStreamData implements listing (scanning over) existing stream data. It
+// also returns the resume marker to allow watching for future data.
+func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
+ var resMark watch.ResumeMarker
+ tableName := sbmodel.KDataPoint{}.Table()
+ dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+ bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ return resMark, err
+ }
+ defer bdb.Abort(ctx)
+
+ resMark, err = bdb.GetResumeMarker(ctx)
+ if err != nil {
+ return resMark, err
+ }
+
+ streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+ if exists, err := streamRow.Exists(ctx); err != nil {
+ return resMark, err
+ } else if !exists {
+ return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+ }
+
+ sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KDataPoint{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ var val sbmodel.VDataPoint
+ if err := sstr.Value(&val); err != nil {
+ return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+ }
+ if err := listCb(key, val); err != nil {
+ return resMark, err
+ }
+ }
+ return resMark, sstr.Err()
+}
+
+type dataPoint struct {
+ Key sbmodel.KDataPoint
+ Val sbmodel.VDataPoint
+}
+
+// dataPointSort implements sorting a dataPoint slice by timestamp.
+type dataPointSort []*dataPoint
+
+func (s dataPointSort) Len() int { return len(s) }
+func (s dataPointSort) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
diff --git a/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
new file mode 100644
index 0000000..8558501
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+ "bytes"
+ "fmt"
+ "strconv"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/naming"
+ "v.io/x/ref"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test/modules"
+ "v.io/x/ref/test/v23tests"
+ "v.io/x/sensorlog_lite/internal/client"
+ slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+//go:generate jiri test generate
+
+func V23TestStreamConfigAndList(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)
+
+ clientSb := "sb/client"
+ clientCreds, _ := t.Shell().NewChildCredentials("client")
+ clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+ cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+ `{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ measuredSb := "sb/measured"
+ measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+ measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+ cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+ `{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ time.Sleep(1 * time.Second)
+
+ // Start measured.
+ measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured")
+ devId := "measured1"
+ publishSb := naming.Join(mtName, measuredSb)
+ measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
+ measured := measuredBin.WithStartOpts(measuredOpts).Start(
+ "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client",
+ "-publish-sb="+publishSb)
+
+ time.Sleep(3 * time.Second)
+
+ // Add measuring device to client, joining its syncgroup.
+ sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+ // Create streams.
+ streamId1, result1, interval1 := "str1", "42", "2s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")
+
+ time.Sleep(1 * time.Second)
+
+ streamId2, result2, interval2 := "str2", "47", "1s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")
+
+ // streamId has another streamId as prefix to verify ListDataStreams prefix
+ // handling.
+ streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")
+
+ // Allow time for sync and at least 3 measurements.
+ time.Sleep(10 * time.Second)
+
+ // Kill will gracefully stop measured.
+ if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
+ t.Fatalf("failed to kill measured: %v", err)
+ }
+ var stdout, stderr bytes.Buffer
+ if err := measured.Shutdown(&stdout, &stderr); err != nil {
+ t.Errorf("failed to shutdown measured: %v")
+ }
+ t.Logf("measured stdout:\n%s", stdout.String())
+ t.Logf("measured stderr:\n%s", stderr.String())
+
+ // Check that both streams have at least 3 measurements synced back to
+ // client device Syncbase.
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
+}
+
+var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]
+
+ expectVal, err := strconv.ParseFloat(expectValStr, 64)
+ if err != nil {
+ return err
+ }
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+ if err != nil {
+ return fmt.Errorf("failed opening measured database: %v", err)
+ }
+
+ count := 0
+ streamKey := &sbmodel.KStreamDef{
+ DevId: devId,
+ StreamId: streamId,
+ }
+ if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ if got, want := key.StreamId, streamId; got != want {
+ return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
+ }
+ switch val := val.(type) {
+ case sbmodel.VDataPointValue:
+ if val.Value != expectVal {
+ return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
+ }
+ default:
+ return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
+ }
+ count++
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ if count < 3 {
+ return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
+ }
+
+ return nil
+}, "runListStreamData")
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
new file mode 100644
index 0000000..74a9fb5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -0,0 +1,78 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for stream configuration.
+
+package client
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// CreateStream writes the configuration for a new sampling stream to Syncbase.
+// The target device must have been configured. streamId must be unique for
+// the target device.
+// TODO(ivanpi): Make start time configurable.
+func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
+ if err := keyutil.ValidateId(streamId); err != nil {
+ return nil, fmt.Errorf("invalid streamId: %v", err)
+ }
+ if strings.TrimSpace(script) == "" {
+ return nil, fmt.Errorf("sampling script cannot be empty")
+ }
+ if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
+ return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
+ }
+ stmKey := sbmodel.KStreamDef{
+ DevId: devKey.DevId,
+ StreamId: streamId,
+ }
+ if desc == "" {
+ desc = "stream:" + stmKey.Key()
+ }
+ stmVal := sbmodel.VStreamDef{
+ Desc: desc,
+ Sampler: sbmodel.SamplerDef{
+ Script: script,
+ Start: time.Now(),
+ Interval: interval,
+ },
+ Enabled: true,
+ }
+
+ if err := nosql.RunInBatch(ctx, db, nosql_wire.BatchOptions{}, func(db nosql.BatchDatabase) error {
+ devRow := db.Table(devKey.Table()).Row(devKey.Key())
+ stmRow := db.Table(stmKey.Table()).Row(stmKey.Key())
+
+ if exists, err := devRow.Exists(ctx); err != nil {
+ return err
+ } else if !exists {
+ return verror.New(verror.ErrNoExist, ctx, "Device '"+devKey.Key()+"' does not exist")
+ }
+
+ if exists, err := stmRow.Exists(ctx); err != nil {
+ return err
+ } else if exists {
+ return verror.New(verror.ErrExist, ctx, "Stream '"+stmKey.Key()+"' already exists")
+ }
+
+ return stmRow.Put(ctx, stmVal)
+ }); err != nil {
+ return nil, err
+ }
+
+ return &stmKey, nil
+}
+
+// TODO(ivanpi): Implement pausing and resuming streams.
diff --git a/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
new file mode 100644
index 0000000..80b6f5e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
@@ -0,0 +1,71 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package testutil implements testing modules for common client operations.
+package testutil
+
+import (
+ "fmt"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/verror"
+ "v.io/x/ref/test/modules"
+ "v.io/x/sensorlog_lite/internal/client"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// Required parameters: sbService, devId, publishSb
+var RunAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, publishSb := args[0], args[1], args[2]
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed initializing master database: %v", err)
+ }
+
+ if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
+ return fmt.Errorf("AddDevice %s failed: %v", devId, err)
+ }
+ if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+ return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
+ }
+
+ return nil
+}, "runAddDevice")
+
+// Required parameters: sbService, devId, streamId, script, interval
+var RunCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, streamId, script, intervalStr := args[0], args[1], args[2], args[3], args[4]
+
+ interval, err := time.ParseDuration(intervalStr)
+ if err != nil {
+ return err
+ }
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed opening master database: %v", err)
+ }
+
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, "", interval, ""); err == nil {
+ return fmt.Errorf("CreateStream %s with empty script should have failed", streamId)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, 42*time.Nanosecond, ""); err == nil {
+ return fmt.Errorf("CreateStream %s with tiny duration should have failed", streamId)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); err != nil {
+ return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+ return fmt.Errorf("repeat CreateStream %s should have failed with ErrExist, got: %v", streamId, err)
+ }
+
+ return nil
+}, "runCreateStream")
diff --git a/go/src/v.io/x/sensorlog/internal/client/v23_test.go b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
new file mode 100644
index 0000000..d826a88
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
@@ -0,0 +1,32 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+
+package client_test
+
+import (
+ "os"
+ "testing"
+
+ "v.io/x/ref/test/modules"
+ "v.io/x/ref/test/v23tests"
+)
+
+func TestMain(m *testing.M) {
+ modules.DispatchAndExitIfChild()
+ cleanup := v23tests.UseSharedBinDir()
+ r := m.Run()
+ cleanup()
+ os.Exit(r)
+}
+
+func TestV23DeviceAddAndStreamCreate(t *testing.T) {
+ v23tests.RunTest(t, V23TestDeviceAddAndStreamCreate)
+}
+
+func TestV23StreamConfigAndList(t *testing.T) {
+ v23tests.RunTest(t, V23TestStreamConfigAndList)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
new file mode 100644
index 0000000..a948ae2
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Sensor Log configuration constants and default flag values.
+package config
+
+import (
+ "time"
+
+ "v.io/v23/naming"
+)
+
+const (
+ DefaultSbService = "syncbase"
+
+ AppName = "sensorlog_lite"
+ DBName = "sldb"
+
+ SyncPriority = 42
+
+ TimeOutputFormat = "2006-01-02 15:04:05.000"
+
+ // Delay between SIGINT and SIGKILL when stopping measuring script.
+ ScriptKillDelay = 100 * time.Millisecond
+
+ // Both Syncbase write and sampling script slowness impose this limit;
+ // sampling with mocked out script and Syncbase write supports 200 µs.
+ MinSamplingInterval = 10 * time.Millisecond
+)
+
+func SyncgroupName(publishService, devId string) string {
+ return naming.Join(publishService, "%%sync", "sllite", "dev", devId)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/doc.go b/go/src/v.io/x/sensorlog/internal/measure/doc.go
new file mode 100644
index 0000000..c7e18f3
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/doc.go
@@ -0,0 +1,8 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package measure implements Sensor Log measured methods, intended to run
+// against the measured device Syncbase. It supports configuring the device
+// syncgroup and watching for stream configuration changes.
+package measure
diff --git a/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go
new file mode 100644
index 0000000..8f854a2
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go
@@ -0,0 +1,157 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package runloop implements an array of data measuring workers, one per
+// stream, periodically running the configured sampling scripts.
+package runloop
+
+import (
+ "time"
+
+ "v.io/v23/context"
+ "v.io/x/sensorlog_lite/internal/measure/sampler"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+type DataCallback func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// Loop is a pool of data measuring workers, at most one per stream.
+// NOTE: Loop is not thread-safe. Register and WaitAll should not be called
+// concurrently.
+type Loop interface {
+ // Register starts a measuring worker for the stream specified by streamKey
+ // and samDef, cancelling and replacing any existing worker for the same
+ // stream.
+ // The worker executes the sampling script every integer number of Intervals
+ // starting from Start, calling cb with the measured VDataPoint.
+ // The sampling script and the callback are guaranteed to each have at most
+ // one instance running in parallel.
+ // The new worker will not start until any existing worker for the same
+ // stream has exited.
+ Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback)
+
+ // Unregister cancels any existing measuring worker for the stream specified
+ // by streamKey.
+ Unregister(streamKey *sbmodel.KStreamDef)
+
+ // WaitAll waits for all currently running measuring workers to exit.
+ WaitAll()
+}
+
+type loopImpl struct {
+ tasks map[string]measureTask
+ runner sampleRunner
+ fatalCb util.ErrorCb
+}
+
+// NewLoop returns a new measuring worker pool.
+func NewLoop(fatalCb util.ErrorCb) Loop {
+ return newLoopWithRunner(fatalCb, sampler.Run)
+}
+
+// sampleRunner should run the sampling script once, producing a single
+// VDataPoint and passing it to cb.
+type sampleRunner func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error
+
+// Constructor for testing with mock sampleRunner.
+func newLoopWithRunner(fatalCb util.ErrorCb, runner sampleRunner) Loop {
+ return &loopImpl{
+ tasks: make(map[string]measureTask, 16),
+ runner: runner,
+ fatalCb: fatalCb,
+ }
+}
+
+// Register implements Loop.Register.
+func (ml *loopImpl) Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) {
+ ctx, cancel := context.WithCancel(ctx)
+
+ oldTask, hasOldTask := ml.tasks[streamKey.Key()]
+ if hasOldTask {
+ // Cancel existing worker, if any.
+ oldTask.cancel()
+ }
+
+ // Start new worker that first waits for the existing worker to exit, then
+ // starts measuring.
+ waitLoop := util.AsyncRun(func() error {
+ if hasOldTask {
+ // Wait for existing worker, if any, to exit.
+ _ = oldTask.wait()
+ }
+
+ return ml.measureWorker(ctx, streamKey, samDef, dataCb)
+ }, ml.fatalCb)
+
+ // Register new worker handle.
+ ml.tasks[streamKey.Key()] = measureTask{
+ cancel: cancel,
+ wait: waitLoop,
+ }
+}
+
+// getDelay computes the delay from now to samDef.Start or, if it has passed,
+// the next multiple of samDef.Interval since samDef.Start.
+func getDelay(samDef *sbmodel.SamplerDef) time.Duration {
+ now := time.Now()
+ if now.Before(samDef.Start) {
+ // Start time has not yet passed, wait until start time.
+ return samDef.Start.Sub(now)
+ } else {
+ // Start time has passed, wait until next multiple of interval since the
+ // start time.
+ diff := now.Sub(samDef.Start).Nanoseconds()
+ intv := samDef.Interval.Nanoseconds()
+ return time.Duration(intv - (diff % intv))
+ }
+}
+
+// measureWorker runs the measuring script on every samDef.Interval, starting
+// from samDef.Start, and passes the result to dataCb. Measure intervals are
+// skipped if the measuring script or dataCb is still running. The worker exits
+// when ctx is cancelled or the measuring script or dataCb return an error.
+func (ml *loopImpl) measureWorker(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) error {
+ for {
+ delay := getDelay(samDef)
+ // Wait for delay. If not cancelled while waiting, proceed to measure.
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-time.After(delay):
+ }
+
+ // Run the measuring script, passing the result to dataCb.
+ if err := ml.runner(ctx, samDef, func(res *sampler.MeasureResult) error {
+ return dataCb(ctx, &sbmodel.KDataPoint{
+ DevId: streamKey.DevId,
+ StreamId: streamKey.StreamId,
+ Timestamp: res.Time,
+ }, res.Data)
+ }); err != nil {
+ return err
+ }
+ }
+}
+
+// Unregister implements Loop.Unregister.
+func (ml *loopImpl) Unregister(streamKey *sbmodel.KStreamDef) {
+ if task, hasTask := ml.tasks[streamKey.Key()]; hasTask {
+ task.cancel()
+ }
+ // task is left in ml.tasks to allow WaitAll or a new worker to wait on it.
+}
+
+// WaitAll implements Loop.WaitAll.
+func (ml *loopImpl) WaitAll() {
+ for _, t := range ml.tasks {
+ _ = t.wait()
+ }
+}
+
+// Worker handles for cancellation purposes.
+type measureTask struct {
+ cancel func()
+ wait func() error
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go
new file mode 100644
index 0000000..4c56b6a
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go
@@ -0,0 +1,341 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runloop
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ tu "v.io/x/ref/test/testutil"
+ "v.io/x/sensorlog_lite/internal/measure/sampler"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+// Time delay quantum. It is assumed that several goroutines can reliably
+// execute trivial code and yield within this time.
+// TODO(ivanpi): Replace with logical clock that counts active goroutines.
+const qdelay = 100 * time.Millisecond
+
+// delayWriter implements a DataCallback that waits for a configurable delay
+// before writing the received VDataError. It also allows asserting that the
+// expected data has been written.
+type delayWriter struct {
+ delay time.Duration
+ mu sync.Mutex
+ output string
+}
+
+// write sleeps for delay and, if not cancelled, writes the VDataError.
+// Sleep is not interrupted by cancellation.
+func (dw *delayWriter) write(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ time.Sleep(dw.delay)
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ }
+ dw.mu.Lock()
+ defer dw.mu.Unlock()
+ if msg, ok := val.(sbmodel.VDataPointError); ok {
+ dw.output += string(msg.Value)
+ } else {
+ return fmt.Errorf("writer expected VDataError, got: %v", val)
+ }
+ return nil
+}
+
+// assert asserts that the written VDataErrors, concatenated, equal expected.
+func (dw *delayWriter) assert(t *testing.T, expected string) {
+ dw.mu.Lock()
+ defer dw.mu.Unlock()
+ if got, want := dw.output, expected; got != want {
+ t.Error(tu.FormatLogLine(2, "unexpected output: got %q, want %q", got, want))
+ }
+}
+
+// newEchoDelaySampler returns a sampleRunner that expects mock scripts of the
+// form '<delay>|<output>'. The sampleRunner sleeps for <delay> and, if not
+// cancelled, calls cb with the VDataError of the form '<output>@<time>;',
+// where <time> is the time when the sampleRunner was started, expressed as the
+// number of whole qdelay intervals since start.
+func newEchoDelaySampler(start time.Time) sampleRunner {
+ return func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error {
+ timestamp := time.Now()
+ parts := strings.SplitN(samDef.Script, "|", 2)
+ delay, err := time.ParseDuration(parts[0])
+ if err != nil {
+ panic(err)
+ }
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-time.After(delay):
+ }
+ timeDelta := timestamp.Sub(start).Nanoseconds() / time.Duration(qdelay).Nanoseconds()
+ result := fmt.Sprintf("%s@%d;", parts[1], timeDelta)
+ return cb(&sampler.MeasureResult{
+ Time: timestamp,
+ Data: sbmodel.VDataPointError{Value: result},
+ })
+ }
+}
+
+// newFatalErrorCb returns an ErrorCb that fails the test and cancels the
+// context on any error.
+func newFatalErrorCb(t *testing.T, ctxCancel func()) util.ErrorCb {
+ return func(err error) {
+ t.Errorf("measuring fatal error: %v", err)
+ ctxCancel()
+ }
+}
+
+// streamKey returns a test KStreamCfg for the streamId.
+func streamKey(streamId string) *sbmodel.KStreamDef {
+ return &sbmodel.KStreamDef{
+ DevId: "test",
+ StreamId: streamId,
+ }
+}
+
+// samplerDef returns a test SamplerDef with a mock script as expected by an
+// echoDelaySampler.
+func samplerDef(start time.Time, intv time.Duration, out string, delay time.Duration) *sbmodel.SamplerDef {
+ return &sbmodel.SamplerDef{
+ Script: fmt.Sprintf("%v|%s", delay, out),
+ Start: start,
+ Interval: intv,
+ }
+}
+
+// scheduler allows scheduling functions with delays and waiting for all
+// scheduled functions to finish (using wg). Cancel ctx to cancel pending
+// functions. Functions are run under mutex (not concurrently).
+type scheduler struct {
+ ctx *context.T
+ wg sync.WaitGroup
+ mu sync.Mutex
+}
+
+// schedule runs the specified function after the specified delay.
+func (sc *scheduler) schedule(delay time.Duration, run func()) {
+ sc.wg.Add(1)
+ go func() {
+ defer sc.wg.Done()
+ select {
+ case <-time.After(delay):
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ run()
+ case <-sc.ctx.Done():
+ }
+ }()
+}
+
+// Tests that samplers run with correct time offsets until unregistered.
+func TestLoopUnregister(t *testing.T) {
+ ctx, ctxCancel := context.RootContext()
+ defer ctxCancel()
+ sc := scheduler{ctx: ctx}
+ start := time.Now()
+ ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+ // Start at -3, measuring every 2, active 0 to 10 -> 1, 3, 5, 7, 9
+ dwPast := &delayWriter{delay: 0}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write)
+ })
+ sc.schedule(10*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+ // Start at 3, measuring every 2, active 0 to 6 -> 3, 5
+ dwFuture := &delayWriter{delay: 0}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(3*qdelay), 2*qdelay, "47", 0), dwFuture.write)
+ })
+ sc.schedule(6*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+ sc.wg.Wait()
+ ml.WaitAll()
+
+ dwPast.assert(t, "42@1;42@3;42@5;42@7;42@9;") // time 1, 3, 5, 7, 9
+ dwFuture.assert(t, "47@3;47@5;") // time 3, 5
+}
+
+// Tests that samplers run with correct time offsets until the context is
+// cancelled.
+func TestLoopCancel(t *testing.T) {
+ ctx, ctxCancel := context.RootContext()
+ defer ctxCancel()
+ sc := scheduler{ctx: ctx}
+ start := time.Now()
+ ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+ // Start at -3, measuring every 2, active 2 to 8 (cancel) -> 3, 5, 7
+ dwPast := &delayWriter{delay: 0}
+ sc.schedule(2*qdelay, func() {
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write)
+ })
+
+ // Start at 1, measuring every 2, active 0 to 8 (cancel) -> 1, 3, 5, 7
+ dwFuture := &delayWriter{delay: 0}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 0), dwFuture.write)
+ })
+
+ sc.schedule(8*qdelay, ctxCancel)
+
+ sc.wg.Wait()
+ ml.WaitAll()
+
+ dwPast.assert(t, "42@3;42@5;42@7;") // time 3, 5, 7
+ dwFuture.assert(t, "47@1;47@3;47@5;47@7;") // time 1, 3, 5, 7
+}
+
+// Tests that no new measurement for a stream is started until the previous
+// measurement has been written, skipping intervals if necessary. No data
+// points should be written after the sampler is unregistered.
+func TestLoopSlow(t *testing.T) {
+ ctx, ctxCancel := context.RootContext()
+ defer ctxCancel()
+ sc := scheduler{ctx: ctx}
+ start := time.Now()
+ ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+ // Start at 1, measuring every 3, active 0 to 14, sample takes 3, write takes 1:
+ // 1 -> finish at 5
+ // 4 skipped
+ // 7 -> finish at 11
+ // 10 skipped
+ // 13 -> would finish at 17, cancelled during sample
+ dwSlowSample := &delayWriter{delay: 1 * qdelay}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 3*qdelay), dwSlowSample.write)
+ })
+ sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+ // Start at 1, measuring every 3, active 0 to 18, sample takes 1, write takes 3:
+ // 1 -> finish at 5
+ // 4 skipped
+ // 7 -> finish at 11
+ // 10 skipped
+ // 13 -> finish at 17
+ // 16 -> would finish at 20, cancelled during write
+ dwSlowWrite := &delayWriter{delay: 3 * qdelay}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 3*qdelay, "47", 1*qdelay), dwSlowWrite.write)
+ })
+ sc.schedule(18*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+ sc.wg.Wait()
+ ml.WaitAll()
+
+ dwSlowSample.assert(t, "42@1;42@7;") // time 1, 7
+ dwSlowWrite.assert(t, "47@1;47@7;47@13;") // time 1, 7, 13
+}
+
+// Tests that re-registering an already registered stream cancels and waits for
+// the existing workers to stop before starting updated ones.
+func TestLoopReregister(t *testing.T) {
+ ctx, ctxCancel := context.RootContext()
+ defer ctxCancel()
+ sc := scheduler{ctx: ctx}
+ start := time.Now()
+ ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+ // 42: Start at 1, measuring every 2, active 0 to 6 -> 1, 3, 5
+ // 84: Start at 9, measuring every 3, active 6 to 14 -> 9, 12
+ dwNoDelay := &delayWriter{delay: 0}
+ sc.schedule(0*qdelay, func() {
+ ml.Unregister(streamKey("s1"))
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 2*qdelay, "42", 0), dwNoDelay.write)
+ })
+ sc.schedule(6*qdelay, func() {
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(9*qdelay), 3*qdelay, "84", 0), dwNoDelay.write)
+ })
+ sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+ // 1 -> sample until 6
+ // 3 skipped
+ // 5 skipped
+ // 7 -> sample would take until 12, cancelled
+ // re-Register at 8, effective at 8 (sample wait is interruptible)
+ // 9 -> sample until 13
+ // 12 skipped
+ // 15 -> sample until 19
+ // 18 skipped
+ dwSlowSample := &delayWriter{delay: 0}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 5*qdelay), dwSlowSample.write)
+ })
+ sc.schedule(8*qdelay, func() {
+ ml.Unregister(streamKey("s2"))
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 3*qdelay, "94", 4*qdelay), dwSlowSample.write)
+ })
+ sc.schedule(20*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+ // 1 -> write until 6
+ // 4 skipped
+ // 7 -> write would take until 12, cancelled
+ // re-Register starts at 8, effective at 12 (write wait is non-interruptible)
+ // 13 -> write until 18
+ // 15 skipped
+ // 18 -> would write until 23, cancelled
+ // 20 skipped
+ dwSlowWrite := &delayWriter{delay: 5 * qdelay}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(1*qdelay), 3*qdelay, "foo", 0), dwSlowWrite.write)
+ })
+ sc.schedule(8*qdelay, func() {
+ ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(-1*qdelay), 2*qdelay, "bar", 0), dwSlowWrite.write)
+ })
+ sc.schedule(21*qdelay, func() { ml.Unregister(streamKey("s3")) })
+
+ sc.wg.Wait()
+ ml.WaitAll()
+
+ dwNoDelay.assert(t, "42@1;42@3;42@5;"+"84@9;84@12;") // 42: time 1, 3, 5; 84: time 9, 12
+ dwSlowSample.assert(t, "47@1;"+"94@9;94@15;") // 47: time 1; 94: time 9, 15
+ dwSlowWrite.assert(t, "foo@1;"+"bar@13;") // foo: time 1; bar: time 13
+}
+
+// Tests that returning an error from the writer calls the fatal eror callback.
+func TestWriteError(t *testing.T) {
+ ctx, ctxCancel := context.RootContext()
+ defer ctxCancel()
+ sc := scheduler{ctx: ctx}
+ start := time.Now()
+ errSentinel := fmt.Errorf("errSentinel")
+ ml := newLoopWithRunner(func(err error) {
+ if got, want := err, errSentinel; got != want {
+ t.Errorf("%v unexpected fatal error: got %v, want %v", time.Now().Sub(start).Nanoseconds()/time.Duration(qdelay).Nanoseconds(), got, want)
+ }
+ ctxCancel()
+ }, newEchoDelaySampler(start))
+
+ // 1 -> write start at 1, finish at 3
+ // 4 -> write start at 4, finish at 6
+ // 7 -> write start at 7, cancelled at 8 (error)
+ dwNoDelay := &delayWriter{delay: 2 * qdelay}
+ sc.schedule(0*qdelay, func() {
+ ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 0), dwNoDelay.write)
+ })
+
+ // error: Start at 6, measure delay 2 -> write error at 8
+ sc.schedule(2*qdelay, func() {
+ ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 4*qdelay, "47", 2*qdelay),
+ func(_ *context.T, _ *sbmodel.KDataPoint, _ sbmodel.VDataPoint) error {
+ return errSentinel
+ })
+ })
+
+ sc.wg.Wait()
+ ml.WaitAll()
+
+ dwNoDelay.assert(t, "42@1;42@4;") // 42: time 1, 4
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
new file mode 100644
index 0000000..ee54ec1
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sampler implements an asynchronous worker for a single run of the
+// sampling script.
+package sampler
+
+import (
+ "bytes"
+ "fmt"
+ "os/exec"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+type MeasureCallback func(result *MeasureResult) error
+
+type MeasureResult struct {
+ Time time.Time
+ Data sbmodel.VDataPoint
+}
+
+// Run synchronously runs the sampler script, producing a single data point or
+// error message and passing it to the callback (also run synchronously). Fatal
+// errors (including callback errors) are returned from Run.
+// The script is terminated using SIGINT (followed by SIGKILL) to the process
+// group if the context is cancelled. The callback will be called at most once
+// per Run call (may not be called if cancelled or on fatal error).
+func Run(ctx *context.T, samDef *sbmodel.SamplerDef, cb MeasureCallback) error {
+ var out bytes.Buffer
+ // Run sampling script using bash and capture output.
+ sh := exec.Command("setsid", "bash")
+ sh.Stdin = bytes.NewBufferString(samDef.Script)
+ sh.Stdout = &out
+ // TODO(ivanpi): Fail on any stderr output?
+ // TODO(ivanpi): Use end instead of begin timestamp?
+ timestamp := time.Now()
+ // Failure to start the script is a fatal error.
+ if err := sh.Start(); err != nil {
+ return fmt.Errorf("failed starting measuring script: %v", err)
+ }
+ waiter := make(chan error, 1)
+ // Start script asynchronously.
+ go func() {
+ waiter <- sh.Wait()
+ }()
+ // Wait for script exit or cancellation.
+ // TODO(ivanpi): Add script timeout.
+ select {
+ case <-ctx.Done():
+ // If cancelled, send SIGINT to script process group.
+ syscall.Kill(-sh.Process.Pid, syscall.SIGINT)
+ // Wait for script to exit. After a delay, if not exited, send SIGKILL
+ // and wait.
+ select {
+ case <-waiter:
+ case <-time.After(config.ScriptKillDelay):
+ syscall.Kill(-sh.Process.Pid, syscall.SIGKILL)
+ <-waiter
+ }
+ return nil
+ case err := <-waiter:
+ // If script has exited, parse data point or get error.
+ if err != nil {
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Script error: %v", err)}})
+ }
+ measured, err := strconv.ParseFloat(strings.TrimSpace(out.String()), 64)
+ if err != nil {
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Parse error: %v", err)}})
+ }
+ return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointValue{Value: measured}})
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
new file mode 100644
index 0000000..04466c4
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
@@ -0,0 +1,146 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sampler_test
+
+import (
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ tu "v.io/x/ref/test/testutil"
+ "v.io/x/sensorlog_lite/internal/measure/sampler"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// Runs the sampler script and verifies that the callback was called with the
+// expected value or error.
+func runSamplerTest(t *testing.T, samDef *sbmodel.SamplerDef) sbmodel.VDataPoint {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ var gotRes *sampler.MeasureResult
+ start := time.Now()
+ if err := sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+ gotRes = res
+ return nil
+ }); err != nil {
+ t.Fatal(tu.FormatLogLine(3, "unexpected sampling error: %v", err))
+ }
+ end := time.Now()
+
+ if gotRes == nil {
+ t.Fatal(tu.FormatLogLine(3, "sampler callback was not called"))
+ }
+
+ if gotRes.Time.Before(start) || gotRes.Time.After(end) {
+ t.Error(tu.FormatLogLine(3, "invalid time: %v, expected between %v and %v", gotRes.Time, start, end))
+ }
+
+ return gotRes.Data
+}
+
+func runSamplerTestForValue(t *testing.T, samDef *sbmodel.SamplerDef, expectVal float64) {
+ res := runSamplerTest(t, samDef)
+ switch res := res.(type) {
+ case sbmodel.VDataPointValue:
+ if res.Value != expectVal {
+ t.Error(tu.FormatLogLine(2, "unexpected value, got: %v, want: %v", res.Value, expectVal))
+ }
+ default:
+ t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want value: %v", res, expectVal))
+ }
+}
+
+func runSamplerTestForError(t *testing.T, samDef *sbmodel.SamplerDef, expectErrPrefix string) {
+ res := runSamplerTest(t, samDef)
+ switch res := res.(type) {
+ case sbmodel.VDataPointError:
+ if !strings.HasPrefix(res.Value, expectErrPrefix) {
+ t.Error(tu.FormatLogLine(2, "unexpected error, got: %v, want prefix: %v", res.Value, expectErrPrefix))
+ }
+ default:
+ t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want error with prefix: %v", res, expectErrPrefix))
+ }
+}
+
+func TestSamplerRun(t *testing.T) {
+ runSamplerTestForValue(t, &sbmodel.SamplerDef{
+ Script: `echo 42`,
+ Start: time.Now(),
+ Interval: time.Second, // 7.5M years overflows time.Duration
+ }, 42.0)
+
+ runSamplerTestForValue(t, &sbmodel.SamplerDef{
+ Script: `
+for i in $(seq 10000); do :; done
+printf "%f\n" -12.73
+exit 0
+`,
+ Start: time.Now(),
+ Interval: time.Millisecond,
+ }, -12.73)
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42; exit 1`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Script error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `exit 0`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42foo`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+
+ runSamplerTestForError(t, &sbmodel.SamplerDef{
+ Script: `echo 42; echo 47`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }, "Parse error")
+}
+
+func TestSamplerCancel(t *testing.T) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ // The script sleeps for 10 seconds, not interruptible by SIGINT/SIGTERM.
+ samDef := &sbmodel.SamplerDef{
+ Script: `
+trap 'sleep 10' INT TERM
+sleep 10
+echo 42
+`,
+ Start: time.Now(),
+ Interval: time.Second,
+ }
+ // Asynchronously run sampler, cancelling shortly after.
+ done := make(chan error, 1)
+ go func() {
+ done <- sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+ t.Errorf("worker was cancelled, result callback should not have been called, got: %v", res.Data)
+ return nil
+ })
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ cancelTime := time.Now()
+ cancel()
+
+ if err := <-done; err != nil {
+ t.Errorf("unexpected sampling error: %v", err)
+ }
+ // Cancel should have killed the script using SIGKILL a short time after the
+ // script failed to exit on SIGINT.
+ if cancelTime.Add(1 * time.Second).Before(time.Now()) {
+ t.Errorf("sampling script took more than a second to stop")
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
new file mode 100644
index 0000000..1ee3909
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for syncgroup management.
+
+package measure
+
+import (
+ "fmt"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/security/access"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// InitSyncgroup creates the syncgroup for the measuring device devId, giving
+// full configuration access to admin. The syncgroup uses sgPublishSb and
+// sgMountTables for publishing (create/join) and syncing, respectively.
+// InitSyncgroup must not be called concurrently for the same devId, or
+// retried with different parameters for the same devId, otherwise behaviour
+// is unspecified.
+func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
+ if err := keyutil.ValidateId(devId); err != nil {
+ return fmt.Errorf("invalid devId: %v", err)
+ }
+
+ sgName := config.SyncgroupName(sgPublishSb, devId)
+ // Check for syncgroup. If it already exists, we have nothing to do.
+ if sgs, err := db.GetSyncgroupNames(ctx); err != nil {
+ return err
+ } else {
+ for _, sg := range sgs {
+ if sg == sgName {
+ return nil
+ }
+ }
+ }
+
+ // Both measured and admin client have full permissions on the syncgroup.
+ sgAcl := access.Permissions{}
+ sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.AllTypicalTags()...)
+ sbutil.AddPermsForPattern(&sgAcl, admin, access.AllTypicalTags()...)
+
+ // Maps all syncgroup prefixes to ACLs.
+ prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
+
+ // StreamDef : <devId>
+ // Admin client has full permissions, measured drops to readonly.
+ prefixStreamDef := nosql_wire.TableRow{
+ TableName: sbmodel.KStreamDef{}.Table(),
+ Row: devId,
+ }
+ aclStreamDef := access.Permissions{}
+ sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
+ sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
+ prefixSpec[prefixStreamDef] = aclStreamDef
+
+ // DataPoint : <devId>
+ // Admin client has full permissions, measured drops to read/write.
+ prefixDataPoint := nosql_wire.TableRow{
+ TableName: sbmodel.KDataPoint{}.Table(),
+ Row: devId,
+ }
+ aclDataPoint := access.Permissions{}
+ sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
+ sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+ prefixSpec[prefixDataPoint] = aclDataPoint
+
+ var prefixes []nosql_wire.TableRow
+ // Apply prefix ACLs to all syncgroup prefixes.
+ for prefix, prefixAcl := range prefixSpec {
+ // Ignore ErrNoAccess, assume we already dropped permissions.
+ err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, nosql.Prefix(prefix.Row), prefixAcl)
+ if err != nil && verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ return err
+ }
+ prefixes = append(prefixes, prefix)
+ }
+
+ sgSpec := nosql_wire.SyncgroupSpec{
+ Description: fmt.Sprintf("measured-main-%s", devId),
+ Perms: sgAcl,
+ Prefixes: prefixes,
+ MountTables: sgMountTables,
+ }
+ sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+
+ return db.Syncgroup(sgName).Create(ctx, sgSpec, sgMemberInfo)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
new file mode 100644
index 0000000..d447fa6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+ "bytes"
+ "reflect"
+ "testing"
+
+ "v.io/v23/security/access"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+func TestCreateSyncgroup(t *testing.T) {
+ _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as measured.
+ db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, sbmodel.MeasuredTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ devId := "measured1"
+ admin := "root:two"
+ syncMts := []string{}
+
+ // Creating the syncgroup should succeed.
+ if err := measure.InitSyncgroup(ctxMeasured, db, devId, admin, sbName, syncMts); err != nil {
+ t.Fatalf("InitSyncgroup failed: %v", err)
+ }
+
+ sgName := config.SyncgroupName(sbName, devId)
+ if sgs, err := db.GetSyncgroupNames(ctxMeasured); err != nil {
+ t.Fatalf("GetSyncgroupNames failed: %v", err)
+ } else if got, want := sgs, []string{sgName}; !reflect.DeepEqual(got, want) {
+ t.Errorf("GetSyncgroupNames got: %v, want: %v", got, want)
+ }
+
+ // Creating the syncgroup should be idempotent.
+ if err := measure.InitSyncgroup(ctxMeasured, db, devId, admin, sbName, syncMts); err != nil {
+ t.Errorf("InitSyncgroup should be idempotent, retry failed: %v", err)
+ }
+
+ // measured should have dropped privileges on <StreamDefTable>/<devId>.
+ expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
+ "Admin":{"In":["root:two"]},
+ "Read":{"In":["root:two", "root:one"]},
+ "Write":{"In":["root:two"]},
+ "Debug":{"In":["root:two"]},
+ "Resolve":{"In":["root:two", "root:one"]}
+ }`))
+ if err != nil {
+ t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+ }
+ sgDataTable := db.Table(sbmodel.KStreamDef{}.Table())
+ if gotPerms, err := sgDataTable.GetPrefixPermissions(ctxMeasured, devId); err != nil {
+ t.Errorf("GetPrefixPermissions failed: %v", err)
+ } else if got, want := gotPerms[0].Perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
+ t.Errorf("Unexpected permissions on streamdef/<devId>: got %v, want %v", got, want)
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
new file mode 100644
index 0000000..6f7897e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for configuration change watching.
+
+package measure
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/services/watch"
+ "v.io/v23/syncbase/nosql"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// RegisterWorker is called with the key and value of a new or modified
+// StreamDef. Thread safety is not required.
+type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
+
+// WatchForStreams watches Syncbase for new and modified stream definitions
+// for the specified measuring device and calls the register callback.
+// If a malformed StreamDef key or value is encountered, or register returns
+// an error, measured exits with an error.
+// WatchForStreams is synchronous and runs until the context is cancelled or
+// an error is encountered.
+func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+ tableName := sbmodel.KStreamDef{}.Table()
+ watchPrefix := keyutil.Join(devId, "")
+ var resMark watch.ResumeMarker
+
+ // BeginBatch scoped using function with deferred Abort.
+ if err := func() error {
+ bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ return err
+ }
+ defer bdb.Abort(ctx)
+
+ resMark, err = bdb.GetResumeMarker(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Register samplers for all existing StreamDefs.
+ sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KStreamDef{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+ }
+ val := &sbmodel.VStreamDef{}
+ if err := sstr.Value(val); err != nil {
+ return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
+ }
+ if err := register(key, val); err != nil {
+ return err
+ }
+ }
+ return sstr.Err()
+ }(); err != nil {
+ return err
+ }
+
+ // Watch for StreamDef changes and register samplers as needed.
+ ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+ if err != nil {
+ return err
+ }
+ defer ws.Cancel()
+
+ for ws.Advance() {
+ c := ws.Change()
+ key := &sbmodel.KStreamDef{}
+ if err := key.Parse(c.Row); err != nil {
+ return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+ }
+ switch c.ChangeType {
+ case nosql.PutChange:
+ val := &sbmodel.VStreamDef{}
+ if err := c.Value(val); err != nil {
+ return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
+ }
+ if err := register(key, val); err != nil {
+ return err
+ }
+ case nosql.DeleteChange:
+ return fmt.Errorf("StreamDef delete is not supported")
+ }
+ }
+ return ws.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
new file mode 100644
index 0000000..b28cb78
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+ "fmt"
+ "reflect"
+ "sort"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/syncbase/nosql"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ tu "v.io/x/ref/test/testutil"
+ "v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var watchTables = []sbmodel.TableSpec{
+ {Prototype: &sbmodel.KStreamDef{}},
+}
+
+func TestWatchForStreams(t *testing.T) {
+ _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as measured. Keep write permission on StreamDef.
+ db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ devId := "dev1"
+ otherDev := "dev2"
+ expectStreams := []string{"str1", "str2", "str3"}
+ gotStreams := make([]string, 0, len(expectStreams))
+
+ time.Sleep(1 * time.Second)
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[0])
+ putStreamDef(t, ctxMeasured, db, otherDev, "foo")
+
+ // Watch should call the callback for all devId streams, whether they were
+ // written before or after starting watch. Streams for other devices should
+ // not be returned.
+ time.Sleep(1 * time.Second)
+ stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+ if got, want := key.DevId, devId; got != want {
+ return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+ }
+ gotStreams = append(gotStreams, val.Desc)
+ return nil
+ })
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[1])
+
+ time.Sleep(1 * time.Second)
+ putStreamDef(t, ctxMeasured, db, otherDev, "bar")
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[2])
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err != nil {
+ t.Fatalf("watcher exited with error: %v", err)
+ }
+
+ sort.Strings(gotStreams)
+ sort.Strings(expectStreams)
+ if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+ t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+ }
+}
+
+func TestWatchError(t *testing.T) {
+ _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as measured. Keep write permission on StreamDef.
+ db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ // Watch should return the callback error.
+ devId := "dev1"
+ cbErr := fmt.Errorf("callbackError")
+ stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+ return cbErr
+ })
+ // Put anything to trigger callback.
+ putStreamDef(t, ctxMeasured, db, devId, "foo")
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err != cbErr {
+ t.Errorf("watch should have failed with %v, got: %v", cbErr, err)
+ }
+
+ // Watch should return an error when a malformed key is encountered.
+ devId = "dev2"
+ stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+ return nil
+ })
+ putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo")
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err == nil {
+ t.Errorf("watch should have failed on malformed key")
+ }
+}
+
+func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+ ctx, stop = context.WithCancel(ctx)
+ wait = util.AsyncRun(func() error {
+ return measure.WatchForStreams(ctx, db, devId, register)
+ }, nil)
+ return stop, wait
+}
+
+func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+ key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+ val := sbmodel.VStreamDef{Desc: desc}
+ if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
+ t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
new file mode 100644
index 0000000..4286bff
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
@@ -0,0 +1,65 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for building and parsing Sensor Log data keys.
+package keyutil
+
+import (
+ "crypto/rand"
+ "encoding/hex"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const Sep = '$'
+
+// Join joins key parts into key.
+func Join(parts ...string) string {
+ return strings.Join(parts, string(Sep))
+}
+
+// Split splits key into parts and verifies that the number of parts matches
+// the expected number, returning an error otherwise.
+func Split(key string, expectParts int) ([]string, error) {
+ parts := strings.Split(key, string(Sep))
+ if gotParts := len(parts); expectParts != gotParts {
+ return nil, fmt.Errorf("invalid key %q: expected %d parts, got %d", key, expectParts, gotParts)
+ }
+ return parts, nil
+}
+
+func StringifyTime(t time.Time) string {
+ return fmt.Sprintf("%016x", t.UnixNano())
+}
+
+func ParseTime(t string) (time.Time, error) {
+ nsec, err := strconv.ParseInt(t, 16, 64)
+ if err != nil {
+ return time.Time{}, fmt.Errorf("failed parsing time: %v", err)
+ }
+ return time.Unix(0, nsec), nil
+}
+
+// ValidateId returns an error if the argument is not a valid identifier.
+func ValidateId(id string) error {
+ if id == "" {
+ return fmt.Errorf("cannot be empty")
+ }
+ if strings.Contains(id, string(Sep)) {
+ return fmt.Errorf("cannot contain %q", Sep)
+ }
+ return nil
+}
+
+// UUID generates a random UUID.
+func UUID() string {
+ uuid := make([]byte, 16)
+ if _, err := rand.Read(uuid); err != nil {
+ panic(fmt.Errorf("rng failed: %v", err))
+ }
+ // TODO(ivanpi): Use base64 once Syncbase keys are less restricted.
+ return hex.EncodeToString(uuid)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
new file mode 100644
index 0000000..32167f6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -0,0 +1,87 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Implements PersistentDataKey for each K<T> type, as described in types.vdl.
+
+package sbmodel
+
+import (
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// PersistentDataKey is a type encapsulating data from the row key of a
+// top-level value type persisted to Syncbase.
+type PersistentDataKey interface {
+ // Table returns the name of the Syncbase table for the data type.
+ Table() string
+ // Key returns the row key for the value.
+ Key() string
+ // Parse parses the row key for the value into self. Returns an error if key
+ // is malformed.
+ Parse(key string) error
+}
+
+// devicecfg : <DevId>
+func (_ KDeviceCfg) Table() string { return "devicecfg" }
+func (k *KDeviceCfg) Key() string { return keyutil.Join(k.DevId) }
+func (k *KDeviceCfg) Parse(key string) error {
+ parts, err := keyutil.Split(key, 1)
+ if err != nil {
+ return err
+ }
+ k.DevId = parts[0]
+ return nil
+}
+
+// streamdef : <DevId>/<StreamId>
+func (_ KStreamDef) Table() string { return "streamdef" }
+func (k *KStreamDef) Key() string { return keyutil.Join(k.DevId, k.StreamId) }
+func (k *KStreamDef) Parse(key string) error {
+ parts, err := keyutil.Split(key, 2)
+ if err != nil {
+ return err
+ }
+ k.DevId, k.StreamId = parts[0], parts[1]
+ return nil
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+func (k KDataPoint) Table() string {
+ return "sdata"
+}
+func (k *KDataPoint) Key() string {
+ return keyutil.Join(k.DevId, k.StreamId, keyutil.StringifyTime(k.Timestamp))
+}
+func (k *KDataPoint) Parse(key string) error {
+ parts, err := keyutil.Split(key, 3)
+ if err != nil {
+ return err
+ }
+ timestamp, err := keyutil.ParseTime(parts[2])
+ if err != nil {
+ return err
+ }
+ k.DevId, k.StreamId, k.Timestamp = parts[0], parts[1], timestamp
+ return nil
+}
+
+// TableSpec defines a Syncbase table, encapsulating a key prototype and
+// permissions.
+type TableSpec struct {
+ Prototype PersistentDataKey
+ ReadOnly bool
+}
+
+// All top-level types persisted to master device Syncbase.
+var MasterTables = []TableSpec{
+ {Prototype: &KDeviceCfg{}},
+ {Prototype: &KStreamDef{}},
+ {Prototype: &KDataPoint{}},
+}
+
+// All top-level types persisted to measured Syncbase.
+var MeasuredTables = []TableSpec{
+ {Prototype: &KStreamDef{}, ReadOnly: true},
+ {Prototype: &KDataPoint{}},
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
new file mode 100644
index 0000000..5f729bf
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Syncbase data model for Sensor Log.
+//
+// Every type <T> stored in Syncbase is defined as a pair of types, K<T> and
+// V<T>, representing data stored in the key and value, respectively, in a
+// single table. K<T> types satisfy the PersistentDataKey interface, supporting
+// conversion to and from the row key.
+package sbmodel
+
+import (
+ "time"
+)
+
+// devicecfg : <DevId>
+// Measuring device handle. Master only.
+type VDeviceCfg struct {
+ // Human-readable, not necessarily unique description of the device.
+ Desc string
+ // Syncbase instance publishing the syncgroup created by the device.
+ SgPublishSb string
+}
+
+type KDeviceCfg struct {
+ DevId string
+}
+
+// streamdef : <DevId>/<StreamId>
+// Configures a stream of data to be measured.
+type VStreamDef struct {
+ // Human-readable, not necessarily unique description of the stream.
+ Desc string
+ // Sampling configuration.
+ Sampler SamplerDef
+ // Flag to start and stop sampling.
+ Enabled bool
+}
+
+type KStreamDef struct {
+ DevId string
+ StreamId string
+}
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+ // Shell script executed after every Interval, starting from Start.
+ // It should output a single data point, if available. A non-zero exit
+ // status or failure to parse the value will produce an error instead.
+ Script string
+ Start time.Time
+ Interval time.Duration
+ // TODO(ivanpi): Add timeout.
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+ DevId string
+ StreamId string
+ Timestamp time.Time
+}
+
+type VDataPoint union {
+ Value float64
+ Error string
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
new file mode 100644
index 0000000..3f1c289
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
@@ -0,0 +1,147 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+// Syncbase data model for Sensor Log.
+//
+// Every type <T> stored in Syncbase is defined as a pair of types, K<T> and
+// V<T>, representing data stored in the key and value, respectively, in a
+// single table. K<T> types satisfy the PersistentDataKey interface, supporting
+// conversion to and from the row key.
+package sbmodel
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "time"
+ _ "v.io/v23/vdlroot/time"
+)
+
+// devicecfg : <DevId>
+// Measuring device handle. Master only.
+type VDeviceCfg struct {
+ // Human-readable, not necessarily unique description of the device.
+ Desc string
+ // Syncbase instance publishing the syncgroup created by the device.
+ SgPublishSb string
+}
+
+func (VDeviceCfg) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDeviceCfg"`
+}) {
+}
+
+type KDeviceCfg struct {
+ DevId string
+}
+
+func (KDeviceCfg) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDeviceCfg"`
+}) {
+}
+
+// streamdef : <DevId>/<StreamId>
+// Configures a stream of data to be measured.
+type VStreamDef struct {
+ // Human-readable, not necessarily unique description of the stream.
+ Desc string
+ // Sampling configuration.
+ Sampler SamplerDef
+ // Flag to start and stop sampling.
+ Enabled bool
+}
+
+func (VStreamDef) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VStreamDef"`
+}) {
+}
+
+type KStreamDef struct {
+ DevId string
+ StreamId string
+}
+
+func (KStreamDef) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KStreamDef"`
+}) {
+}
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+ // Shell script executed after every Interval, starting from Start.
+ // It should output a single data point, if available. A non-zero exit
+ // status or failure to parse the value will produce an error instead.
+ Script string
+ Start time.Time
+ Interval time.Duration
+}
+
+func (SamplerDef) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.SamplerDef"`
+}) {
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+ DevId string
+ StreamId string
+ Timestamp time.Time
+}
+
+func (KDataPoint) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDataPoint"`
+}) {
+}
+
+type (
+ // VDataPoint represents any single field of the VDataPoint union type.
+ VDataPoint interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the VDataPoint union type.
+ __VDLReflect(__VDataPointReflect)
+ }
+ // VDataPointValue represents field Value of the VDataPoint union type.
+ VDataPointValue struct{ Value float64 }
+ // VDataPointError represents field Error of the VDataPoint union type.
+ VDataPointError struct{ Value string }
+ // __VDataPointReflect describes the VDataPoint union type.
+ __VDataPointReflect struct {
+ Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDataPoint"`
+ Type VDataPoint
+ Union struct {
+ Value VDataPointValue
+ Error VDataPointError
+ }
+ }
+)
+
+func (x VDataPointValue) Index() int { return 0 }
+func (x VDataPointValue) Interface() interface{} { return x.Value }
+func (x VDataPointValue) Name() string { return "Value" }
+func (x VDataPointValue) __VDLReflect(__VDataPointReflect) {}
+
+func (x VDataPointError) Index() int { return 1 }
+func (x VDataPointError) Interface() interface{} { return x.Value }
+func (x VDataPointError) Name() string { return "Error" }
+func (x VDataPointError) __VDLReflect(__VDataPointReflect) {}
+
+func init() {
+ vdl.Register((*VDeviceCfg)(nil))
+ vdl.Register((*KDeviceCfg)(nil))
+ vdl.Register((*VStreamDef)(nil))
+ vdl.Register((*KStreamDef)(nil))
+ vdl.Register((*SamplerDef)(nil))
+ vdl.Register((*KDataPoint)(nil))
+ vdl.Register((*VDataPoint)(nil))
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
new file mode 100644
index 0000000..f166f99
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sbmodel_test
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type keyTest interface {
+ Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey)
+}
+
+// validKeyTest verifies that the key is correctly parsed and built.
+type validKeyTest struct {
+ keyStr string
+ keyParsed sbmodel.PersistentDataKey
+}
+
+func (kt *validKeyTest) Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey) {
+ if err := keyPrototype.Parse(kt.keyStr); err != nil {
+ t.Errorf("key %q parse failed: %v", kt.keyStr, err)
+ return
+ }
+ if got, want := keyPrototype, kt.keyParsed; !reflect.DeepEqual(got, want) {
+ t.Errorf("incorrect key parse: got %v, want %v", got, want)
+ }
+ if got, want := kt.keyParsed.Table(), keyPrototype.Table(); got != want {
+ t.Errorf("incorrect parsed key table: got %v, want %v", got, want)
+ }
+ if got, want := kt.keyParsed.Key(), kt.keyStr; got != want {
+ t.Errorf("incorrect key build: got %s, want %s", got, want)
+ }
+}
+
+// invalidKeyTest verifies that malformed key parsing fails.
+type invalidKeyTest struct {
+ keyStr string
+}
+
+func (kt *invalidKeyTest) Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey) {
+ if err := keyPrototype.Parse(kt.keyStr); err == nil {
+ t.Errorf("key %q parse should have failed", kt.keyStr)
+ }
+}
+
+func TestDeviceCfgKeys(t *testing.T) {
+ tests := []keyTest{
+ &validKeyTest{
+ keyStr: keyutil.Join("foo"),
+ keyParsed: &sbmodel.KDeviceCfg{
+ DevId: "foo",
+ },
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("foo", "bar"),
+ },
+ }
+ for _, kt := range tests {
+ kt.Run(t, &sbmodel.KDeviceCfg{})
+ }
+}
+
+func TestStreamDefKeys(t *testing.T) {
+ tests := []keyTest{
+ &validKeyTest{
+ keyStr: keyutil.Join("pc", "cputemp"),
+ keyParsed: &sbmodel.KStreamDef{
+ DevId: "pc",
+ StreamId: "cputemp",
+ },
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("pc"),
+ },
+ }
+ for _, kt := range tests {
+ kt.Run(t, &sbmodel.KStreamDef{})
+ }
+}
+
+func TestDataPointKeys(t *testing.T) {
+ tests := []keyTest{
+ &validKeyTest{
+ keyStr: keyutil.Join("meter", "amps", "00000022fcde41b2"),
+ keyParsed: &sbmodel.KDataPoint{
+ DevId: "meter",
+ StreamId: "amps",
+ Timestamp: time.Unix(0, 0x22fcde41b2),
+ },
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("meter", "amps"),
+ },
+ &invalidKeyTest{
+ keyStr: keyutil.Join("meter", "amps", "2.17"),
+ },
+ }
+ for _, kt := range tests {
+ kt.Run(t, &sbmodel.KDataPoint{})
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
new file mode 100644
index 0000000..1bc1784
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for creating and opening the database in Syncbase.
+package sbutil
+
+import (
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ "v.io/v23/syncbase"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
+// instance, creating it if missing, initializing specified tables.
+func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (nosql.Database, error) {
+ aclFull := access.Permissions{}
+ // Allow everyone to resolve to allow joining syncgroups.
+ AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
+ // Restrict other permissions to self.
+ AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin, access.Debug)
+
+ aclReadOnly := access.Permissions{}
+ // Allow everyone to resolve to allow joining syncgroups.
+ AddPermsForPattern(&aclReadOnly, string(security.AllPrincipals), access.Resolve)
+ // Restrict other permissions to self, except Write.
+ AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin, access.Debug)
+
+ sbs := syncbase.NewService(sbService)
+ app := sbs.App(config.AppName)
+ if err := createIfMissing(ctx, app, aclFull); err != nil {
+ return nil, err
+ }
+
+ // TODO(ivanpi): Add schema version.
+ db := app.NoSQLDatabase(config.DBName, nil)
+ if err := createIfMissing(ctx, db, aclFull); err != nil {
+ return nil, err
+ }
+
+ // TODO(ivanpi): Add table schemas when available.
+ for _, ts := range tables {
+ tb := db.Table(ts.Prototype.Table())
+ acl := aclReadOnly
+ if !ts.ReadOnly {
+ acl = aclFull
+ }
+ if err := createIfMissing(ctx, tb, acl); err != nil {
+ return nil, err
+ }
+ }
+
+ return db, nil
+}
+
+// creatable is satisfied by Syncbase hierarchy layers (app, db, table) that
+// can be created and tested for existence.
+type creatable interface {
+ Create(ctx *context.T, acl access.Permissions) error
+ Exists(ctx *context.T) (bool, error)
+}
+
+// createIfMissing checks if the given creatable layer exists and, if not,
+// creates it.
+// TODO(ivanpi): Syncbase client helpers for MustExist / CreateIfMissing.
+func createIfMissing(ctx *context.T, target creatable, acl access.Permissions) error {
+ if exists, err := target.Exists(ctx); err != nil {
+ return err
+ } else if exists {
+ return nil
+ }
+ if err := target.Create(ctx, acl); err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+ return err
+ }
+ return nil
+}
+
+// AddPermsForPrincipal adds to the ACL all specified permissions for all
+// default blessings of the provided principal.
+func AddPermsForPrincipal(acl *access.Permissions, principal security.Principal, tags ...access.Tag) {
+ for _, pattern := range security.DefaultBlessingPatterns(principal) {
+ AddPermsForPattern(acl, string(pattern), tags...)
+ }
+}
+
+// AddPermsForPattern adds to the ACL all specified permissions for the
+// specified pattern.
+func AddPermsForPattern(acl *access.Permissions, pattern string, tags ...access.Tag) {
+ for _, tag := range tags {
+ acl.Add(security.BlessingPattern(pattern), string(tag))
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
new file mode 100644
index 0000000..8b4155a
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -0,0 +1,127 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sbutil_test
+
+import (
+ "bytes"
+ "reflect"
+ "testing"
+
+ "v.io/v23/security/access"
+ "v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+func TestCreateOrOpenDB(t *testing.T) {
+ _, ctxOwner, sbName, rootPrincipal, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+ defer cleanup()
+ ctxGuest := sbtu.NewCtx(ctxOwner, rootPrincipal, "two")
+
+ // Try to open app/db (create both) as guest, fail with ErrNoAccess.
+ if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Errorf("CreateOrOpenDB should have failed with ErrNoAccess, got error: %v", err)
+ }
+ // Open app/db (create both) as owner.
+ dbOwner, err := sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MasterTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+ // Open existing app/db as guest.
+ if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); err != nil {
+ t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+ // Destroy db (but not app) to simulate interrupted creation.
+ if err := dbOwner.Destroy(ctxOwner); err != nil {
+ t.Errorf("dbOwner.Destroy should have succeeded, got error: %v", err)
+ }
+ // Try to open app/db (create db) as guest, fail with ErrNoAccess.
+ if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Errorf("CreateOrOpenDB should have failed with ErrNoAccess, got error: %v", err)
+ }
+ // Open app/db (recreate db) as owner.
+ dbOwner, err = sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MasterTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+ // Open recreated app/db as guest.
+ dbGuest, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables)
+ if err != nil {
+ t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+ // Expect db permissions with full access for owner, resolve only for others.
+ expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
+ "Admin":{"In":["root:one"]},
+ "Read":{"In":["root:one"]},
+ "Write":{"In":["root:one"]},
+ "Debug":{"In":["root:one"]},
+ "Resolve":{"In":["..."]}
+ }`))
+ if err != nil {
+ t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+ }
+ if perms, _, err := dbOwner.GetPermissions(ctxOwner); err != nil {
+ t.Errorf("GetPermissions should have succeeded, got error: %v", err)
+ } else if got, want := perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
+ t.Errorf("Unexpected database permissions: got %v, want %v", got, want)
+ }
+ // Check that all tables exist.
+ for _, ts := range sbmodel.MasterTables {
+ tb := dbGuest.Table(ts.Prototype.Table())
+ if exists, err := tb.Exists(ctxGuest); err != nil || !exists {
+ t.Errorf("Expected table %s to exist, got: %v (error: %v)", tb.Name(), exists, err)
+ }
+ }
+}
+
+func TestTablePermissions(t *testing.T) {
+ _, ctxOwner, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as owner.
+ dbOwner, err := sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MeasuredTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ expectPermsFull, err := access.ReadPermissions(bytes.NewBufferString(`{
+ "Admin":{"In":["root:one"]},
+ "Read":{"In":["root:one"]},
+ "Write":{"In":["root:one"]},
+ "Debug":{"In":["root:one"]},
+ "Resolve":{"In":["..."]}
+ }`))
+ if err != nil {
+ t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+ }
+ expectPermsReadOnly, err := access.ReadPermissions(bytes.NewBufferString(`{
+ "Admin":{"In":["root:one"]},
+ "Read":{"In":["root:one"]},
+ "Debug":{"In":["root:one"]},
+ "Resolve":{"In":["..."]}
+ }`))
+ if err != nil {
+ t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+ }
+
+ // Check that all tables have correct permissions (full or readonly).
+ for _, ts := range sbmodel.MeasuredTables {
+ tb := dbOwner.Table(ts.Prototype.Table())
+ if exists, err := tb.Exists(ctxOwner); err != nil || !exists {
+ t.Errorf("Expected table %s to exist, got: %v (error: %v)", tb.Name(), exists, err)
+ }
+ want := expectPermsFull
+ if ts.ReadOnly {
+ want = expectPermsReadOnly
+ }
+ if got, err := tb.GetPermissions(ctxOwner); err != nil {
+ t.Errorf("GetPermissions should have succeeded, got error: %v", err)
+ } else if got, want = got.Normalize(), want.Normalize(); !reflect.DeepEqual(got, want) {
+ t.Errorf("Unexpected table %s permissions: got %v, want %v", tb.Name(), got, want)
+ }
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util.go b/go/src/v.io/x/sensorlog/internal/util/util.go
new file mode 100644
index 0000000..e01c63e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for running tasks asynchronously.
+package util
+
+import (
+ "v.io/v23/verror"
+)
+
+type Task func() error
+type ErrorCb func(err error)
+
+// AsyncRun asynchronously starts task and returns a function that can be used
+// to wait for task completion, returning the error if any. In addition, if the
+// task returns a non-nil error, onError is called with the error. onError can
+// be nil. ErrCanceled is ignored (treated as a nil error).
+func AsyncRun(task Task, onError ErrorCb) (wait func() error) {
+ var err error
+ done := make(chan error, 1)
+ go func() {
+ defer close(done)
+ err = task()
+ // Ignore ErrCanceled.
+ if verror.ErrorID(err) == verror.ErrCanceled.ID {
+ err = nil
+ }
+ // Call onError if provided and err is not nil.
+ if err != nil && onError != nil {
+ onError(err)
+ }
+ }()
+ return func() error {
+ <-done
+ return err
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util_test.go b/go/src/v.io/x/sensorlog/internal/util/util_test.go
new file mode 100644
index 0000000..90a820f
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util_test.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+ errFoo = errors.New("errFoo")
+ errBar = errors.New("errBar")
+)
+
+func TestAsyncRun(t *testing.T) {
+ wait1 := util.AsyncRun(func() error {
+ time.Sleep(200 * time.Microsecond)
+ return nil
+ }, func(err error) {
+ t.Errorf("unexpected callback with error: %v", err)
+ })
+
+ var ecb2 error
+ wait2 := util.AsyncRun(func() error {
+ time.Sleep(100 * time.Microsecond)
+ return errFoo
+ }, func(err error) {
+ ecb2 = err
+ })
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+ if wgot, cbgot, want := wait2(), ecb2, errFoo; wgot != want || cbgot != want {
+ t.Errorf("wait2 unexpected error: wait got %v, cb got %v, want %v", wgot, cbgot, want)
+ }
+}
+
+func TestAsyncRunNoCallback(t *testing.T) {
+ wait1 := util.AsyncRun(func() error {
+ time.Sleep(200 * time.Microsecond)
+ return nil
+ }, nil)
+
+ wait2 := util.AsyncRun(func() error {
+ time.Sleep(100 * time.Microsecond)
+ return errBar
+ }, nil)
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+ if wgot, want := wait2(), errBar; wgot != want {
+ t.Errorf("wait2 unexpected error: got %v, want %v", wgot, want)
+ }
+}
+
+func TestAsyncRunIgnoreErrCancelled(t *testing.T) {
+ ctx, cancel := context.RootContext()
+
+ wait1 := util.AsyncRun(func() error {
+ <-ctx.Done()
+ return verror.New(verror.ErrCanceled, ctx)
+ }, func(err error) {
+ t.Errorf("unexpected callback with error: %v", err)
+ })
+
+ cancel()
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
new file mode 100644
index 0000000..7c75976
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -0,0 +1,113 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// measured is the Sensor Log Lite measuring daemon. Runs on any device,
+// sampling data points and writing them to Syncbase. Sampling configuration
+// is read from Syncbase as written by the client.
+package main
+
+import (
+ "flag"
+ "os"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/lib/signals"
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/measure/runloop"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+ flagSbService = flag.String("service", config.DefaultSbService, "Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.")
+ flagDevId = flag.String("devid", "", "DevId to be claimed by this measured. Must be specified.")
+ // Flags below are only applied the first time measured is run against a Syncbase service with the same devid.
+ flagAdmin = flag.String("admin", "", "Blessing of admin user allowed to join the syncgroup. Must be specified.")
+ flagPublishSb = flag.String("publish-sb", "", "Syncbase service to publish the syncgroup at. Must be absolute. Must be specified. The syncgroup is published as '"+config.SyncgroupName("<publish-sb>", "<devid>")+"'.")
+ flagPublishMt = flag.String("publish-mt", "", "Additional mounttable to use for sync rendezvous in addition to namespace roots. Optional.")
+)
+
+func main() {
+ os.Exit(runMain())
+}
+
+func runMain() int {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ if *flagDevId == "" {
+ vlog.Errorf("-devid must be specified")
+ return 1
+ }
+ if *flagAdmin == "" {
+ vlog.Errorf("-admin must be specified")
+ return 1
+ }
+ if !naming.Rooted(*flagPublishSb) {
+ vlog.Errorf("-publish-sb must be rooted")
+ return 1
+ }
+ publishMts := v23.GetNamespace(ctx).Roots()
+ if *flagPublishMt != "" {
+ publishMts = append(publishMts, *flagPublishMt)
+ }
+
+ db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MeasuredTables)
+ if err != nil {
+ vlog.Errorf("Failed opening Syncbase db: %v", err)
+ return 1
+ }
+ vlog.VI(0).Infof("measured connected to %s", db.FullName())
+
+ if err := measure.InitSyncgroup(ctx, db, *flagDevId, *flagAdmin, *flagPublishSb, publishMts); err != nil {
+ vlog.Errorf("Failed initializing syncgroup: %v", err)
+ return 1
+ }
+
+ ctx, stop := context.WithCancel(ctx)
+
+ ml := runloop.NewLoop(func(err error) {
+ vlog.Errorf("Measure loop failed: %v", err)
+ stop()
+ })
+
+ writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ return db.Table(key.Table()).Put(ctx, key.Key(), val)
+ }
+
+ waitWatch := util.AsyncRun(func() error {
+ return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+ if val.Enabled {
+ ml.Register(ctx, key, &val.Sampler, writer)
+ } else {
+ ml.Unregister(key)
+ }
+ return nil
+ })
+ }, func(err error) {
+ vlog.Errorf("Watch failed: %v", err)
+ stop()
+ })
+
+ defer func() {
+ _ = waitWatch()
+ ml.WaitAll()
+ }()
+
+ select {
+ case <-signals.ShutdownOnSignals(nil):
+ stop()
+ vlog.VI(0).Infof("Exiting on signal")
+ return 0
+ case <-ctx.Done():
+ vlog.VI(0).Infof("Exiting on error")
+ return 1
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh b/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh
new file mode 100755
index 0000000..e8ca090
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Starts an instance of master device syncbased and required services.
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_IPADDR_PORT, SL_TMPDIR
+function main() {
+ local -r PREFIX="${SL_PREFIX:-sl/client}"
+ local -r DEVID="${SL_DEVID:-main}"
+ local -r NAME="${PREFIX}/${DEVID}"
+ local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8202}"
+ local -r TMPDIR="${SL_TMPDIR:-sltmp}/${NAME}"
+
+ mkdir -p "${TMPDIR}"
+ trap "kill_child_processes; exit 1" ERR EXIT
+ run_mounttabled "${NAME}" "${IPADDR_PORT}"
+ run_syncbased "/${IPADDR_PORT}" "${NAME}" "${TMPDIR}"
+ # Wait for signal.
+ while true; do
+ sleep 10
+ done
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/scripts/run_measured.sh b/go/src/v.io/x/sensorlog/scripts/run_measured.sh
new file mode 100755
index 0000000..f034099
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/run_measured.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Starts an instance of measured and required services.
+#
+# mounttabled is started locally at IPADDR:PORT, mounting itself to the global
+# mounttable at $GLOBAL_MT/users/$USER/$PREFIX/$DEVID. syncbased is started in
+# the local mounttable at $PREFIX/$DEVID/syncbased.
+# measured is started, creating a syncgroup published at the local syncbased as
+# reached through the global mounttable:
+# $GLOBAL_MT/users/$USER/$PREFIX/$DEVID/$PREFIX/$DEVID/syncbased
+# Once the syncgroup is joined, sync is configured to use either the global or
+# the local mounttable (unless the local mounttable IPADDR:PORT changes).
+# measured drops most permissions on prefixes in the syncgroup. Full admin
+# permissions are granted to $ADMIN (as a blessing extension of the same
+# default blessing as the one running this script).
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_ADMIN, SL_IPADDR_PORT, SL_USER, SL_GLOBAL_MT, SL_TMPDIR
+function main() {
+ local -r PREFIX="${SL_PREFIX:-sl/measured}"
+ local -r DEVID="${SL_DEVID:-$(gen_uuid)}"
+ local -r ADMIN="${SL_ADMIN:-sl/client}"
+ local -r NAME="${PREFIX}/${DEVID}"
+ local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8707}"
+ local -r USER="${SL_USER:-$(get_user_email)}"
+ local -r GLOBAL_MT="${SL_GLOBAL_MT:-/ns.dev.v.io:8101}"
+ local -r TMPDIR="${SL_TMPDIR:-sltmp}/${NAME}"
+
+ mkdir -p "${TMPDIR}"
+ trap "kill_child_processes; exit 1" ERR EXIT
+ run_mounttabled "${NAME}" "${IPADDR_PORT}" "${GLOBAL_MT}/users/${USER}"
+ run_syncbased "/${IPADDR_PORT}" "${NAME}" "${TMPDIR}"
+ run_measured "/${IPADDR_PORT}" "${NAME}" "${DEVID}" "${ADMIN}" \
+ "${GLOBAL_MT}/users/${USER}/${NAME}/${NAME}/syncbased" \
+ "${GLOBAL_MT}/users/${USER}"
+ # Wait for signal.
+ while true; do
+ sleep 10
+ done
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
new file mode 100644
index 0000000..c503c91
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
@@ -0,0 +1,131 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Functions for starting the Sensor Log daemon and required services with
+# appropriate blessings. Expected to be run with V23_CREDENTIALS or through
+# an agent. NAME parameters are used both in service names and in blessing
+# extensions.
+
+set -eu
+
+# Default to one week timeout.
+readonly TIMEOUT='168h'
+
+# Kills all child processes of the current process.
+function kill_child_processes() {
+ kill -TERM -- -"${BASHPID}" || true
+ sleep 1
+ kill -KILL -- -"${BASHPID}" || true
+}
+export -f kill_child_processes
+
+# Generates a hex-encoded 16-byte random UUID.
+function gen_uuid() {
+ head -c 256 /dev/urandom | sha256sum | cut -c 1-32
+}
+export -f gen_uuid
+
+readonly BLESSING_CHAIN_SEPARATOR=':'
+
+# Converts name to blessing extension.
+# name_to_blessing NAME
+function name_to_blessing() {
+ sed -e "s,/,${BLESSING_CHAIN_SEPARATOR},g" <<< "$@"
+}
+export -f name_to_blessing
+
+# Gets first default blessing for the principal set in the environment.
+function get_blessing_root() {
+ "${JIRI_ROOT}"/release/go/bin/principal dump -s | cut -d ' ' -f 1 | cut -d ',' -f 1
+}
+export -f get_blessing_root
+
+# Extracts email address from the blessing obtained by get_blessing_root.
+function get_user_email() {
+ get_blessing_root | tr "${BLESSING_CHAIN_SEPARATOR}" '\n' | grep '@' | head -n 1
+}
+export -f get_user_email
+
+# Starts mounttabled at IPADDR:PORT. If $GLOBAL_MOUNT is provided, the
+# mounttable mounts itself under $GLOBAL_MOUNT/$NAME.
+# run_mounttabled NAME IPADDR:PORT [GLOBAL_MOUNT]
+function run_mounttabled() {
+ local -r NAME="$1"
+ local -r IPADDR_PORT="$2"
+ local GLOBAL_MOUNT="${3:-}"
+ if [[ -n "${GLOBAL_MOUNT}" ]]; then
+ GLOBAL_MOUNT="${GLOBAL_MOUNT}/${NAME}"
+ fi
+ # TODO(ivanpi): Lock down mounttable permissions.
+ "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+ -name="$(name_to_blessing "${NAME}/mounttabled")" \
+ "${JIRI_ROOT}"/release/go/bin/mounttabled -v23.tcp.address "${IPADDR_PORT}" \
+ -name="${GLOBAL_MOUNT}" \
+ &
+ sleep 1
+}
+export -f run_mounttabled
+
+# Starts syncbased with permissions other than resolve restricted to
+# <blessing_root>:$NAME.
+# run_syncbased MT NAME TMPDIR
+function run_syncbased() {
+ local -r MT="$1"
+ local -r NAME="$2"
+ local -r TMPDIR="$3"
+ local -r DEF_BLESSING_RUNNER="$(name_to_blessing "$(get_blessing_root)/${NAME}")"
+ local -r PERMISSIONS_LITERAL="{\
+\"Admin\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Read\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Write\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Debug\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Resolve\":{\"In\":[\"...\"]} \
+}"
+ "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+ -name "$(name_to_blessing "${NAME}/syncbased")" \
+ "${JIRI_ROOT}"/release/go/bin/syncbased -v23.namespace.root "${MT}" -name "${NAME}/syncbased" \
+ -engine leveldb -root-dir "${TMPDIR}/${NAME}/syncbased" \
+ -v23.permissions.literal "${PERMISSIONS_LITERAL}" \
+ &
+ sleep 1
+}
+export -f run_syncbased
+
+# Starts measured that uses $PUBLISH_SB to publish the syncgroup. Expects a
+# syncbase instance to have been started at $MT with the same $NAME. If
+# $PUBLISH_MT is provided, the syncgroup is advertised there in addition to
+# the local mounttable.
+# run_measured MT NAME DEVID ADMIN PUBLISH_SB [PUBLISH_MT]
+function run_measured() {
+ local -r MT="$1"
+ local -r NAME="$2"
+ local -r DEVID="$3"
+ local -r ADMIN="$4"
+ local -r PUBLISH_SB="$5"
+ local -r PUBLISH_MT="${6:-}"
+ local -r DEF_BLESSING_ADMIN="$(name_to_blessing "$(get_blessing_root)/${ADMIN}")"
+ "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+ -name="$(name_to_blessing "${NAME}")" \
+ "${JIRI_ROOT}"/experimental/projects/sensorlog_lite/bin/measured -v23.namespace.root "${MT}" \
+ -service "${NAME}/syncbased" -devid="${DEVID}" -admin="${DEF_BLESSING_ADMIN}" \
+ -publish-sb "${PUBLISH_SB}" -publish-mt="${PUBLISH_MT}" \
+ -alsologtostderr \
+ &
+ sleep 1
+}
+export -f run_measured
+
+# Runs slcli against master Syncbase with specified $NAME.
+# run_slcli MT NAME [args...]
+function run_slcli() {
+ local -r MT="$1"
+ local -r NAME="$2"
+ shift 2
+ "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+ -name "$(name_to_blessing "${NAME}")" \
+ "${JIRI_ROOT}"/experimental/projects/sensorlog_lite/bin/slcli -v23.namespace.root "${MT}" \
+ -service "${NAME}/syncbased" "$@"
+}
+export -f run_slcli
diff --git a/go/src/v.io/x/sensorlog/scripts/slcli.sh b/go/src/v.io/x/sensorlog/scripts/slcli.sh
new file mode 100755
index 0000000..04fe39c
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/slcli.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Runs slcli against master device Syncbase.
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_IPADDR_PORT
+function main() {
+ local -r PREFIX="${SL_PREFIX:-sl/client}"
+ local -r DEVID="${SL_DEVID:-main}"
+ local -r NAME="${PREFIX}/${DEVID}"
+ local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8202}"
+ run_slcli "/${IPADDR_PORT}" "${NAME}" "$@"
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/slcli/device.go b/go/src/v.io/x/sensorlog/slcli/device.go
new file mode 100644
index 0000000..ee658f0
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/device.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli device configuration.
+
+package main
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/v23cmd"
+ "v.io/x/sensorlog_lite/internal/client"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+var cmdSLDevice = &cmdline.Command{
+ Name: "device",
+ Short: "Manage measuring devices",
+ Long: `
+Add measuring devices.
+
+TODO(ivanpi): List.
+`,
+ Children: []*cmdline.Command{cmdSLDeviceAdd /*, cmdSLDeviceList */},
+}
+
+var cmdSLDeviceAdd = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runSLDeviceAdd),
+ Name: "add",
+ Short: "Add new measuring device",
+ Long: `
+Adds a new measuring device and outputs its identifier.
+`,
+ ArgsName: "<publish_sb> <device_id> [<device_desc>]",
+ ArgsLong: `
+<publish_sb> is the rooted name of the Syncbase instance where the device
+ syncgroup is published.
+
+<device_id> is the identifier of the device to add.
+
+<device_desc> is a human-readable description of the device.
+ It doesn't need to be unique.
+`,
+}
+
+func runSLDeviceAdd(ctx *context.T, env *cmdline.Env, args []string) error {
+ if len(args) < 2 || len(args) > 3 {
+ return env.UsageErrorf("expects between 2 and 3 arguments")
+ }
+ sgPublishSb := args[0]
+ devId := args[1]
+ desc := ""
+ if len(args) > 2 {
+ desc = args[2]
+ }
+
+ db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed opening Syncbase db: %v", err)
+ }
+
+ devKey, err := client.AddDevice(ctx, db, devId, sgPublishSb, desc)
+ if err != nil {
+ return err
+ }
+
+ fmt.Fprintf(env.Stdout, "%s\n", devKey.DevId)
+
+ return nil
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/list.go b/go/src/v.io/x/sensorlog/slcli/list.go
new file mode 100644
index 0000000..3a87fa5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/list.go
@@ -0,0 +1,85 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli measured data listing support.
+
+package main
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/v23cmd"
+ "v.io/x/sensorlog_lite/internal/client"
+ "v.io/x/sensorlog_lite/internal/config"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var cmdSLList = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runSLList),
+ Name: "list",
+ Short: "List measured data",
+ Long: `
+Prints all measured data points for the given stream and device, sorted
+chronologically.
+`,
+ ArgsName: "<device_id> <stream_id>",
+ ArgsLong: `
+<device_id> and <stream_id> specify the data stream to print the data points
+from.
+`,
+}
+
+var (
+ flagFollow bool
+)
+
+func init() {
+ cmdSLList.Flags.BoolVar(&flagFollow, "follow", false, "Follow updates until killed, similar to 'tail -f'.")
+}
+
+// TODO(ivanpi): Add time interval querying and aggregation functions.
+func runSLList(ctx *context.T, env *cmdline.Env, args []string) error {
+ if len(args) != 2 {
+ return env.UsageErrorf("expects exactly 2 arguments")
+ }
+ streamKey := &sbmodel.KStreamDef{
+ DevId: args[0],
+ StreamId: args[1],
+ }
+
+ dataPrintCb := func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
+ return nil
+ }
+
+ db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed opening Syncbase db: %v", err)
+ }
+
+ if !flagFollow {
+ return client.ListStreamData(ctx, db, streamKey, dataPrintCb)
+ }
+
+ ctx, stop := context.WithCancel(ctx)
+
+ waitFollow := util.AsyncRun(func() error {
+ return client.FollowStreamData(ctx, db, streamKey, dataPrintCb)
+ }, func(err error) {
+ stop()
+ })
+
+ select {
+ case <-signals.ShutdownOnSignals(nil):
+ stop()
+ case <-ctx.Done():
+ }
+
+ return waitFollow()
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/slcli.go b/go/src/v.io/x/sensorlog/slcli/slcli.go
new file mode 100644
index 0000000..20c65f6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/slcli.go
@@ -0,0 +1,39 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli is the Sensor Log Lite command line configuration tool and client.
+// Must be run against master device Syncbase.
+// Supports configuring sampling streams by writing configuration to Syncbase,
+// which is then read by measured on the target device.
+// Also supports listing measured data.
+// TODO(ivanpi): Add data querying and graph plotting.
+package main
+
+import (
+ "flag"
+ "regexp"
+
+ "v.io/x/lib/cmdline"
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/sensorlog_lite/internal/config"
+)
+
+var (
+ flagSbService = flag.String("service", config.DefaultSbService, "Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.")
+)
+
+var cmdSensorLogLite = &cmdline.Command{
+ Name: "slcli",
+ Short: "Sensor Log Lite command line configuration tool and client",
+ Long: `
+Command line interface for Sensor Log Lite, used for listing data and
+manipulating configuration via a master device Syncbase.
+`,
+ Children: []*cmdline.Command{cmdSLDevice, cmdSLStream, cmdSLList},
+}
+
+func main() {
+ cmdline.HideGlobalFlagsExcept(regexp.MustCompile(`^((service)|(v23\.namespace\.root)|(v23\.credentials))$`))
+ cmdline.Main(cmdSensorLogLite)
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/stream.go b/go/src/v.io/x/sensorlog/slcli/stream.go
new file mode 100644
index 0000000..23c75bc
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/stream.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli stream configuration.
+
+package main
+
+import (
+ "fmt"
+ "io/ioutil"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/v23cmd"
+ "v.io/x/sensorlog_lite/internal/client"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+var cmdSLStream = &cmdline.Command{
+ Name: "stream",
+ Short: "Manage sampling streams",
+ Long: `
+Create sampling streams.
+
+TODO(ivanpi): Enable/disable, list.
+`,
+ Children: []*cmdline.Command{cmdSLStreamCreate /*, cmdSLStreamEnable, cmdSLStreamDisable, cmdSLStreamList*/},
+}
+
+var cmdSLStreamCreate = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runSLStreamCreate),
+ Name: "create",
+ Short: "Create new data stream",
+ Long: `
+Creates a new sampling stream and outputs its identifier.
+
+The sampling script is read from stdin. It must be a bash script that prints
+a single floating point result per invocation and exits with a zero status,
+otherwise an error is logged instead of the data point. The script will be
+run on the specified device at the specified frequency whenever the measuring
+daemon is running on the device, as long as the stream is enabled.
+`,
+ ArgsName: "<device_id> <stream_id> <interval> [<stream_desc>]",
+ ArgsLong: `
+<device_id> is the identifier of the device to use for sampling.
+
+<stream_id> is the identifier of the stream to be created. It must be unique
+ per measuring device.
+
+<interval> is the time interval between two subsequent invocations of the
+ measuring script. Measurements may be skipped if the interval is
+ too short compared to the device and sampling script speed.
+
+<stream_desc> is a human-readable description of the stream.
+ It doesn't need to be unique.
+`,
+}
+
+func runSLStreamCreate(ctx *context.T, env *cmdline.Env, args []string) error {
+ if len(args) < 3 || len(args) > 4 {
+ return env.UsageErrorf("expects between 3 and 4 arguments")
+ }
+ devKey := &sbmodel.KDeviceCfg{
+ DevId: args[0],
+ }
+ streamId := args[1]
+ interval, err := time.ParseDuration(args[2])
+ if err != nil {
+ return fmt.Errorf("failed parsing interval %q: %v", args[2], err)
+ }
+ desc := ""
+ if len(args) > 3 {
+ desc = args[3]
+ }
+
+ script, err := ioutil.ReadAll(env.Stdin)
+ if err != nil {
+ return err
+ }
+
+ db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed opening Syncbase db: %v", err)
+ }
+
+ streamKey, err := client.CreateStream(ctx, db, devKey, streamId, string(script), interval, desc)
+ if err != nil {
+ return err
+ }
+
+ fmt.Fprintf(env.Stdout, "%s\n", streamKey.StreamId)
+
+ return nil
+}