sensorlog_lite: Integrate measured components. End-to-end test.

Connected stream configuration watcher to measure runloop.
Added DataPoint to syncgroup prefixes.
Added ListStreamData client function.
End-to-end integration test with bidirectional sync (client adds device,
configures stream, waits, lists measured data).

Change-Id: I5b3d6033d5a810e28482d04866c87596bbe0aecd
diff --git a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
index 5e33dc6..98a9182 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
@@ -15,13 +15,12 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/naming"
-	"v.io/v23/verror"
 	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/generic"
 	sbtu "v.io/x/ref/services/syncbase/testutil"
 	"v.io/x/ref/test/modules"
 	"v.io/x/ref/test/v23tests"
-	"v.io/x/sensorlog_lite/internal/client"
+	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
 	"v.io/x/sensorlog_lite/internal/measure"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbutil"
@@ -30,6 +29,11 @@
 
 //go:generate jiri test generate
 
+const (
+	dummyScript   = "echo 42;"
+	dummyInterval = "1s"
+)
+
 func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
 	// Start a 'global' mounttable.
 	globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
@@ -60,7 +64,7 @@
 	sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root/client", publishSb, globalMT)
 
 	// Add measuring device to client, joining its syncgroup.
-	sbtu.RunClient(t, clientCreds, runAddDevice, clientSb, devId, publishSb)
+	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
 
 	// Allow time for syncgroup metadata sync.
 	time.Sleep(3 * time.Second)
@@ -77,7 +81,7 @@
 	streamIds := []string{"str1", "str2", "str3"}
 
 	// Create stream before starting measured watcher.
-	sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[0])
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)
 
 	time.Sleep(1 * time.Second)
 
@@ -89,11 +93,11 @@
 	}
 
 	// Create more streams.
-	sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[1])
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)
 
 	time.Sleep(1 * time.Second)
 
-	sbtu.RunClient(t, clientCreds, runCreateStream, clientSb, devId, streamIds[2])
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)
 
 	// Allow time for sync.
 	time.Sleep(3 * time.Second)
@@ -132,43 +136,6 @@
 	return nil
 }, "runInitSyncgroup")
 
-var runAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
-	sbService, devId, publishSb := args[0], args[1], args[2]
-
-	ctx, cleanup := v23.Init()
-	defer cleanup()
-	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
-	if err != nil {
-		return fmt.Errorf("failed initializing master database: %v", err)
-	}
-
-	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
-		return fmt.Errorf("AddDevice %s failed: %v", devId, err)
-	}
-	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
-		return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
-	}
-
-	return nil
-}, "runAddDevice")
-
-var runCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
-	sbService, devId, streamId := args[0], args[1], args[2]
-
-	ctx, cleanup := v23.Init()
-	defer cleanup()
-	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
-	if err != nil {
-		return fmt.Errorf("failed opening master database: %v", err)
-	}
-
-	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, ""); err != nil {
-		return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
-	}
-
-	return nil
-}, "runCreateStream")
-
 var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
 	sbService, devId, expectStreams := args[0], args[1], args[2:]
 
diff --git a/go/src/v.io/x/sensorlog/internal/client/doc.go b/go/src/v.io/x/sensorlog/internal/client/doc.go
index 3663147..2f09780 100644
--- a/go/src/v.io/x/sensorlog/internal/client/doc.go
+++ b/go/src/v.io/x/sensorlog/internal/client/doc.go
@@ -4,6 +4,5 @@
 
 // Package client implements Sensor Log client methods, intended to run
 // against the master device Syncbase. It supports adding measuring devices,
