sensorlog_lite: Integrate measured components. End-to-end test.
Connected stream configuration watcher to measure runloop.
Added DataPoint to syncgroup prefixes.
Added ListStreamData client function.
End-to-end integration test with bidirectional sync (client adds device,
configures stream, waits, lists measured data).
Change-Id: I5b3d6033d5a810e28482d04866c87596bbe0aecd
diff --git a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
index 5e33dc6..98a9182 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
@@ -15,13 +15,12 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
- "v.io/v23/verror"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/generic"
sbtu "v.io/x/ref/services/syncbase/testutil"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
- "v.io/x/sensorlog_lite/internal/client"
+ slltu "v.io/x/sensorlog_lite/internal/client/testutil"
"v.io/x/sensorlog_lite/internal/measure"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbutil"
@@ -30,6 +29,11 @@
//go:generate jiri test generate
+const (
+ dummyScript = "echo 42;"
+ dummyInterval = "1s"
+)
+
func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
// Start a 'global' mounttable.
globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
@@ -60,7 +64,7 @@
sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root/client", publishSb, globalMT)
// Add measuring device to client, joining its syncgroup.
- sbtu.RunClient(t, clientCreds, runAddDevice, clientSb, devId, publishSb)
+ sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
// Allow time for syncgroup metadata sync.
time.Sleep(3 * time.Second)
@@ -77,7 +81,7 @@
streamIds := []string{"str1", "str2", "str3"}
// Create stream before starting measured watcher.
- sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[0])
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)
time.Sleep(1 * time.Second)
@@ -89,11 +93,11 @@
}
// Create more streams.
- sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[1])
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)
time.Sleep(1 * time.Second)
- sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[2])
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)
// Allow time for sync.
time.Sleep(3 * time.Second)
@@ -132,43 +136,6 @@
return nil
}, "runInitSyncgroup")
-var runAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
- sbService, devId, publishSb := args[0], args[1], args[2]
-
- ctx, cleanup := v23.Init()
- defer cleanup()
- db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
- if err != nil {
- return fmt.Errorf("failed initializing master database: %v", err)
- }
-
- if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
- return fmt.Errorf("AddDevice %s failed: %v", devId, err)
- }
- if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
- return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
- }
-
- return nil
-}, "runAddDevice")
-
-var runCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
- sbService, devId, streamId := args[0], args[1], args[2]
-
- ctx, cleanup := v23.Init()
- defer cleanup()
- db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
- if err != nil {
- return fmt.Errorf("failed opening master database: %v", err)
- }
-
- if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, ""); err != nil {
- return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
- }
-
- return nil
-}, "runCreateStream")
-
var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
sbService, devId, expectStreams := args[0], args[1], args[2:]
diff --git a/go/src/v.io/x/sensorlog/internal/client/doc.go b/go/src/v.io/x/sensorlog/internal/client/doc.go
index 3663147..2f09780 100644
--- a/go/src/v.io/x/sensorlog/internal/client/doc.go
+++ b/go/src/v.io/x/sensorlog/internal/client/doc.go
@@ -4,6 +4,5 @@
// Package client implements Sensor Log client methods, intended to run
// against the master device Syncbase. It supports adding measuring devices,
-// configuring streams, and querying measured data.
-// TODO(ivanpi): Implement querying measured data.
+// configuring streams, and listing measured data.
package client
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
new file mode 100644
index 0000000..ebf55ea
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -0,0 +1,58 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for listing stream data.
+
+package client
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/syncbase/nosql"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// ListStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each.
+// TODO(ivanpi): Allow specifying time interval.
+func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+ dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+ bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ return err
+ }
+ defer bdb.Abort(ctx)
+
+ streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+ if exists, err := streamRow.Exists(ctx); err != nil {
+ return err
+ } else if !exists {
+ return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+ }
+
+ sstr := bdb.Table(sbmodel.KDataPoint{}.Table()).Scan(ctx, nosql.Prefix(dataPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KDataPoint{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ var val sbmodel.VDataPoint
+ if err := sstr.Value(&val); err != nil {
+ return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+ }
+ if err := listCb(key, val); err != nil {
+ return err
+ }
+ }
+ return sstr.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
new file mode 100644
index 0000000..8558501
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+ "bytes"
+ "fmt"
+ "strconv"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/naming"
+ "v.io/x/ref"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test/modules"
+ "v.io/x/ref/test/v23tests"
+ "v.io/x/sensorlog_lite/internal/client"
+ slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+//go:generate jiri test generate
+
+func V23TestStreamConfigAndList(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)
+
+ clientSb := "sb/client"
+ clientCreds, _ := t.Shell().NewChildCredentials("client")
+ clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+ cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+ `{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ measuredSb := "sb/measured"
+ measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+ measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+ cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+ `{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+ defer cleanup()
+
+ time.Sleep(1 * time.Second)
+
+ // Start measured.
+ measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured")
+ devId := "measured1"
+ publishSb := naming.Join(mtName, measuredSb)
+ measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
+ measured := measuredBin.WithStartOpts(measuredOpts).Start(
+ "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client",
+ "-publish-sb="+publishSb)
+
+ time.Sleep(3 * time.Second)
+
+ // Add measuring device to client, joining its syncgroup.
+ sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+ // Create streams.
+ streamId1, result1, interval1 := "str1", "42", "2s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")
+
+ time.Sleep(1 * time.Second)
+
+ streamId2, result2, interval2 := "str2", "47", "1s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")
+
+ // streamId has another streamId as prefix to verify ListDataStreams prefix
+ // handling.
+ streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
+ sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+ streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")
+
+ // Allow time for sync and at least 3 measurements.
+ time.Sleep(10 * time.Second)
+
+ // Kill will gracefully stop measured.
+ if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
+ t.Fatalf("failed to kill measured: %v", err)
+ }
+ var stdout, stderr bytes.Buffer
+ if err := measured.Shutdown(&stdout, &stderr); err != nil {
+ t.Errorf("failed to shutdown measured: %v")
+ }
+ t.Logf("measured stdout:\n%s", stdout.String())
+ t.Logf("measured stderr:\n%s", stderr.String())
+
+ // Check that both streams have at least 3 measurements synced back to
+ // client device Syncbase.
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
+ sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
+}
+
+var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]
+
+ expectVal, err := strconv.ParseFloat(expectValStr, 64)
+ if err != nil {
+ return err
+ }
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+ if err != nil {
+ return fmt.Errorf("failed opening measured database: %v", err)
+ }
+
+ count := 0
+ streamKey := &sbmodel.KStreamDef{
+ DevId: devId,
+ StreamId: streamId,
+ }
+ if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ if got, want := key.StreamId, streamId; got != want {
+ return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
+ }
+ switch val := val.(type) {
+ case sbmodel.VDataPointValue:
+ if val.Value != expectVal {
+ return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
+ }
+ default:
+ return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
+ }
+ count++
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ if count < 3 {
+ return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
+ }
+
+ return nil
+}, "runListStreamData")
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index adef452..74a9fb5 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -8,11 +8,14 @@
import (
"fmt"
+ "strings"
+ "time"
"v.io/v23/context"
nosql_wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/syncbase/nosql"
"v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/config"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
)
@@ -20,11 +23,17 @@
// CreateStream writes the configuration for a new sampling stream to Syncbase.
// The target device must have been configured. streamId must be unique for
// the target device.
-// TODO(ivanpi): Add actual sampling script.
-func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId, desc string) (*sbmodel.KStreamDef, error) {
+// TODO(ivanpi): Make start time configurable.
+func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
if err := keyutil.ValidateId(streamId); err != nil {
return nil, fmt.Errorf("invalid streamId: %v", err)
}
+ if strings.TrimSpace(script) == "" {
+ return nil, fmt.Errorf("sampling script cannot be empty")
+ }
+ if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
+ return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
+ }
stmKey := sbmodel.KStreamDef{
DevId: devKey.DevId,
StreamId: streamId,
@@ -33,7 +42,12 @@
desc = "stream:" + stmKey.Key()
}
stmVal := sbmodel.VStreamDef{
- Desc: desc,
+ Desc: desc,
+ Sampler: sbmodel.SamplerDef{
+ Script: script,
+ Start: time.Now(),
+ Interval: interval,
+ },
Enabled: true,
}
diff --git a/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
new file mode 100644
index 0000000..80b6f5e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
@@ -0,0 +1,71 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package testutil implements testing modules for common client operations.
+package testutil
+
+import (
+ "fmt"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/verror"
+ "v.io/x/ref/test/modules"
+ "v.io/x/sensorlog_lite/internal/client"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// Required parameters: sbService, devId, publishSb
+var RunAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, publishSb := args[0], args[1], args[2]
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed initializing master database: %v", err)
+ }
+
+ if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
+ return fmt.Errorf("AddDevice %s failed: %v", devId, err)
+ }
+ if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+ return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
+ }
+
+ return nil
+}, "runAddDevice")
+
+// Required parameters: sbService, devId, streamId, script, interval
+var RunCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
+ sbService, devId, streamId, script, intervalStr := args[0], args[1], args[2], args[3], args[4]
+
+ interval, err := time.ParseDuration(intervalStr)
+ if err != nil {
+ return err
+ }
+
+ ctx, cleanup := v23.Init()
+ defer cleanup()
+ db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+ if err != nil {
+ return fmt.Errorf("failed opening master database: %v", err)
+ }
+
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, "", interval, ""); err == nil {
+ return fmt.Errorf("CreateStream %s with empty script should have failed", streamId)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, 42*time.Nanosecond, ""); err == nil {
+ return fmt.Errorf("CreateStream %s with tiny duration should have failed", streamId)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); err != nil {
+ return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
+ }
+ if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+ return fmt.Errorf("repeat CreateStream %s should have failed with ErrExist, got: %v", streamId, err)
+ }
+
+ return nil
+}, "runCreateStream")
diff --git a/go/src/v.io/x/sensorlog/internal/client/v23_test.go b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
index ee78101..4e4a1e3 100644
--- a/go/src/v.io/x/sensorlog/internal/client/v23_test.go
+++ b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
@@ -28,3 +28,7 @@
func TestV23DeviceAddAndStreamCreate(t *testing.T) {
v23tests.RunTest(t, V23TestDeviceAddAndStreamCreate)
}
+
+func TestV23StreamConfigAndList(t *testing.T) {
+ v23tests.RunTest(t, V23TestStreamConfigAndList)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 7451931..8050794 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -19,7 +19,12 @@
SyncPriority = 42
+ // Delay between SIGINT and SIGKILL when stopping measuring script.
ScriptKillDelay = 100 * time.Millisecond
+
+ // Both Syncbase write and sampling script slowness impose this limit;
+ // sampling with mocked out script and Syncbase write supports 200 µs.
+ MinSamplingInterval = 10 * time.Millisecond
)
func SyncgroupName(publishService, devId string) string {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 8f58bb0..1aaa9a9 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -63,6 +63,17 @@
sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
prefixSpec[prefixStreamDef] = aclStreamDef
+ // DataPoint : <devId>
+ // Admin client has full permissions, measured drops to read/write.
+ prefixDataPoint := nosql_wire.SyncgroupPrefix{
+ TableName: sbmodel.KDataPoint{}.Table(),
+ RowPrefix: devId,
+ }
+ aclDataPoint := access.Permissions{}
+ sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
+ sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+ prefixSpec[prefixDataPoint] = aclDataPoint
+
var prefixes []nosql_wire.SyncgroupPrefix
// Apply prefix ACLs to all syncgroup prefixes.
for prefix, prefixAcl := range prefixSpec {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 0975be5..6f7897e 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -14,6 +14,7 @@
"v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
"v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
)
// RegisterWorker is called with the key and value of a new or modified
@@ -28,7 +29,7 @@
// an error is encountered.
func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
tableName := sbmodel.KStreamDef{}.Table()
- watchPrefix := devId
+ watchPrefix := keyutil.Join(devId, "")
var resMark watch.ResumeMarker
// BeginBatch scoped using function with deferred Abort.
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index 4c369c6..b8e3bf7 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -19,6 +19,7 @@
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/sensorlog_lite/internal/config"
"v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/measure/runloop"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbutil"
"v.io/x/sensorlog_lite/internal/util"
@@ -72,9 +73,22 @@
ctx, stop := context.WithCancel(ctx)
+ ml := runloop.NewLoop(func(err error) {
+ vlog.Errorf("Measure loop failed: %v", err)
+ stop()
+ })
+
+ writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ return db.Table(key.Table()).Put(ctx, key.Key(), val)
+ }
+
waitWatch := util.AsyncRun(func() error {
return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
- ctx.Infof("watch: %s, %v", key.Key(), &val)
+ if val.Enabled {
+ ml.Register(ctx, key, &val.Sampler, writer)
+ } else {
+ ml.Unregister(key)
+ }
return nil
})
}, func(err error) {
@@ -82,7 +96,10 @@
stop()
})
- defer waitWatch()
+ defer func() {
+ _ = waitWatch()
+ ml.WaitAll()
+ }()
select {
case <-signals.ShutdownOnSignals(nil):