-// configuring streams, and querying measured data.
-// TODO(ivanpi): Implement querying measured data.
+// configuring streams, and listing measured data.
 package client
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
new file mode 100644
index 0000000..ebf55ea
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -0,0 +1,58 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for listing stream data.
+
+package client
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// ListStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each.
+// TODO(ivanpi): Allow specifying time interval.
+func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+	if err != nil {
+		return err
+	}
+	defer bdb.Abort(ctx)
+
+	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+	if exists, err := streamRow.Exists(ctx); err != nil {
+		return err
+	} else if !exists {
+		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+	}
+
+	sstr := bdb.Table(sbmodel.KDataPoint{}.Table()).Scan(ctx, nosql.Prefix(dataPrefix))
+	defer sstr.Cancel()
+
+	for sstr.Advance() {
+		key := &sbmodel.KDataPoint{}
+		if err := key.Parse(sstr.Key()); err != nil {
+			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+		}
+		var val sbmodel.VDataPoint
+		if err := sstr.Value(&val); err != nil {
+			return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+		}
+		if err := listCb(key, val); err != nil {
+			return err
+		}
+	}
+	return sstr.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
new file mode 100644
index 0000000..8558501
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+	"bytes"
+	"fmt"
+	"strconv"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/naming"
+	"v.io/x/ref"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/ref/test/modules"
+	"v.io/x/ref/test/v23tests"
+	"v.io/x/sensorlog_lite/internal/client"
+	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+//go:generate jiri test generate
+
+func V23TestStreamConfigAndList(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)
+
+	clientSb := "sb/client"
+	clientCreds, _ := t.Shell().NewChildCredentials("client")
+	clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+		`{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	measuredSb := "sb/measured"
+	measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+	measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+		`{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	time.Sleep(1 * time.Second)
+
+	// Start measured.
+	measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured")
+	devId := "measured1"
+	publishSb := naming.Join(mtName, measuredSb)
+	measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
+	measured := measuredBin.WithStartOpts(measuredOpts).Start(
+		"-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client",
+		"-publish-sb="+publishSb)
+
+	time.Sleep(3 * time.Second)
+
+	// Add measuring device to client, joining its syncgroup.
+	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+	// Create streams.
+	streamId1, result1, interval1 := "str1", "42", "2s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")
+
+	time.Sleep(1 * time.Second)
+
+	streamId2, result2, interval2 := "str2", "47", "1s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")
+
+	// streamId has another streamId as prefix to verify ListDataStreams prefix
+	// handling.
+	streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")
+
+	// Allow time for sync and at least 3 measurements.
+	time.Sleep(10 * time.Second)
+
+	// Kill will gracefully stop measured.
+	if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
+		t.Fatalf("failed to kill measured: %v", err)
+	}
+	var stdout, stderr bytes.Buffer
+	if err := measured.Shutdown(&stdout, &stderr); err != nil {
+		t.Errorf("failed to shutdown measured: %v")
+	}
+	t.Logf("measured stdout:\n%s", stdout.String())
+	t.Logf("measured stderr:\n%s", stderr.String())
+
+	// Check that both streams have at least 3 measurements synced back to
+	// client device Syncbase.
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
+}
+
+var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]
+
+	expectVal, err := strconv.ParseFloat(expectValStr, 64)
+	if err != nil {
+		return err
+	}
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+	if err != nil {
+		return fmt.Errorf("failed opening measured database: %v", err)
+	}
+
+	count := 0
+	streamKey := &sbmodel.KStreamDef{
+		DevId:    devId,
+		StreamId: streamId,
+	}
+	if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		if got, want := key.StreamId, streamId; got != want {
+			return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
+		}
+		switch val := val.(type) {
+		case sbmodel.VDataPointValue:
+			if val.Value != expectVal {
+				return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
+			}
+		default:
+			return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
+		}
+		count++
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	if count < 3 {
+		return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
+	}
+
+	return nil
+}, "runListStreamData")
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index adef452..74a9fb5 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -8,11 +8,14 @@
 
 import (
 	"fmt"
+	"strings"
+	"time"
 
 	"v.io/v23/context"
 	nosql_wire "v.io/v23/services/syncbase/nosql"
 	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/config"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
 )
@@ -20,11 +23,17 @@
 // CreateStream writes the configuration for a new sampling stream to Syncbase.
 // The target device must have been configured. streamId must be unique for
 // the target device.
-// TODO(ivanpi): Add actual sampling script.
-func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId, desc string) (*sbmodel.KStreamDef, error) {
+// TODO(ivanpi): Make start time configurable.
+func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
 	if err := keyutil.ValidateId(streamId); err != nil {
 		return nil, fmt.Errorf("invalid streamId: %v", err)
 	}
+	if strings.TrimSpace(script) == "" {
+		return nil, fmt.Errorf("sampling script cannot be empty")
+	}
+	if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
+		return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
+	}
 	stmKey := sbmodel.KStreamDef{
 		DevId:    devKey.DevId,
 		StreamId: streamId,
@@ -33,7 +42,12 @@
 		desc = "stream:" + stmKey.Key()
 	}
 	stmVal := sbmodel.VStreamDef{
-		Desc:    desc,
+		Desc: desc,
+		Sampler: sbmodel.SamplerDef{
+			Script:   script,
+			Start:    time.Now(),
+			Interval: interval,
+		},
 		Enabled: true,
 	}
 
diff --git a/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
new file mode 100644
index 0000000..80b6f5e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
@@ -0,0 +1,71 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package testutil implements testing modules for common client operations.
+package testutil
+
+import (
+	"fmt"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/verror"
+	"v.io/x/ref/test/modules"
+	"v.io/x/sensorlog_lite/internal/client"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// Required parameters: sbService, devId, publishSb
+var RunAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, publishSb := args[0], args[1], args[2]
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed initializing master database: %v", err)
+	}
+
+	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
+		return fmt.Errorf("AddDevice %s failed: %v", devId, err)
+	}
+	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+		return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
+	}
+
+	return nil
+}, "runAddDevice")
+
+// Required parameters: sbService, devId, streamId, script, interval
+var RunCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, streamId, script, intervalStr := args[0], args[1], args[2], args[3], args[4]
+
+	interval, err := time.ParseDuration(intervalStr)
+	if err != nil {
+		return err
+	}
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed opening master database: %v", err)
+	}
+
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, "", interval, ""); err == nil {
+		return fmt.Errorf("CreateStream %s with empty script should have failed", streamId)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, 42*time.Nanosecond, ""); err == nil {
+		return fmt.Errorf("CreateStream %s with tiny duration should have failed", streamId)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); err != nil {
+		return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+		return fmt.Errorf("repeat CreateStream %s should have failed with ErrExist, got: %v", streamId, err)
+	}
+
+	return nil
+}, "runCreateStream")
diff --git a/go/src/v.io/x/sensorlog/internal/client/v23_test.go b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
index ee78101..4e4a1e3 100644
--- a/go/src/v.io/x/sensorlog/internal/client/v23_test.go
+++ b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
@@ -28,3 +28,7 @@
 func TestV23DeviceAddAndStreamCreate(t *testing.T) {
 	v23tests.RunTest(t, V23TestDeviceAddAndStreamCreate)
 }
+
+func TestV23StreamConfigAndList(t *testing.T) {
+	v23tests.RunTest(t, V23TestStreamConfigAndList)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 7451931..8050794 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -19,7 +19,12 @@
 
 	SyncPriority = 42
 
+	// Delay between SIGINT and SIGKILL when stopping measuring script.
 	ScriptKillDelay = 100 * time.Millisecond
+
+	// Both Syncbase write and sampling script slowness impose this limit;
+	// sampling with mocked out script and Syncbase write supports 200 µs.
+	MinSamplingInterval = 10 * time.Millisecond
 )
 
 func SyncgroupName(publishService, devId string) string {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 8f58bb0..1aaa9a9 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -63,6 +63,17 @@
 	sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
 	prefixSpec[prefixStreamDef] = aclStreamDef
 
+	// DataPoint : <devId>
+	// Admin client has full permissions, measured drops to read/write.
+	prefixDataPoint := nosql_wire.SyncgroupPrefix{
+		TableName: sbmodel.KDataPoint{}.Table(),
+		RowPrefix: devId,
+	}
+	aclDataPoint := access.Permissions{}
+	sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
+	sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+	prefixSpec[prefixDataPoint] = aclDataPoint
+
 	var prefixes []nosql_wire.SyncgroupPrefix
 	// Apply prefix ACLs to all syncgroup prefixes.
 	for prefix, prefixAcl := range prefixSpec {
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 0975be5..6f7897e 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -14,6 +14,7 @@
 	"v.io/v23/services/watch"
 	"v.io/v23/syncbase/nosql"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
 )
 
 // RegisterWorker is called with the key and value of a new or modified
@@ -28,7 +29,7 @@
 // an error is encountered.
 func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
 	tableName := sbmodel.KStreamDef{}.Table()
-	watchPrefix := devId
+	watchPrefix := keyutil.Join(devId, "")
 	var resMark watch.ResumeMarker
 
 	// BeginBatch scoped using function with deferred Abort.
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index 4c369c6..b8e3bf7 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -19,6 +19,7 @@
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/sensorlog_lite/internal/config"
 	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/measure/runloop"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbutil"
 	"v.io/x/sensorlog_lite/internal/util"
@@ -72,9 +73,22 @@
 
 	ctx, stop := context.WithCancel(ctx)
 
+	ml := runloop.NewLoop(func(err error) {
+		vlog.Errorf("Measure loop failed: %v", err)
+		stop()
+	})
+
+	writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		return db.Table(key.Table()).Put(ctx, key.Key(), val)
+	}
+
 	waitWatch := util.AsyncRun(func() error {
 		return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
-			ctx.Infof("watch: %s, %v", key.Key(), &val)
+			if val.Enabled {
+				ml.Register(ctx, key, &val.Sampler, writer)
+			} else {
+				ml.Unregister(key)
+			}
 			return nil
 		})
 	}, func(err error) {
@@ -82,7 +96,10 @@
 		stop()
 	})
 
-	defer waitWatch()
+	defer func() {
+		_ = waitWatch()
+		ml.WaitAll()
+	}()
 
 	select {
 	case <-signals.ShutdownOnSignals(nil):