Merge branch 'master' of /tmp/experimental into ivanpi-sensorlog-merge
diff --git a/go/src/v.io/x/sensorlog/Makefile b/go/src/v.io/x/sensorlog/Makefile
new file mode 100644
index 0000000..d880055
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/Makefile
@@ -0,0 +1,30 @@
+PATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite/bin:$(JIRI_ROOT)/release/go/bin:$(PATH)
+SHELL := /bin/bash -euo pipefail
+export GOPATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite:$(GOPATH)
+export VDLPATH := $(JIRI_ROOT)/experimental/projects/sensorlog_lite/src:$(VDLPATH)
+
+.DELETE_ON_ERROR:
+
+.PHONY: all
+all: measured slcli
+
+.PHONY: measured
+measured:
+	jiri go install v.io/x/sensorlog_lite/measured
+
+.PHONY: slcli
+slcli:
+	jiri go install v.io/x/sensorlog_lite/slcli
+
+.PHONY: run-deps
+run-deps:
+	jiri go install v.io/x/ref/services/mounttable/mounttabled
+	jiri go install v.io/x/ref/services/syncbase/syncbased
+	jiri go install v.io/x/ref/services/agent/vbecome
+	jiri go install v.io/x/ref/cmd/principal
+
+.PHONY: test
+test:
+	jiri go test v.io/x/sensorlog_lite/...
+	# TODO(ivanpi): Add jiri command to run integration tests.
+	jiri go test v.io/x/sensorlog_lite/internal/client -run=TestV23* -v23.tests
diff --git a/go/src/v.io/x/sensorlog/README.md b/go/src/v.io/x/sensorlog/README.md
new file mode 100644
index 0000000..734d822
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/README.md
@@ -0,0 +1,88 @@
+# Sensor Log Lite
+
+Sensor Log Lite is an example Syncbase application for measuring streams of
+time series data on a group of devices.
+
+A Sensor Log Lite system consists of a master and any number of measuring
+devices. The master device runs Syncbase and the command line client
+(`slcli`), while each measuring device runs Syncbase and the measuring
+daemon (`measured`). The client controls the measuring daemon through
+Syncbase.
+
+# Measuring data
+
+Each measuring device runs an instance of `measured`, which samples data
+points for one or more measuring streams. A stream is an ordered sequence
+of data points that are sampled by running a shell script at a set
+frequency. The `slcli` tool can be used to write the sampling configuration
+to the master Syncbase instance, which is then synced to the appropriate
+measuring device Syncbase. Measured data points are synced back to the
+master Syncbase, where they can be examined using `slcli`.
+
+### Starting `measured`
+
+In the instructions below, replace `<creds>` with the path to
+Vanadium credentials obtained using `principal seekblessings`.
+
+`measured`, along with the required `mounttabled` and `syncbased` services,
+can be started by:
+
+    $ V23_CREDENTIALS=<creds> SL_DEVID=dev1 ./scripts/run_measured.sh
+
+By default, it starts a local mounttable at port 8707 (override using
+`$SL_IPADDR_PORT`), mounts it at
+`/ns.dev.v.io:8101/users/<email-from-blessing>/sl/measured/<devid>`,
+starts a Syncbase instance mounted in the local mounttable at
+`sl/measured/<devid>`, and a measuring daemon against the Syncbase instance.
+
+### Using the client
+
+Before using the client, the master `syncbased` must be started by:
+
+    $ V23_CREDENTIALS=<creds> ./scripts/run_cli_syncbased.sh
+
+By default, it starts a local mounttable at port 8202 (override using
+`$SL_IPADDR_PORT`) and starts a Syncbase instance mounted in it at
+`sl/client/main` (override using `$SL_DEVID`).
+
+Running
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh <args>
+
+will invoke the `slcli` tool with blessings and flags set appropriately
+for a master stack run with the same environment.
+
+### Adding a device
+
+Each measuring device creates a syncgroup which can be joined by a master
+device. Using the above default configuration, run:
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh device add /ns.dev.v.io:8101/users/<email-from-blessing>/sl/measured/dev1/sl/measured/dev1/syncbased dev1
+
+### Creating a stream
+
+Once a device has been added, streams can be configured on it to start
+sampling data. The sampling script is expected to output a single floating
+point value and exit with a zero status code on every invocation, otherwise
+an error is logged instead of data.
+
+For example, to sample the Answer to Life, the Universe, and Everything
+every two seconds:
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh stream create dev1 stream42 2s <<< "echo 42;"
+
+### Listing data
+
+To list all data sampled on the stream, run
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list dev1 stream42
+
+To keep listing newly measured data until `slcli` is killed, add `-follow`:
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list -follow dev1 stream42
+
+### Debugging
+
+The internal state of the master Syncbase can be examined using `sb`:
+
+    $ V23_CREDENTIALS=<creds> ${JIRI_ROOT}/release/go/bin/vbecome -name sl:client:main ${JIRI_ROOT}/release/go/bin/sb -service /$(dig $(hostname) +short):8202/sl/client/main/syncbased sh sensorlog_lite sldb
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
new file mode 100644
index 0000000..ee266e3
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for measuring device configuration.
+
+package client
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// AddDevice joins the syncgroup of the measuring device identified by devId,
+// expected to be published at sgPublishSb, and makes it available for stream
+// configuration.
+func AddDevice(ctx *context.T, db nosql.Database, devId, sgPublishSb, desc string) (*sbmodel.KDeviceCfg, error) {
+	if err := keyutil.ValidateId(devId); err != nil {
+		return nil, fmt.Errorf("invalid devId: %v", err)
+	}
+	devKey := sbmodel.KDeviceCfg{
+		DevId: devId,
+	}
+	if desc == "" {
+		desc = "device:" + devKey.Key()
+	}
+	devVal := sbmodel.VDeviceCfg{
+		Desc:        desc,
+		SgPublishSb: sgPublishSb,
+	}
+	devSgName := config.SyncgroupName(sgPublishSb, devKey.Key())
+	devRow := db.Table(devKey.Table()).Row(devKey.Key())
+
+	if exists, err := devRow.Exists(ctx); err != nil {
+		return nil, err
+	} else if exists {
+		return nil, verror.New(verror.ErrExist, ctx, "Device '"+devKey.Key()+"' was already added")
+	}
+
+	// TODO(ivanpi): Lack of atomicity here can result in a duplicate join, not
+	// really relevant until syncgroup Leave is implemented.
+	sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+	if _, err := db.Syncgroup(devSgName).Join(ctx, sgMemberInfo); err != nil {
+		return nil, err
+	}
+
+	return &devKey, devRow.Put(ctx, &devVal)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
new file mode 100644
index 0000000..98a9182
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/device_v23_test.go
@@ -0,0 +1,187 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/x/ref/lib/signals"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/ref/test/modules"
+	"v.io/x/ref/test/v23tests"
+	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+//go:generate jiri test generate
+
+const (
+	dummyScript   = "echo 42;"
+	dummyInterval = "1s"
+)
+
+func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
+	// Start a 'global' mounttable.
+	globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
+	// Mount the local mounttable in the global one.
+	localMT := naming.Join(globalMT, "localmt")
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0", "--name="+localMT)
+
+	clientSb := "sb/client"
+	clientCreds, _ := t.Shell().NewChildCredentials("client")
+	clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+		`{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	measuredSb := "sb/measured"
+	measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+	measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+		`{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	time.Sleep(1 * time.Second)
+
+	// Initialize measuring device syncgroup using the global mounttable alias
+	// of measured Syncbase for publishing.
+	devId := "measured1"
+	publishSb := naming.Join(localMT, measuredSb)
+	sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root/client", publishSb, globalMT)
+
+	// Add measuring device to client, joining its syncgroup.
+	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+	// Allow time for syncgroup metadata sync.
+	time.Sleep(3 * time.Second)
+
+	// After syncgroup has been joined, sync can use local mounttable. Kill the
+	// global one.
+	if err := globalMTHandle.Kill(syscall.SIGINT); err != nil {
+		t.Fatalf("failed to kill global mounttable: %v", err)
+	}
+	if err := globalMTHandle.Shutdown(nil, nil); err != nil {
+		t.Fatalf("failed to shutdown global mounttable: %v", err)
+	}
+
+	streamIds := []string{"str1", "str2", "str3"}
+
+	// Create stream before starting measured watcher.
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)
+
+	time.Sleep(1 * time.Second)
+
+	measureWatcher, err := t.Shell().StartWithOpts(
+		t.Shell().DefaultStartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 60*time.Second), nil,
+		runWatchForStreams, append([]string{measuredSb, devId}, streamIds...)...)
+	if err != nil {
+		t.Fatalf("failed to start measureWatcher: %v", err)
+	}
+
+	// Create more streams.
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)
+
+	time.Sleep(1 * time.Second)
+
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)
+
+	// Allow time for sync.
+	time.Sleep(3 * time.Second)
+
+	// Kill will stop the watcher and cause it to verify that seen streams match
+	// expectations.
+	if err := syscall.Kill(measureWatcher.Pid(), syscall.SIGINT); err != nil {
+		t.Fatalf("failed to kill measureWatcher: %v", err)
+	}
+	var stdout, stderr bytes.Buffer
+	if err := measureWatcher.Shutdown(&stdout, &stderr); err != nil {
+		t.Errorf("failed to shutdown measureWatcher: %v")
+	}
+	t.Logf("measureWatcher stdout:\n%s", stdout.String())
+	t.Logf("measureWatcher stderr:\n%s", stderr.String())
+}
+
+var runInitSyncgroup = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, admin, publishSb, sgMT := args[0], args[1], args[2], args[3], args[4]
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+	if err != nil {
+		return fmt.Errorf("failed initializing measured database: %v", err)
+	}
+
+	sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT)
+	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
+		return fmt.Errorf("failed initializing measured syncgroup: %v", err)
+	}
+	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
+		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err)
+	}
+
+	return nil
+}, "runInitSyncgroup")
+
+var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, expectStreams := args[0], args[1], args[2:]
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+	if err != nil {
+		return fmt.Errorf("failed opening measured database: %v", err)
+	}
+
+	gotStreams := make([]string, 0, len(expectStreams))
+
+	ctx, stop := context.WithCancel(ctx)
+	wait := util.AsyncRun(func() error {
+		return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+			if got, want := key.DevId, devId; got != want {
+				return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+			}
+			gotStreams = append(gotStreams, key.StreamId)
+			return nil
+		})
+	}, func(err error) {
+		stop()
+	})
+
+	// Wait for kill, then stop watcher.
+	<-signals.ShutdownOnSignals(nil)
+	stop()
+
+	if err := wait(); err != nil {
+		return err
+	}
+
+	// Compare streams seen by watcher to expectations.
+	sort.Strings(gotStreams)
+	sort.Strings(expectStreams)
+	if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+		return fmt.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+	}
+
+	return nil
+}, "runWatchForStreams")
+
+func startAdditionalMT(t *v23tests.T, args ...string) (string, *v23tests.Invocation) {
+	bin := t.BuildV23Pkg("v.io/x/ref/services/mounttable/mounttabled")
+	inv := bin.Start(args...)
+	return inv.ExpectVar("NAME"), inv
+}
diff --git a/go/src/v.io/x/sensorlog/internal/client/doc.go b/go/src/v.io/x/sensorlog/internal/client/doc.go
new file mode 100644
index 0000000..2f09780
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/doc.go
@@ -0,0 +1,8 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package client implements Sensor Log client methods, intended to run
+// against the master device Syncbase. It supports adding measuring devices,
+// configuring streams, and listing measured data.
+package client
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
new file mode 100644
index 0000000..e2aedb0
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for listing stream data.
+
+package client
+
+import (
+	"fmt"
+	"sort"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/services/watch"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type ListCallback func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// ListStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each.
+// TODO(ivanpi): Allow specifying time interval.
+func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+	_, err := listStreamData(ctx, db, streamKey, listCb)
+	return err
+}
+
+// FollowStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each. It keeps listing new data
+// points until ctx is cancelled.
+// TODO(ivanpi): Allow specifying time interval.
+func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+	tableName := sbmodel.KDataPoint{}.Table()
+	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+	resMark, err := listStreamData(ctx, db, streamKey, listCb)
+	if err != nil {
+		return err
+	}
+
+	// Watch for new DataPoints.
+	ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+	if err != nil {
+		return err
+	}
+	defer ws.Cancel()
+
+	trans := make([]*dataPoint, 0, 16)
+	for ws.Advance() {
+		c := ws.Change()
+		var elem dataPoint
+		if err := elem.Key.Parse(c.Row); err != nil {
+			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+		}
+		switch c.ChangeType {
+		case nosql.PutChange:
+			if err := c.Value(&elem.Val); err != nil {
+				return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
+			}
+			// In the absence of overlapping syncgroups or partial ACLs, the order of
+			// writes is preserved by sync. However, several writes may be grouped
+			// into a single batch, and the order is not preserved within a batch.
+			// Each batch is manually sorted before being emitted to the callback.
+			trans = append(trans, &elem)
+			if !c.Continued {
+				sort.Stable(dataPointSort(trans))
+				for _, elem := range trans {
+					if err := listCb(&elem.Key, elem.Val); err != nil {
+						return err
+					}
+				}
+				trans = trans[:0]
+			}
+		case nosql.DeleteChange:
+			// no-op
+		}
+	}
+	return ws.Err()
+}
+
+// listStreamData implements listing (scanning over) existing stream data. It
+// also returns the resume marker to allow watching for future data.
+func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
+	var resMark watch.ResumeMarker
+	tableName := sbmodel.KDataPoint{}.Table()
+	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+	if err != nil {
+		return resMark, err
+	}
+	defer bdb.Abort(ctx)
+
+	resMark, err = bdb.GetResumeMarker(ctx)
+	if err != nil {
+		return resMark, err
+	}
+
+	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+	if exists, err := streamRow.Exists(ctx); err != nil {
+		return resMark, err
+	} else if !exists {
+		return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+	}
+
+	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+	defer sstr.Cancel()
+
+	for sstr.Advance() {
+		key := &sbmodel.KDataPoint{}
+		if err := key.Parse(sstr.Key()); err != nil {
+			return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+		}
+		var val sbmodel.VDataPoint
+		if err := sstr.Value(&val); err != nil {
+			return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+		}
+		if err := listCb(key, val); err != nil {
+			return resMark, err
+		}
+	}
+	return resMark, sstr.Err()
+}
+
+type dataPoint struct {
+	Key sbmodel.KDataPoint
+	Val sbmodel.VDataPoint
+}
+
+// dataPointSort implements sorting a dataPoint slice by timestamp.
+type dataPointSort []*dataPoint
+
+func (s dataPointSort) Len() int           { return len(s) }
+func (s dataPointSort) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
diff --git a/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
new file mode 100644
index 0000000..8558501
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/measured_v23_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package client_test
+
+import (
+	"bytes"
+	"fmt"
+	"strconv"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/naming"
+	"v.io/x/ref"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/ref/test/modules"
+	"v.io/x/ref/test/v23tests"
+	"v.io/x/sensorlog_lite/internal/client"
+	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+//go:generate jiri test generate
+
+func V23TestStreamConfigAndList(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)
+
+	clientSb := "sb/client"
+	clientCreds, _ := t.Shell().NewChildCredentials("client")
+	clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
+	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
+		`{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	measuredSb := "sb/measured"
+	measuredCreds, _ := t.Shell().NewChildCredentials("measured")
+	measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
+	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
+		`{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
+	defer cleanup()
+
+	time.Sleep(1 * time.Second)
+
+	// Start measured.
+	measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured")
+	devId := "measured1"
+	publishSb := naming.Join(mtName, measuredSb)
+	measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
+	measured := measuredBin.WithStartOpts(measuredOpts).Start(
+		"-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client",
+		"-publish-sb="+publishSb)
+
+	time.Sleep(3 * time.Second)
+
+	// Add measuring device to client, joining its syncgroup.
+	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)
+
+	// Create streams.
+	streamId1, result1, interval1 := "str1", "42", "2s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")
+
+	time.Sleep(1 * time.Second)
+
+	streamId2, result2, interval2 := "str2", "47", "1s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")
+
+	// streamId has another streamId as prefix to verify ListDataStreams prefix
+	// handling.
+	streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
+	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
+		streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")
+
+	// Allow time for sync and at least 3 measurements.
+	time.Sleep(10 * time.Second)
+
+	// Kill will gracefully stop measured.
+	if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
+		t.Fatalf("failed to kill measured: %v", err)
+	}
+	var stdout, stderr bytes.Buffer
+	if err := measured.Shutdown(&stdout, &stderr); err != nil {
+		t.Errorf("failed to shutdown measured: %v")
+	}
+	t.Logf("measured stdout:\n%s", stdout.String())
+	t.Logf("measured stderr:\n%s", stderr.String())
+
+	// Check that both streams have at least 3 measurements synced back to
+	// client device Syncbase.
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
+	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
+}
+
+var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]
+
+	expectVal, err := strconv.ParseFloat(expectValStr, 64)
+	if err != nil {
+		return err
+	}
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
+	if err != nil {
+		return fmt.Errorf("failed opening measured database: %v", err)
+	}
+
+	count := 0
+	streamKey := &sbmodel.KStreamDef{
+		DevId:    devId,
+		StreamId: streamId,
+	}
+	if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		if got, want := key.StreamId, streamId; got != want {
+			return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
+		}
+		switch val := val.(type) {
+		case sbmodel.VDataPointValue:
+			if val.Value != expectVal {
+				return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
+			}
+		default:
+			return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
+		}
+		count++
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	if count < 3 {
+		return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
+	}
+
+	return nil
+}, "runListStreamData")
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
new file mode 100644
index 0000000..74a9fb5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -0,0 +1,78 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Client methods for stream configuration.
+
+package client
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// CreateStream writes the configuration for a new sampling stream to Syncbase.
+// The target device must have been configured. streamId must be unique for
+// the target device.
+// TODO(ivanpi): Make start time configurable.
+func CreateStream(ctx *context.T, db nosql.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
+	if err := keyutil.ValidateId(streamId); err != nil {
+		return nil, fmt.Errorf("invalid streamId: %v", err)
+	}
+	if strings.TrimSpace(script) == "" {
+		return nil, fmt.Errorf("sampling script cannot be empty")
+	}
+	if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
+		return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
+	}
+	stmKey := sbmodel.KStreamDef{
+		DevId:    devKey.DevId,
+		StreamId: streamId,
+	}
+	if desc == "" {
+		desc = "stream:" + stmKey.Key()
+	}
+	stmVal := sbmodel.VStreamDef{
+		Desc: desc,
+		Sampler: sbmodel.SamplerDef{
+			Script:   script,
+			Start:    time.Now(),
+			Interval: interval,
+		},
+		Enabled: true,
+	}
+
+	if err := nosql.RunInBatch(ctx, db, nosql_wire.BatchOptions{}, func(db nosql.BatchDatabase) error {
+		devRow := db.Table(devKey.Table()).Row(devKey.Key())
+		stmRow := db.Table(stmKey.Table()).Row(stmKey.Key())
+
+		if exists, err := devRow.Exists(ctx); err != nil {
+			return err
+		} else if !exists {
+			return verror.New(verror.ErrNoExist, ctx, "Device '"+devKey.Key()+"' does not exist")
+		}
+
+		if exists, err := stmRow.Exists(ctx); err != nil {
+			return err
+		} else if exists {
+			return verror.New(verror.ErrExist, ctx, "Stream '"+stmKey.Key()+"' already exists")
+		}
+
+		return stmRow.Put(ctx, stmVal)
+	}); err != nil {
+		return nil, err
+	}
+
+	return &stmKey, nil
+}
+
+// TODO(ivanpi): Implement pausing and resuming streams.
diff --git a/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
new file mode 100644
index 0000000..80b6f5e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/testutil/testutil.go
@@ -0,0 +1,71 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package testutil implements testing modules for common client operations.
+package testutil
+
+import (
+	"fmt"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/verror"
+	"v.io/x/ref/test/modules"
+	"v.io/x/sensorlog_lite/internal/client"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// Required parameters: sbService, devId, publishSb
+var RunAddDevice = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, publishSb := args[0], args[1], args[2]
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed initializing master database: %v", err)
+	}
+
+	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); err != nil {
+		return fmt.Errorf("AddDevice %s failed: %v", devId, err)
+	}
+	if _, err := client.AddDevice(ctx, db, devId, publishSb, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+		return fmt.Errorf("repeat AddDevice %s should have failed with ErrExist, got: %v", devId, err)
+	}
+
+	return nil
+}, "runAddDevice")
+
+// Required parameters: sbService, devId, streamId, script, interval
+var RunCreateStream = modules.Register(func(env *modules.Env, args ...string) error {
+	sbService, devId, streamId, script, intervalStr := args[0], args[1], args[2], args[3], args[4]
+
+	interval, err := time.ParseDuration(intervalStr)
+	if err != nil {
+		return err
+	}
+
+	ctx, cleanup := v23.Init()
+	defer cleanup()
+	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed opening master database: %v", err)
+	}
+
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, "", interval, ""); err == nil {
+		return fmt.Errorf("CreateStream %s with empty script should have failed", streamId)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, 42*time.Nanosecond, ""); err == nil {
+		return fmt.Errorf("CreateStream %s with tiny duration should have failed", streamId)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); err != nil {
+		return fmt.Errorf("CreateStream %s failed: %v", streamId, err)
+	}
+	if _, err := client.CreateStream(ctx, db, &sbmodel.KDeviceCfg{DevId: devId}, streamId, script, interval, ""); verror.ErrorID(err) != verror.ErrExist.ID {
+		return fmt.Errorf("repeat CreateStream %s should have failed with ErrExist, got: %v", streamId, err)
+	}
+
+	return nil
+}, "runCreateStream")
diff --git a/go/src/v.io/x/sensorlog/internal/client/v23_test.go b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
new file mode 100644
index 0000000..d826a88
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/client/v23_test.go
@@ -0,0 +1,32 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+
+package client_test
+
+import (
+	"os"
+	"testing"
+
+	"v.io/x/ref/test/modules"
+	"v.io/x/ref/test/v23tests"
+)
+
+func TestMain(m *testing.M) {
+	modules.DispatchAndExitIfChild()
+	cleanup := v23tests.UseSharedBinDir()
+	r := m.Run()
+	cleanup()
+	os.Exit(r)
+}
+
+func TestV23DeviceAddAndStreamCreate(t *testing.T) {
+	v23tests.RunTest(t, V23TestDeviceAddAndStreamCreate)
+}
+
+func TestV23StreamConfigAndList(t *testing.T) {
+	v23tests.RunTest(t, V23TestStreamConfigAndList)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
new file mode 100644
index 0000000..a948ae2
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Sensor Log configuration constants and default flag values.
+package config
+
+import (
+	"time"
+
+	"v.io/v23/naming"
+)
+
+const (
+	DefaultSbService = "syncbase"
+
+	AppName = "sensorlog_lite"
+	DBName  = "sldb"
+
+	SyncPriority = 42
+
+	TimeOutputFormat = "2006-01-02 15:04:05.000"
+
+	// Delay between SIGINT and SIGKILL when stopping measuring script.
+	ScriptKillDelay = 100 * time.Millisecond
+
+	// Both Syncbase write and sampling script slowness impose this limit;
+	// sampling with mocked out script and Syncbase write supports 200 µs.
+	MinSamplingInterval = 10 * time.Millisecond
+)
+
+func SyncgroupName(publishService, devId string) string {
+	return naming.Join(publishService, "%%sync", "sllite", "dev", devId)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/doc.go b/go/src/v.io/x/sensorlog/internal/measure/doc.go
new file mode 100644
index 0000000..c7e18f3
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/doc.go
@@ -0,0 +1,8 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package measure implements Sensor Log measured methods, intended to run
+// against the measured device Syncbase. It supports configuring the device
+// syncgroup and watching for stream configuration changes.
+package measure
diff --git a/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go
new file mode 100644
index 0000000..8f854a2
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop.go
@@ -0,0 +1,157 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package runloop implements an array of data measuring workers, one per
+// stream, periodically running the configured sampling scripts.
+package runloop
+
+import (
+	"time"
+
+	"v.io/v23/context"
+	"v.io/x/sensorlog_lite/internal/measure/sampler"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+type DataCallback func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error
+
+// Loop is a pool of data measuring workers, at most one per stream.
+// NOTE: Loop is not thread-safe. Register and WaitAll should not be called
+// concurrently.
+type Loop interface {
+	// Register starts a measuring worker for the stream specified by streamKey
+	// and samDef, cancelling and replacing any existing worker for the same
+	// stream.
+	// The worker executes the sampling script every integer number of Intervals
+	// starting from Start, calling cb with the measured VDataPoint.
+	// The sampling script and the callback are guaranteed to each have at most
+	// one instance running in parallel.
+	// The new worker will not start until any existing worker for the same
+	// stream has exited.
+	Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback)
+
+	// Unregister cancels any existing measuring worker for the stream specified
+	// by streamKey.
+	Unregister(streamKey *sbmodel.KStreamDef)
+
+	// WaitAll waits for all currently running measuring workers to exit.
+	WaitAll()
+}
+
+type loopImpl struct {
+	tasks   map[string]measureTask
+	runner  sampleRunner
+	fatalCb util.ErrorCb
+}
+
+// NewLoop returns a new measuring worker pool.
+func NewLoop(fatalCb util.ErrorCb) Loop {
+	return newLoopWithRunner(fatalCb, sampler.Run)
+}
+
+// sampleRunner should run the sampling script once, producing a single
+// VDataPoint and passing it to cb.
+type sampleRunner func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error
+
+// Constructor for testing with mock sampleRunner.
+func newLoopWithRunner(fatalCb util.ErrorCb, runner sampleRunner) Loop {
+	return &loopImpl{
+		tasks:   make(map[string]measureTask, 16),
+		runner:  runner,
+		fatalCb: fatalCb,
+	}
+}
+
+// Register implements Loop.Register.
+func (ml *loopImpl) Register(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) {
+	ctx, cancel := context.WithCancel(ctx)
+
+	oldTask, hasOldTask := ml.tasks[streamKey.Key()]
+	if hasOldTask {
+		// Cancel existing worker, if any.
+		oldTask.cancel()
+	}
+
+	// Start new worker that first waits for the existing worker to exit, then
+	// starts measuring.
+	waitLoop := util.AsyncRun(func() error {
+		if hasOldTask {
+			// Wait for existing worker, if any, to exit.
+			_ = oldTask.wait()
+		}
+
+		return ml.measureWorker(ctx, streamKey, samDef, dataCb)
+	}, ml.fatalCb)
+
+	// Register new worker handle.
+	ml.tasks[streamKey.Key()] = measureTask{
+		cancel: cancel,
+		wait:   waitLoop,
+	}
+}
+
+// getDelay computes the delay from now to samDef.Start or, if it has passed,
+// the next multiple of samDef.Interval since samDef.Start.
+func getDelay(samDef *sbmodel.SamplerDef) time.Duration {
+	now := time.Now()
+	if now.Before(samDef.Start) {
+		// Start time has not yet passed, wait until start time.
+		return samDef.Start.Sub(now)
+	} else {
+		// Start time has passed, wait until next multiple of interval since the
+		// start time.
+		diff := now.Sub(samDef.Start).Nanoseconds()
+		intv := samDef.Interval.Nanoseconds()
+		return time.Duration(intv - (diff % intv))
+	}
+}
+
+// measureWorker runs the measuring script on every samDef.Interval, starting
+// from samDef.Start, and passes the result to dataCb. Measure intervals are
+// skipped if the measuring script or dataCb is still running. The worker exits
+// when ctx is cancelled or the measuring script or dataCb return an error.
+func (ml *loopImpl) measureWorker(ctx *context.T, streamKey *sbmodel.KStreamDef, samDef *sbmodel.SamplerDef, dataCb DataCallback) error {
+	for {
+		delay := getDelay(samDef)
+		// Wait for delay. If not cancelled while waiting, proceed to measure.
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-time.After(delay):
+		}
+
+		// Run the measuring script, passing the result to dataCb.
+		if err := ml.runner(ctx, samDef, func(res *sampler.MeasureResult) error {
+			return dataCb(ctx, &sbmodel.KDataPoint{
+				DevId:     streamKey.DevId,
+				StreamId:  streamKey.StreamId,
+				Timestamp: res.Time,
+			}, res.Data)
+		}); err != nil {
+			return err
+		}
+	}
+}
+
+// Unregister implements Loop.Unregister.
+func (ml *loopImpl) Unregister(streamKey *sbmodel.KStreamDef) {
+	if task, hasTask := ml.tasks[streamKey.Key()]; hasTask {
+		task.cancel()
+	}
+	// task is left in ml.tasks to allow WaitAll or a new worker to wait on it.
+}
+
+// WaitAll implements Loop.WaitAll.
+func (ml *loopImpl) WaitAll() {
+	for _, t := range ml.tasks {
+		_ = t.wait()
+	}
+}
+
+// Worker handles for cancellation purposes.
+type measureTask struct {
+	cancel func()
+	wait   func() error
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go
new file mode 100644
index 0000000..4c56b6a
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/runloop/runloop_test.go
@@ -0,0 +1,341 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runloop
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	tu "v.io/x/ref/test/testutil"
+	"v.io/x/sensorlog_lite/internal/measure/sampler"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+// Time delay quantum. It is assumed that several goroutines can reliably
+// execute trivial code and yield within this time.
+// TODO(ivanpi): Replace with logical clock that counts active goroutines.
+const qdelay = 100 * time.Millisecond
+
+// delayWriter implements a DataCallback that waits for a configurable delay
+// before writing the received VDataError. It also allows asserting that the
+// expected data has been written.
+type delayWriter struct {
+	delay  time.Duration
+	mu     sync.Mutex
+	output string
+}
+
+// write sleeps for delay and, if not cancelled, writes the VDataError.
+// Sleep is not interrupted by cancellation.
+func (dw *delayWriter) write(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+	time.Sleep(dw.delay)
+	select {
+	case <-ctx.Done():
+		return nil
+	default:
+	}
+	dw.mu.Lock()
+	defer dw.mu.Unlock()
+	if msg, ok := val.(sbmodel.VDataPointError); ok {
+		dw.output += string(msg.Value)
+	} else {
+		return fmt.Errorf("writer expected VDataError, got: %v", val)
+	}
+	return nil
+}
+
+// assert asserts that the written VDataErrors, concatenated, equal expected.
+func (dw *delayWriter) assert(t *testing.T, expected string) {
+	dw.mu.Lock()
+	defer dw.mu.Unlock()
+	if got, want := dw.output, expected; got != want {
+		t.Error(tu.FormatLogLine(2, "unexpected output: got %q, want %q", got, want))
+	}
+}
+
+// newEchoDelaySampler returns a sampleRunner that expects mock scripts of the
+// form '<delay>|<output>'. The sampleRunner sleeps for <delay> and, if not
+// cancelled, calls cb with the VDataError of the form '<output>@<time>;',
+// where <time> is the time when the sampleRunner was started, expressed as the
+// number of whole qdelay intervals since start.
+func newEchoDelaySampler(start time.Time) sampleRunner {
+	return func(ctx *context.T, samDef *sbmodel.SamplerDef, cb sampler.MeasureCallback) error {
+		timestamp := time.Now()
+		parts := strings.SplitN(samDef.Script, "|", 2)
+		delay, err := time.ParseDuration(parts[0])
+		if err != nil {
+			panic(err)
+		}
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-time.After(delay):
+		}
+		timeDelta := timestamp.Sub(start).Nanoseconds() / time.Duration(qdelay).Nanoseconds()
+		result := fmt.Sprintf("%s@%d;", parts[1], timeDelta)
+		return cb(&sampler.MeasureResult{
+			Time: timestamp,
+			Data: sbmodel.VDataPointError{Value: result},
+		})
+	}
+}
+
+// newFatalErrorCb returns an ErrorCb that fails the test and cancels the
+// context on any error.
+func newFatalErrorCb(t *testing.T, ctxCancel func()) util.ErrorCb {
+	return func(err error) {
+		t.Errorf("measuring fatal error: %v", err)
+		ctxCancel()
+	}
+}
+
+// streamKey returns a test KStreamCfg for the streamId.
+func streamKey(streamId string) *sbmodel.KStreamDef {
+	return &sbmodel.KStreamDef{
+		DevId:    "test",
+		StreamId: streamId,
+	}
+}
+
+// samplerDef returns a test SamplerDef with a mock script as expected by an
+// echoDelaySampler.
+func samplerDef(start time.Time, intv time.Duration, out string, delay time.Duration) *sbmodel.SamplerDef {
+	return &sbmodel.SamplerDef{
+		Script:   fmt.Sprintf("%v|%s", delay, out),
+		Start:    start,
+		Interval: intv,
+	}
+}
+
+// scheduler allows scheduling functions with delays and waiting for all
+// scheduled functions to finish (using wg). Cancel ctx to cancel pending
+// functions. Functions are run under mutex (not concurrently).
+type scheduler struct {
+	ctx *context.T
+	wg  sync.WaitGroup
+	mu  sync.Mutex
+}
+
+// schedule runs the specified function after the specified delay.
+func (sc *scheduler) schedule(delay time.Duration, run func()) {
+	sc.wg.Add(1)
+	go func() {
+		defer sc.wg.Done()
+		select {
+		case <-time.After(delay):
+			sc.mu.Lock()
+			defer sc.mu.Unlock()
+			run()
+		case <-sc.ctx.Done():
+		}
+	}()
+}
+
+// Tests that samplers run with correct time offsets until unregistered.
+func TestLoopUnregister(t *testing.T) {
+	ctx, ctxCancel := context.RootContext()
+	defer ctxCancel()
+	sc := scheduler{ctx: ctx}
+	start := time.Now()
+	ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+	// Start at -3, measuring every 2, active 0 to 10 -> 1, 3, 5, 7, 9
+	dwPast := &delayWriter{delay: 0}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write)
+	})
+	sc.schedule(10*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+	// Start at 3, measuring every 2, active 0 to 6 -> 3, 5
+	dwFuture := &delayWriter{delay: 0}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(3*qdelay), 2*qdelay, "47", 0), dwFuture.write)
+	})
+	sc.schedule(6*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+	sc.wg.Wait()
+	ml.WaitAll()
+
+	dwPast.assert(t, "42@1;42@3;42@5;42@7;42@9;") // time 1, 3, 5, 7, 9
+	dwFuture.assert(t, "47@3;47@5;")              // time 3, 5
+}
+
+// Tests that samplers run with correct time offsets until the context is
+// cancelled.
+func TestLoopCancel(t *testing.T) {
+	ctx, ctxCancel := context.RootContext()
+	defer ctxCancel()
+	sc := scheduler{ctx: ctx}
+	start := time.Now()
+	ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+	// Start at -3, measuring every 2, active 2 to 8 (cancel) -> 3, 5, 7
+	dwPast := &delayWriter{delay: 0}
+	sc.schedule(2*qdelay, func() {
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(-3*qdelay), 2*qdelay, "42", 0), dwPast.write)
+	})
+
+	// Start at 1, measuring every 2, active 0 to 8 (cancel) -> 1, 3, 5, 7
+	dwFuture := &delayWriter{delay: 0}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 0), dwFuture.write)
+	})
+
+	sc.schedule(8*qdelay, ctxCancel)
+
+	sc.wg.Wait()
+	ml.WaitAll()
+
+	dwPast.assert(t, "42@3;42@5;42@7;")        // time 3, 5, 7
+	dwFuture.assert(t, "47@1;47@3;47@5;47@7;") // time 1, 3, 5, 7
+}
+
+// Tests that no new measurement for a stream is started until the previous
+// measurement has been written, skipping intervals if necessary. No data
+// points should be written after the sampler is unregistered.
+func TestLoopSlow(t *testing.T) {
+	ctx, ctxCancel := context.RootContext()
+	defer ctxCancel()
+	sc := scheduler{ctx: ctx}
+	start := time.Now()
+	ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+	// Start at 1, measuring every 3, active 0 to 14, sample takes 3, write takes 1:
+	// 1 -> finish at 5
+	// 4 skipped
+	// 7 -> finish at 11
+	// 10 skipped
+	// 13 -> would finish at 17, cancelled during sample
+	dwSlowSample := &delayWriter{delay: 1 * qdelay}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 3*qdelay), dwSlowSample.write)
+	})
+	sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+	// Start at 1, measuring every 3, active 0 to 18, sample takes 1, write takes 3:
+	// 1 -> finish at 5
+	// 4 skipped
+	// 7 -> finish at 11
+	// 10 skipped
+	// 13 -> finish at 17
+	// 16 -> would finish at 20, cancelled during write
+	dwSlowWrite := &delayWriter{delay: 3 * qdelay}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 3*qdelay, "47", 1*qdelay), dwSlowWrite.write)
+	})
+	sc.schedule(18*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+	sc.wg.Wait()
+	ml.WaitAll()
+
+	dwSlowSample.assert(t, "42@1;42@7;")      // time 1, 7
+	dwSlowWrite.assert(t, "47@1;47@7;47@13;") // time 1, 7, 13
+}
+
+// Tests that re-registering an already registered stream cancels and waits for
+// the existing workers to stop before starting updated ones.
+func TestLoopReregister(t *testing.T) {
+	ctx, ctxCancel := context.RootContext()
+	defer ctxCancel()
+	sc := scheduler{ctx: ctx}
+	start := time.Now()
+	ml := newLoopWithRunner(newFatalErrorCb(t, ctxCancel), newEchoDelaySampler(start))
+
+	// 42: Start at 1, measuring every 2, active 0 to 6 -> 1, 3, 5
+	// 84: Start at 9, measuring every 3, active 6 to 14 -> 9, 12
+	dwNoDelay := &delayWriter{delay: 0}
+	sc.schedule(0*qdelay, func() {
+		ml.Unregister(streamKey("s1"))
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 2*qdelay, "42", 0), dwNoDelay.write)
+	})
+	sc.schedule(6*qdelay, func() {
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(9*qdelay), 3*qdelay, "84", 0), dwNoDelay.write)
+	})
+	sc.schedule(14*qdelay, func() { ml.Unregister(streamKey("s1")) })
+
+	// 1 -> sample until 6
+	// 3 skipped
+	// 5 skipped
+	// 7 -> sample would take until 12, cancelled
+	// re-Register at 8, effective at 8 (sample wait is interruptible)
+	// 9 -> sample until 13
+	// 12 skipped
+	// 15 -> sample until 19
+	// 18 skipped
+	dwSlowSample := &delayWriter{delay: 0}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(1*qdelay), 2*qdelay, "47", 5*qdelay), dwSlowSample.write)
+	})
+	sc.schedule(8*qdelay, func() {
+		ml.Unregister(streamKey("s2"))
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 3*qdelay, "94", 4*qdelay), dwSlowSample.write)
+	})
+	sc.schedule(20*qdelay, func() { ml.Unregister(streamKey("s2")) })
+
+	// 1 -> write until 6
+	// 4 skipped
+	// 7 -> write would take until 12, cancelled
+	// re-Register starts at 8, effective at 12 (write wait is non-interruptible)
+	// 13 -> write until 18
+	// 15 skipped
+	// 18 -> would write until 23, cancelled
+	// 20 skipped
+	dwSlowWrite := &delayWriter{delay: 5 * qdelay}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(1*qdelay), 3*qdelay, "foo", 0), dwSlowWrite.write)
+	})
+	sc.schedule(8*qdelay, func() {
+		ml.Register(ctx, streamKey("s3"), samplerDef(start.Add(-1*qdelay), 2*qdelay, "bar", 0), dwSlowWrite.write)
+	})
+	sc.schedule(21*qdelay, func() { ml.Unregister(streamKey("s3")) })
+
+	sc.wg.Wait()
+	ml.WaitAll()
+
+	dwNoDelay.assert(t, "42@1;42@3;42@5;"+"84@9;84@12;") // 42: time 1, 3, 5; 84: time 9, 12
+	dwSlowSample.assert(t, "47@1;"+"94@9;94@15;")        // 47: time 1; 94: time 9, 15
+	dwSlowWrite.assert(t, "foo@1;"+"bar@13;")            // foo: time 1; bar: time 13
+}
+
+// Tests that returning an error from the writer calls the fatal eror callback.
+func TestWriteError(t *testing.T) {
+	ctx, ctxCancel := context.RootContext()
+	defer ctxCancel()
+	sc := scheduler{ctx: ctx}
+	start := time.Now()
+	errSentinel := fmt.Errorf("errSentinel")
+	ml := newLoopWithRunner(func(err error) {
+		if got, want := err, errSentinel; got != want {
+			t.Errorf("%v unexpected fatal error: got %v, want %v", time.Now().Sub(start).Nanoseconds()/time.Duration(qdelay).Nanoseconds(), got, want)
+		}
+		ctxCancel()
+	}, newEchoDelaySampler(start))
+
+	// 1 -> write start at 1, finish at 3
+	// 4 -> write start at 4, finish at 6
+	// 7 -> write start at 7, cancelled at 8 (error)
+	dwNoDelay := &delayWriter{delay: 2 * qdelay}
+	sc.schedule(0*qdelay, func() {
+		ml.Register(ctx, streamKey("s1"), samplerDef(start.Add(1*qdelay), 3*qdelay, "42", 0), dwNoDelay.write)
+	})
+
+	// error: Start at 6, measure delay 2 -> write error at 8
+	sc.schedule(2*qdelay, func() {
+		ml.Register(ctx, streamKey("s2"), samplerDef(start.Add(6*qdelay), 4*qdelay, "47", 2*qdelay),
+			func(_ *context.T, _ *sbmodel.KDataPoint, _ sbmodel.VDataPoint) error {
+				return errSentinel
+			})
+	})
+
+	sc.wg.Wait()
+	ml.WaitAll()
+
+	dwNoDelay.assert(t, "42@1;42@4;") // 42: time 1, 4
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
new file mode 100644
index 0000000..ee54ec1
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sampler implements an asynchronous worker for a single run of the
+// sampling script.
+package sampler
+
+import (
+	"bytes"
+	"fmt"
+	"os/exec"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+type MeasureCallback func(result *MeasureResult) error
+
+type MeasureResult struct {
+	Time time.Time
+	Data sbmodel.VDataPoint
+}
+
+// Run synchronously runs the sampler script, producing a single data point or
+// error message and passing it to the callback (also run synchronously). Fatal
+// errors (including callback errors) are returned from Run.
+// The script is terminated using SIGINT (followed by SIGKILL) to the process
+// group if the context is cancelled. The callback will be called at most once
+// per Run call (may not be called if cancelled or on fatal error).
+func Run(ctx *context.T, samDef *sbmodel.SamplerDef, cb MeasureCallback) error {
+	var out bytes.Buffer
+	// Run sampling script using bash and capture output.
+	sh := exec.Command("setsid", "bash")
+	sh.Stdin = bytes.NewBufferString(samDef.Script)
+	sh.Stdout = &out
+	// TODO(ivanpi): Fail on any stderr output?
+	// TODO(ivanpi): Use end instead of begin timestamp?
+	timestamp := time.Now()
+	// Failure to start the script is a fatal error.
+	if err := sh.Start(); err != nil {
+		return fmt.Errorf("failed starting measuring script: %v", err)
+	}
+	waiter := make(chan error, 1)
+	// Start script asynchronously.
+	go func() {
+		waiter <- sh.Wait()
+	}()
+	// Wait for script exit or cancellation.
+	// TODO(ivanpi): Add script timeout.
+	select {
+	case <-ctx.Done():
+		// If cancelled, send SIGINT to script process group.
+		syscall.Kill(-sh.Process.Pid, syscall.SIGINT)
+		// Wait for script to exit. After a delay, if not exited, send SIGKILL
+		// and wait.
+		select {
+		case <-waiter:
+		case <-time.After(config.ScriptKillDelay):
+			syscall.Kill(-sh.Process.Pid, syscall.SIGKILL)
+			<-waiter
+		}
+		return nil
+	case err := <-waiter:
+		// If script has exited, parse data point or get error.
+		if err != nil {
+			return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Script error: %v", err)}})
+		}
+		measured, err := strconv.ParseFloat(strings.TrimSpace(out.String()), 64)
+		if err != nil {
+			return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointError{Value: fmt.Sprintf("Parse error: %v", err)}})
+		}
+		return cb(&MeasureResult{Time: timestamp, Data: sbmodel.VDataPointValue{Value: measured}})
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
new file mode 100644
index 0000000..04466c4
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/sampler/sampler_test.go
@@ -0,0 +1,146 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sampler_test
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	tu "v.io/x/ref/test/testutil"
+	"v.io/x/sensorlog_lite/internal/measure/sampler"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// Runs the sampler script and verifies that the callback was called with the
+// expected value or error.
+func runSamplerTest(t *testing.T, samDef *sbmodel.SamplerDef) sbmodel.VDataPoint {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	var gotRes *sampler.MeasureResult
+	start := time.Now()
+	if err := sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+		gotRes = res
+		return nil
+	}); err != nil {
+		t.Fatal(tu.FormatLogLine(3, "unexpected sampling error: %v", err))
+	}
+	end := time.Now()
+
+	if gotRes == nil {
+		t.Fatal(tu.FormatLogLine(3, "sampler callback was not called"))
+	}
+
+	if gotRes.Time.Before(start) || gotRes.Time.After(end) {
+		t.Error(tu.FormatLogLine(3, "invalid time: %v, expected between %v and %v", gotRes.Time, start, end))
+	}
+
+	return gotRes.Data
+}
+
+func runSamplerTestForValue(t *testing.T, samDef *sbmodel.SamplerDef, expectVal float64) {
+	res := runSamplerTest(t, samDef)
+	switch res := res.(type) {
+	case sbmodel.VDataPointValue:
+		if res.Value != expectVal {
+			t.Error(tu.FormatLogLine(2, "unexpected value, got: %v, want: %v", res.Value, expectVal))
+		}
+	default:
+		t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want value: %v", res, expectVal))
+	}
+}
+
+func runSamplerTestForError(t *testing.T, samDef *sbmodel.SamplerDef, expectErrPrefix string) {
+	res := runSamplerTest(t, samDef)
+	switch res := res.(type) {
+	case sbmodel.VDataPointError:
+		if !strings.HasPrefix(res.Value, expectErrPrefix) {
+			t.Error(tu.FormatLogLine(2, "unexpected error, got: %v, want prefix: %v", res.Value, expectErrPrefix))
+		}
+	default:
+		t.Error(tu.FormatLogLine(2, "unexpected type, got: %v, want error with prefix: %v", res, expectErrPrefix))
+	}
+}
+
+func TestSamplerRun(t *testing.T) {
+	runSamplerTestForValue(t, &sbmodel.SamplerDef{
+		Script:   `echo 42`,
+		Start:    time.Now(),
+		Interval: time.Second, // 7.5M years overflows time.Duration
+	}, 42.0)
+
+	runSamplerTestForValue(t, &sbmodel.SamplerDef{
+		Script: `
+for i in $(seq 10000); do :; done
+printf "%f\n" -12.73
+exit 0
+`,
+		Start:    time.Now(),
+		Interval: time.Millisecond,
+	}, -12.73)
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42; exit 1`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Script error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `exit 0`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42foo`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+
+	runSamplerTestForError(t, &sbmodel.SamplerDef{
+		Script:   `echo 42; echo 47`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}, "Parse error")
+}
+
+func TestSamplerCancel(t *testing.T) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	// The script sleeps for 10 seconds, not interruptible by SIGINT/SIGTERM.
+	samDef := &sbmodel.SamplerDef{
+		Script: `
+trap 'sleep 10' INT TERM
+sleep 10
+echo 42
+`,
+		Start:    time.Now(),
+		Interval: time.Second,
+	}
+	// Asynchronously run sampler, cancelling shortly after.
+	done := make(chan error, 1)
+	go func() {
+		done <- sampler.Run(ctx, samDef, func(res *sampler.MeasureResult) error {
+			t.Errorf("worker was cancelled, result callback should not have been called, got: %v", res.Data)
+			return nil
+		})
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+	cancelTime := time.Now()
+	cancel()
+
+	if err := <-done; err != nil {
+		t.Errorf("unexpected sampling error: %v", err)
+	}
+	// Cancel should have killed the script using SIGKILL a short time after the
+	// script failed to exit on SIGINT.
+	if cancelTime.Add(1 * time.Second).Before(time.Now()) {
+		t.Errorf("sampling script took more than a second to stop")
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
new file mode 100644
index 0000000..1ee3909
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for syncgroup management.
+
+package measure
+
+import (
+	"fmt"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/security/access"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+// InitSyncgroup creates the syncgroup for the measuring device devId, giving
+// full configuration access to admin. The syncgroup uses sgPublishSb and
+// sgMountTables for publishing (create/join) and syncing, respectively.
+// InitSyncgroup must not be called concurrently for the same devId, or
+// retried with different parameters for the same devId, otherwise behaviour
+// is unspecified.
+func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
+	if err := keyutil.ValidateId(devId); err != nil {
+		return fmt.Errorf("invalid devId: %v", err)
+	}
+
+	sgName := config.SyncgroupName(sgPublishSb, devId)
+	// Check for syncgroup. If it already exists, we have nothing to do.
+	if sgs, err := db.GetSyncgroupNames(ctx); err != nil {
+		return err
+	} else {
+		for _, sg := range sgs {
+			if sg == sgName {
+				return nil
+			}
+		}
+	}
+
+	// Both measured and admin client have full permissions on the syncgroup.
+	sgAcl := access.Permissions{}
+	sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.AllTypicalTags()...)
+	sbutil.AddPermsForPattern(&sgAcl, admin, access.AllTypicalTags()...)
+
+	// Maps all syncgroup prefixes to ACLs.
+	prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
+
+	// StreamDef : <devId>
+	// Admin client has full permissions, measured drops to readonly.
+	prefixStreamDef := nosql_wire.TableRow{
+		TableName: sbmodel.KStreamDef{}.Table(),
+		Row: devId,
+	}
+	aclStreamDef := access.Permissions{}
+	sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
+	sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
+	prefixSpec[prefixStreamDef] = aclStreamDef
+
+	// DataPoint : <devId>
+	// Admin client has full permissions, measured drops to read/write.
+	prefixDataPoint := nosql_wire.TableRow{
+		TableName: sbmodel.KDataPoint{}.Table(),
+		Row: devId,
+	}
+	aclDataPoint := access.Permissions{}
+	sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
+	sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+	prefixSpec[prefixDataPoint] = aclDataPoint
+
+	var prefixes []nosql_wire.TableRow
+	// Apply prefix ACLs to all syncgroup prefixes.
+	for prefix, prefixAcl := range prefixSpec {
+		// Ignore ErrNoAccess, assume we already dropped permissions.
+		err := db.Table(prefix.TableName).SetPrefixPermissions(ctx, nosql.Prefix(prefix.Row), prefixAcl)
+		if err != nil && verror.ErrorID(err) != verror.ErrNoAccess.ID {
+			return err
+		}
+		prefixes = append(prefixes, prefix)
+	}
+
+	sgSpec := nosql_wire.SyncgroupSpec{
+		Description: fmt.Sprintf("measured-main-%s", devId),
+		Perms:       sgAcl,
+		Prefixes:    prefixes,
+		MountTables: sgMountTables,
+	}
+	sgMemberInfo := nosql_wire.SyncgroupMemberInfo{SyncPriority: config.SyncPriority}
+
+	return db.Syncgroup(sgName).Create(ctx, sgSpec, sgMemberInfo)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
new file mode 100644
index 0000000..d447fa6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"v.io/v23/security/access"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+func TestCreateSyncgroup(t *testing.T) {
+	_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as measured.
+	db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, sbmodel.MeasuredTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	devId := "measured1"
+	admin := "root:two"
+	syncMts := []string{}
+
+	// Creating the syncgroup should succeed.
+	if err := measure.InitSyncgroup(ctxMeasured, db, devId, admin, sbName, syncMts); err != nil {
+		t.Fatalf("InitSyncgroup failed: %v", err)
+	}
+
+	sgName := config.SyncgroupName(sbName, devId)
+	if sgs, err := db.GetSyncgroupNames(ctxMeasured); err != nil {
+		t.Fatalf("GetSyncgroupNames failed: %v", err)
+	} else if got, want := sgs, []string{sgName}; !reflect.DeepEqual(got, want) {
+		t.Errorf("GetSyncgroupNames got: %v, want: %v", got, want)
+	}
+
+	// Creating the syncgroup should be idempotent.
+	if err := measure.InitSyncgroup(ctxMeasured, db, devId, admin, sbName, syncMts); err != nil {
+		t.Errorf("InitSyncgroup should be idempotent, retry failed: %v", err)
+	}
+
+	// measured should have dropped privileges on <StreamDefTable>/<devId>.
+	expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
+		"Admin":{"In":["root:two"]},
+		"Read":{"In":["root:two", "root:one"]},
+		"Write":{"In":["root:two"]},
+		"Debug":{"In":["root:two"]},
+		"Resolve":{"In":["root:two", "root:one"]}
+	}`))
+	if err != nil {
+		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+	}
+	sgDataTable := db.Table(sbmodel.KStreamDef{}.Table())
+	if gotPerms, err := sgDataTable.GetPrefixPermissions(ctxMeasured, devId); err != nil {
+		t.Errorf("GetPrefixPermissions failed: %v", err)
+	} else if got, want := gotPerms[0].Perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
+		t.Errorf("Unexpected permissions on streamdef/<devId>: got %v, want %v", got, want)
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
new file mode 100644
index 0000000..6f7897e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for configuration change watching.
+
+package measure
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/services/watch"
+	"v.io/v23/syncbase/nosql"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// RegisterWorker is called with the key and value of a new or modified
+// StreamDef. Thread safety is not required.
+type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
+
+// WatchForStreams watches Syncbase for new and modified stream definitions
+// for the specified measuring device and calls the register callback.
+// If a malformed StreamDef key or value is encountered, or register returns
+// an error, measured exits with an error.
+// WatchForStreams is synchronous and runs until the context is cancelled or
+// an error is encountered.
+func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+	tableName := sbmodel.KStreamDef{}.Table()
+	watchPrefix := keyutil.Join(devId, "")
+	var resMark watch.ResumeMarker
+
+	// BeginBatch scoped using function with deferred Abort.
+	if err := func() error {
+		bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+		if err != nil {
+			return err
+		}
+		defer bdb.Abort(ctx)
+
+		resMark, err = bdb.GetResumeMarker(ctx)
+		if err != nil {
+			return err
+		}
+
+		// Register samplers for all existing StreamDefs.
+		sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
+		defer sstr.Cancel()
+
+		for sstr.Advance() {
+			key := &sbmodel.KStreamDef{}
+			if err := key.Parse(sstr.Key()); err != nil {
+				return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+			}
+			val := &sbmodel.VStreamDef{}
+			if err := sstr.Value(val); err != nil {
+				return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
+			}
+			if err := register(key, val); err != nil {
+				return err
+			}
+		}
+		return sstr.Err()
+	}(); err != nil {
+		return err
+	}
+
+	// Watch for StreamDef changes and register samplers as needed.
+	ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+	if err != nil {
+		return err
+	}
+	defer ws.Cancel()
+
+	for ws.Advance() {
+		c := ws.Change()
+		key := &sbmodel.KStreamDef{}
+		if err := key.Parse(c.Row); err != nil {
+			return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+		}
+		switch c.ChangeType {
+		case nosql.PutChange:
+			val := &sbmodel.VStreamDef{}
+			if err := c.Value(val); err != nil {
+				return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
+			}
+			if err := register(key, val); err != nil {
+				return err
+			}
+		case nosql.DeleteChange:
+			return fmt.Errorf("StreamDef delete is not supported")
+		}
+	}
+	return ws.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
new file mode 100644
index 0000000..b28cb78
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/syncbase/nosql"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	tu "v.io/x/ref/test/testutil"
+	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var watchTables = []sbmodel.TableSpec{
+	{Prototype: &sbmodel.KStreamDef{}},
+}
+
+func TestWatchForStreams(t *testing.T) {
+	_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as measured. Keep write permission on StreamDef.
+	db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	devId := "dev1"
+	otherDev := "dev2"
+	expectStreams := []string{"str1", "str2", "str3"}
+	gotStreams := make([]string, 0, len(expectStreams))
+
+	time.Sleep(1 * time.Second)
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[0])
+	putStreamDef(t, ctxMeasured, db, otherDev, "foo")
+
+	// Watch should call the callback for all devId streams, whether they were
+	// written before or after starting watch. Streams for other devices should
+	// not be returned.
+	time.Sleep(1 * time.Second)
+	stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+		if got, want := key.DevId, devId; got != want {
+			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+		}
+		gotStreams = append(gotStreams, val.Desc)
+		return nil
+	})
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[1])
+
+	time.Sleep(1 * time.Second)
+	putStreamDef(t, ctxMeasured, db, otherDev, "bar")
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[2])
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err != nil {
+		t.Fatalf("watcher exited with error: %v", err)
+	}
+
+	sort.Strings(gotStreams)
+	sort.Strings(expectStreams)
+	if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+		t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+	}
+}
+
+func TestWatchError(t *testing.T) {
+	_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as measured. Keep write permission on StreamDef.
+	db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	// Watch should return the callback error.
+	devId := "dev1"
+	cbErr := fmt.Errorf("callbackError")
+	stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+		return cbErr
+	})
+	// Put anything to trigger callback.
+	putStreamDef(t, ctxMeasured, db, devId, "foo")
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err != cbErr {
+		t.Errorf("watch should have failed with %v, got: %v", cbErr, err)
+	}
+
+	// Watch should return an error when a malformed key is encountered.
+	devId = "dev2"
+	stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+		return nil
+	})
+	putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo")
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err == nil {
+		t.Errorf("watch should have failed on malformed key")
+	}
+}
+
+func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+	ctx, stop = context.WithCancel(ctx)
+	wait = util.AsyncRun(func() error {
+		return measure.WatchForStreams(ctx, db, devId, register)
+	}, nil)
+	return stop, wait
+}
+
+func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+	key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+	val := sbmodel.VStreamDef{Desc: desc}
+	if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
+		t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
new file mode 100644
index 0000000..4286bff
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/keyutil/keyutil.go
@@ -0,0 +1,65 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for building and parsing Sensor Log data keys.
+package keyutil
+
+import (
+	"crypto/rand"
+	"encoding/hex"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const Sep = '$'
+
+// Join joins key parts into key.
+func Join(parts ...string) string {
+	return strings.Join(parts, string(Sep))
+}
+
+// Split splits key into parts and verifies that the number of parts matches
+// the expected number, returning an error otherwise.
+func Split(key string, expectParts int) ([]string, error) {
+	parts := strings.Split(key, string(Sep))
+	if gotParts := len(parts); expectParts != gotParts {
+		return nil, fmt.Errorf("invalid key %q: expected %d parts, got %d", key, expectParts, gotParts)
+	}
+	return parts, nil
+}
+
+func StringifyTime(t time.Time) string {
+	return fmt.Sprintf("%016x", t.UnixNano())
+}
+
+func ParseTime(t string) (time.Time, error) {
+	nsec, err := strconv.ParseInt(t, 16, 64)
+	if err != nil {
+		return time.Time{}, fmt.Errorf("failed parsing time: %v", err)
+	}
+	return time.Unix(0, nsec), nil
+}
+
+// ValidateId returns an error if the argument is not a valid identifier.
+func ValidateId(id string) error {
+	if id == "" {
+		return fmt.Errorf("cannot be empty")
+	}
+	if strings.Contains(id, string(Sep)) {
+		return fmt.Errorf("cannot contain %q", Sep)
+	}
+	return nil
+}
+
+// UUID generates a random UUID.
+func UUID() string {
+	uuid := make([]byte, 16)
+	if _, err := rand.Read(uuid); err != nil {
+		panic(fmt.Errorf("rng failed: %v", err))
+	}
+	// TODO(ivanpi): Use base64 once Syncbase keys are less restricted.
+	return hex.EncodeToString(uuid)
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
new file mode 100644
index 0000000..32167f6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -0,0 +1,87 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Implements PersistentDataKey for each K<T> type, as described in types.vdl.
+
+package sbmodel
+
+import (
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+// PersistentDataKey is a type encapsulating data from the row key of a
+// top-level value type persisted to Syncbase.
+type PersistentDataKey interface {
+	// Table returns the name of the Syncbase table for the data type.
+	Table() string
+	// Key returns the row key for the value.
+	Key() string
+	// Parse parses the row key for the value into self. Returns an error if key
+	// is malformed.
+	Parse(key string) error
+}
+
+// devicecfg : <DevId>
+func (_ KDeviceCfg) Table() string { return "devicecfg" }
+func (k *KDeviceCfg) Key() string  { return keyutil.Join(k.DevId) }
+func (k *KDeviceCfg) Parse(key string) error {
+	parts, err := keyutil.Split(key, 1)
+	if err != nil {
+		return err
+	}
+	k.DevId = parts[0]
+	return nil
+}
+
+// streamdef : <DevId>/<StreamId>
+func (_ KStreamDef) Table() string { return "streamdef" }
+func (k *KStreamDef) Key() string  { return keyutil.Join(k.DevId, k.StreamId) }
+func (k *KStreamDef) Parse(key string) error {
+	parts, err := keyutil.Split(key, 2)
+	if err != nil {
+		return err
+	}
+	k.DevId, k.StreamId = parts[0], parts[1]
+	return nil
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+func (k KDataPoint) Table() string {
+	return "sdata"
+}
+func (k *KDataPoint) Key() string {
+	return keyutil.Join(k.DevId, k.StreamId, keyutil.StringifyTime(k.Timestamp))
+}
+func (k *KDataPoint) Parse(key string) error {
+	parts, err := keyutil.Split(key, 3)
+	if err != nil {
+		return err
+	}
+	timestamp, err := keyutil.ParseTime(parts[2])
+	if err != nil {
+		return err
+	}
+	k.DevId, k.StreamId, k.Timestamp = parts[0], parts[1], timestamp
+	return nil
+}
+
+// TableSpec defines a Syncbase table, encapsulating a key prototype and
+// permissions.
+type TableSpec struct {
+	Prototype PersistentDataKey
+	ReadOnly  bool
+}
+
+// All top-level types persisted to master device Syncbase.
+var MasterTables = []TableSpec{
+	{Prototype: &KDeviceCfg{}},
+	{Prototype: &KStreamDef{}},
+	{Prototype: &KDataPoint{}},
+}
+
+// All top-level types persisted to measured Syncbase.
+var MeasuredTables = []TableSpec{
+	{Prototype: &KStreamDef{}, ReadOnly: true},
+	{Prototype: &KDataPoint{}},
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
new file mode 100644
index 0000000..5f729bf
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Syncbase data model for Sensor Log.
+//
+// Every type <T> stored in Syncbase is defined as a pair of types, K<T> and
+// V<T>, representing data stored in the key and value, respectively, in a
+// single table. K<T> types satisfy the PersistentDataKey interface, supporting
+// conversion to and from the row key.
+package sbmodel
+
+import (
+  "time"
+)
+
+// devicecfg : <DevId>
+// Measuring device handle. Master only.
+type VDeviceCfg struct {
+  // Human-readable, not necessarily unique description of the device.
+  Desc string
+  // Syncbase instance publishing the syncgroup created by the device.
+  SgPublishSb string
+}
+
+type KDeviceCfg struct {
+  DevId string
+}
+
+// streamdef : <DevId>/<StreamId>
+// Configures a stream of data to be measured.
+type VStreamDef struct {
+  // Human-readable, not necessarily unique description of the stream.
+  Desc string
+  // Sampling configuration.
+  Sampler SamplerDef
+  // Flag to start and stop sampling.
+  Enabled bool
+}
+
+type KStreamDef struct {
+  DevId    string
+  StreamId string
+}
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+  // Shell script executed after every Interval, starting from Start.
+  // It should output a single data point, if available. A non-zero exit
+  // status or failure to parse the value will produce an error instead.
+  Script   string
+  Start    time.Time
+  Interval time.Duration
+  // TODO(ivanpi): Add timeout.
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+  DevId     string
+  StreamId  string
+  Timestamp time.Time
+}
+
+type VDataPoint union {
+  Value float64
+  Error string
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
new file mode 100644
index 0000000..3f1c289
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.vdl.go
@@ -0,0 +1,147 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+// Syncbase data model for Sensor Log.
+//
+// Every type <T> stored in Syncbase is defined as a pair of types, K<T> and
+// V<T>, representing data stored in the key and value, respectively, in a
+// single table. K<T> types satisfy the PersistentDataKey interface, supporting
+// conversion to and from the row key.
+package sbmodel
+
+import (
+	// VDL system imports
+	"v.io/v23/vdl"
+
+	// VDL user imports
+	"time"
+	_ "v.io/v23/vdlroot/time"
+)
+
+// devicecfg : <DevId>
+// Measuring device handle. Master only.
+type VDeviceCfg struct {
+	// Human-readable, not necessarily unique description of the device.
+	Desc string
+	// Syncbase instance publishing the syncgroup created by the device.
+	SgPublishSb string
+}
+
+func (VDeviceCfg) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDeviceCfg"`
+}) {
+}
+
+type KDeviceCfg struct {
+	DevId string
+}
+
+func (KDeviceCfg) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDeviceCfg"`
+}) {
+}
+
+// streamdef : <DevId>/<StreamId>
+// Configures a stream of data to be measured.
+type VStreamDef struct {
+	// Human-readable, not necessarily unique description of the stream.
+	Desc string
+	// Sampling configuration.
+	Sampler SamplerDef
+	// Flag to start and stop sampling.
+	Enabled bool
+}
+
+func (VStreamDef) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VStreamDef"`
+}) {
+}
+
+type KStreamDef struct {
+	DevId    string
+	StreamId string
+}
+
+func (KStreamDef) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KStreamDef"`
+}) {
+}
+
+// Sampling script and polling frequency.
+type SamplerDef struct {
+	// Shell script executed after every Interval, starting from Start.
+	// It should output a single data point, if available. A non-zero exit
+	// status or failure to parse the value will produce an error instead.
+	Script   string
+	Start    time.Time
+	Interval time.Duration
+}
+
+func (SamplerDef) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.SamplerDef"`
+}) {
+}
+
+// sdata : <DevId>/<StreamId>/<Timestamp>
+// Measured data value or error.
+type KDataPoint struct {
+	DevId     string
+	StreamId  string
+	Timestamp time.Time
+}
+
+func (KDataPoint) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.KDataPoint"`
+}) {
+}
+
+type (
+	// VDataPoint represents any single field of the VDataPoint union type.
+	VDataPoint interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the VDataPoint union type.
+		__VDLReflect(__VDataPointReflect)
+	}
+	// VDataPointValue represents field Value of the VDataPoint union type.
+	VDataPointValue struct{ Value float64 }
+	// VDataPointError represents field Error of the VDataPoint union type.
+	VDataPointError struct{ Value string }
+	// __VDataPointReflect describes the VDataPoint union type.
+	__VDataPointReflect struct {
+		Name  string `vdl:"v.io/x/sensorlog_lite/internal/sbmodel.VDataPoint"`
+		Type  VDataPoint
+		Union struct {
+			Value VDataPointValue
+			Error VDataPointError
+		}
+	}
+)
+
+func (x VDataPointValue) Index() int                       { return 0 }
+func (x VDataPointValue) Interface() interface{}           { return x.Value }
+func (x VDataPointValue) Name() string                     { return "Value" }
+func (x VDataPointValue) __VDLReflect(__VDataPointReflect) {}
+
+func (x VDataPointError) Index() int                       { return 1 }
+func (x VDataPointError) Interface() interface{}           { return x.Value }
+func (x VDataPointError) Name() string                     { return "Error" }
+func (x VDataPointError) __VDLReflect(__VDataPointReflect) {}
+
+func init() {
+	vdl.Register((*VDeviceCfg)(nil))
+	vdl.Register((*KDeviceCfg)(nil))
+	vdl.Register((*VStreamDef)(nil))
+	vdl.Register((*KStreamDef)(nil))
+	vdl.Register((*SamplerDef)(nil))
+	vdl.Register((*KDataPoint)(nil))
+	vdl.Register((*VDataPoint)(nil))
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
new file mode 100644
index 0000000..f166f99
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sbmodel_test
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+)
+
+type keyTest interface {
+	Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey)
+}
+
+// validKeyTest verifies that the key is correctly parsed and built.
+type validKeyTest struct {
+	keyStr    string
+	keyParsed sbmodel.PersistentDataKey
+}
+
+func (kt *validKeyTest) Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey) {
+	if err := keyPrototype.Parse(kt.keyStr); err != nil {
+		t.Errorf("key %q parse failed: %v", kt.keyStr, err)
+		return
+	}
+	if got, want := keyPrototype, kt.keyParsed; !reflect.DeepEqual(got, want) {
+		t.Errorf("incorrect key parse: got %v, want %v", got, want)
+	}
+	if got, want := kt.keyParsed.Table(), keyPrototype.Table(); got != want {
+		t.Errorf("incorrect parsed key table: got %v, want %v", got, want)
+	}
+	if got, want := kt.keyParsed.Key(), kt.keyStr; got != want {
+		t.Errorf("incorrect key build: got %s, want %s", got, want)
+	}
+}
+
+// invalidKeyTest verifies that malformed key parsing fails.
+type invalidKeyTest struct {
+	keyStr string
+}
+
+func (kt *invalidKeyTest) Run(t *testing.T, keyPrototype sbmodel.PersistentDataKey) {
+	if err := keyPrototype.Parse(kt.keyStr); err == nil {
+		t.Errorf("key %q parse should have failed", kt.keyStr)
+	}
+}
+
+func TestDeviceCfgKeys(t *testing.T) {
+	tests := []keyTest{
+		&validKeyTest{
+			keyStr: keyutil.Join("foo"),
+			keyParsed: &sbmodel.KDeviceCfg{
+				DevId: "foo",
+			},
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("foo", "bar"),
+		},
+	}
+	for _, kt := range tests {
+		kt.Run(t, &sbmodel.KDeviceCfg{})
+	}
+}
+
+func TestStreamDefKeys(t *testing.T) {
+	tests := []keyTest{
+		&validKeyTest{
+			keyStr: keyutil.Join("pc", "cputemp"),
+			keyParsed: &sbmodel.KStreamDef{
+				DevId:    "pc",
+				StreamId: "cputemp",
+			},
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("pc"),
+		},
+	}
+	for _, kt := range tests {
+		kt.Run(t, &sbmodel.KStreamDef{})
+	}
+}
+
+func TestDataPointKeys(t *testing.T) {
+	tests := []keyTest{
+		&validKeyTest{
+			keyStr: keyutil.Join("meter", "amps", "00000022fcde41b2"),
+			keyParsed: &sbmodel.KDataPoint{
+				DevId:     "meter",
+				StreamId:  "amps",
+				Timestamp: time.Unix(0, 0x22fcde41b2),
+			},
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("meter", "amps"),
+		},
+		&invalidKeyTest{
+			keyStr: keyutil.Join("meter", "amps", "2.17"),
+		},
+	}
+	for _, kt := range tests {
+		kt.Run(t, &sbmodel.KDataPoint{})
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
new file mode 100644
index 0000000..1bc1784
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for creating and opening the database in Syncbase.
+package sbutil
+
+import (
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/syncbase"
+	"v.io/v23/syncbase/nosql"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
+// instance, creating it if missing, initializing specified tables.
+func CreateOrOpenDB(ctx *context.T, sbService string, tables []sbmodel.TableSpec) (nosql.Database, error) {
+	aclFull := access.Permissions{}
+	// Allow everyone to resolve to allow joining syncgroups.
+	AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
+	// Restrict other permissions to self.
+	AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin, access.Debug)
+
+	aclReadOnly := access.Permissions{}
+	// Allow everyone to resolve to allow joining syncgroups.
+	AddPermsForPattern(&aclReadOnly, string(security.AllPrincipals), access.Resolve)
+	// Restrict other permissions to self, except Write.
+	AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin, access.Debug)
+
+	sbs := syncbase.NewService(sbService)
+	app := sbs.App(config.AppName)
+	if err := createIfMissing(ctx, app, aclFull); err != nil {
+		return nil, err
+	}
+
+	// TODO(ivanpi): Add schema version.
+	db := app.NoSQLDatabase(config.DBName, nil)
+	if err := createIfMissing(ctx, db, aclFull); err != nil {
+		return nil, err
+	}
+
+	// TODO(ivanpi): Add table schemas when available.
+	for _, ts := range tables {
+		tb := db.Table(ts.Prototype.Table())
+		acl := aclReadOnly
+		if !ts.ReadOnly {
+			acl = aclFull
+		}
+		if err := createIfMissing(ctx, tb, acl); err != nil {
+			return nil, err
+		}
+	}
+
+	return db, nil
+}
+
+// creatable is satisfied by Syncbase hierarchy layers (app, db, table) that
+// can be created and tested for existence.
+type creatable interface {
+	Create(ctx *context.T, acl access.Permissions) error
+	Exists(ctx *context.T) (bool, error)
+}
+
+// createIfMissing checks if the given creatable layer exists and, if not,
+// creates it.
+// TODO(ivanpi): Syncbase client helpers for MustExist / CreateIfMissing.
+func createIfMissing(ctx *context.T, target creatable, acl access.Permissions) error {
+	if exists, err := target.Exists(ctx); err != nil {
+		return err
+	} else if exists {
+		return nil
+	}
+	if err := target.Create(ctx, acl); err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+		return err
+	}
+	return nil
+}
+
+// AddPermsForPrincipal adds to the ACL all specified permissions for all
+// default blessings of the provided principal.
+func AddPermsForPrincipal(acl *access.Permissions, principal security.Principal, tags ...access.Tag) {
+	for _, pattern := range security.DefaultBlessingPatterns(principal) {
+		AddPermsForPattern(acl, string(pattern), tags...)
+	}
+}
+
+// AddPermsForPattern adds to the ACL all specified permissions for the
+// specified pattern.
+func AddPermsForPattern(acl *access.Permissions, pattern string, tags ...access.Tag) {
+	for _, tag := range tags {
+		acl.Add(security.BlessingPattern(pattern), string(tag))
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
new file mode 100644
index 0000000..8b4155a
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -0,0 +1,127 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sbutil_test
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"v.io/v23/security/access"
+	"v.io/v23/verror"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+func TestCreateOrOpenDB(t *testing.T) {
+	_, ctxOwner, sbName, rootPrincipal, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+	defer cleanup()
+	ctxGuest := sbtu.NewCtx(ctxOwner, rootPrincipal, "two")
+
+	// Try to open app/db (create both) as guest, fail with ErrNoAccess.
+	if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Errorf("CreateOrOpenDB should have failed with ErrNoAccess, got error: %v", err)
+	}
+	// Open app/db (create both) as owner.
+	dbOwner, err := sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MasterTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+	// Open existing app/db as guest.
+	if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); err != nil {
+		t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+	// Destroy db (but not app) to simulate interrupted creation.
+	if err := dbOwner.Destroy(ctxOwner); err != nil {
+		t.Errorf("dbOwner.Destroy should have succeeded, got error: %v", err)
+	}
+	// Try to open app/db (create db) as guest, fail with ErrNoAccess.
+	if _, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Errorf("CreateOrOpenDB should have failed with ErrNoAccess, got error: %v", err)
+	}
+	// Open app/db (recreate db) as owner.
+	dbOwner, err = sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MasterTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+	// Open recreated app/db as guest.
+	dbGuest, err := sbutil.CreateOrOpenDB(ctxGuest, sbName, sbmodel.MasterTables)
+	if err != nil {
+		t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+	// Expect db permissions with full access for owner, resolve only for others.
+	expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
+		"Admin":{"In":["root:one"]},
+		"Read":{"In":["root:one"]},
+		"Write":{"In":["root:one"]},
+		"Debug":{"In":["root:one"]},
+		"Resolve":{"In":["..."]}
+	}`))
+	if err != nil {
+		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+	}
+	if perms, _, err := dbOwner.GetPermissions(ctxOwner); err != nil {
+		t.Errorf("GetPermissions should have succeeded, got error: %v", err)
+	} else if got, want := perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
+		t.Errorf("Unexpected database permissions: got %v, want %v", got, want)
+	}
+	// Check that all tables exist.
+	for _, ts := range sbmodel.MasterTables {
+		tb := dbGuest.Table(ts.Prototype.Table())
+		if exists, err := tb.Exists(ctxGuest); err != nil || !exists {
+			t.Errorf("Expected table %s to exist, got: %v (error: %v)", tb.Name(), exists, err)
+		}
+	}
+}
+
+func TestTablePermissions(t *testing.T) {
+	_, ctxOwner, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as owner.
+	dbOwner, err := sbutil.CreateOrOpenDB(ctxOwner, sbName, sbmodel.MeasuredTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	expectPermsFull, err := access.ReadPermissions(bytes.NewBufferString(`{
+		"Admin":{"In":["root:one"]},
+		"Read":{"In":["root:one"]},
+		"Write":{"In":["root:one"]},
+		"Debug":{"In":["root:one"]},
+		"Resolve":{"In":["..."]}
+	}`))
+	if err != nil {
+		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+	}
+	expectPermsReadOnly, err := access.ReadPermissions(bytes.NewBufferString(`{
+		"Admin":{"In":["root:one"]},
+		"Read":{"In":["root:one"]},
+		"Debug":{"In":["root:one"]},
+		"Resolve":{"In":["..."]}
+	}`))
+	if err != nil {
+		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
+	}
+
+	// Check that all tables have correct permissions (full or readonly).
+	for _, ts := range sbmodel.MeasuredTables {
+		tb := dbOwner.Table(ts.Prototype.Table())
+		if exists, err := tb.Exists(ctxOwner); err != nil || !exists {
+			t.Errorf("Expected table %s to exist, got: %v (error: %v)", tb.Name(), exists, err)
+		}
+		want := expectPermsFull
+		if ts.ReadOnly {
+			want = expectPermsReadOnly
+		}
+		if got, err := tb.GetPermissions(ctxOwner); err != nil {
+			t.Errorf("GetPermissions should have succeeded, got error: %v", err)
+		} else if got, want = got.Normalize(), want.Normalize(); !reflect.DeepEqual(got, want) {
+			t.Errorf("Unexpected table %s permissions: got %v, want %v", tb.Name(), got, want)
+		}
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util.go b/go/src/v.io/x/sensorlog/internal/util/util.go
new file mode 100644
index 0000000..e01c63e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for running tasks asynchronously.
+package util
+
+import (
+	"v.io/v23/verror"
+)
+
+type Task func() error
+type ErrorCb func(err error)
+
+// AsyncRun asynchronously starts task and returns a function that can be used
+// to wait for task completion, returning the error if any. In addition, if the
+// task returns a non-nil error, onError is called with the error. onError can
+// be nil. ErrCanceled is ignored (treated as a nil error).
+func AsyncRun(task Task, onError ErrorCb) (wait func() error) {
+	var err error
+	done := make(chan error, 1)
+	go func() {
+		defer close(done)
+		err = task()
+		// Ignore ErrCanceled.
+		if verror.ErrorID(err) == verror.ErrCanceled.ID {
+			err = nil
+		}
+		// Call onError if provided and err is not nil.
+		if err != nil && onError != nil {
+			onError(err)
+		}
+	}()
+	return func() error {
+		<-done
+		return err
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util_test.go b/go/src/v.io/x/sensorlog/internal/util/util_test.go
new file mode 100644
index 0000000..90a820f
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util_test.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+	errFoo = errors.New("errFoo")
+	errBar = errors.New("errBar")
+)
+
+func TestAsyncRun(t *testing.T) {
+	wait1 := util.AsyncRun(func() error {
+		time.Sleep(200 * time.Microsecond)
+		return nil
+	}, func(err error) {
+		t.Errorf("unexpected callback with error: %v", err)
+	})
+
+	var ecb2 error
+	wait2 := util.AsyncRun(func() error {
+		time.Sleep(100 * time.Microsecond)
+		return errFoo
+	}, func(err error) {
+		ecb2 = err
+	})
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+	if wgot, cbgot, want := wait2(), ecb2, errFoo; wgot != want || cbgot != want {
+		t.Errorf("wait2 unexpected error: wait got %v, cb got %v, want %v", wgot, cbgot, want)
+	}
+}
+
+func TestAsyncRunNoCallback(t *testing.T) {
+	wait1 := util.AsyncRun(func() error {
+		time.Sleep(200 * time.Microsecond)
+		return nil
+	}, nil)
+
+	wait2 := util.AsyncRun(func() error {
+		time.Sleep(100 * time.Microsecond)
+		return errBar
+	}, nil)
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+	if wgot, want := wait2(), errBar; wgot != want {
+		t.Errorf("wait2 unexpected error: got %v, want %v", wgot, want)
+	}
+}
+
+func TestAsyncRunIgnoreErrCancelled(t *testing.T) {
+	ctx, cancel := context.RootContext()
+
+	wait1 := util.AsyncRun(func() error {
+		<-ctx.Done()
+		return verror.New(verror.ErrCanceled, ctx)
+	}, func(err error) {
+		t.Errorf("unexpected callback with error: %v", err)
+	})
+
+	cancel()
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
new file mode 100644
index 0000000..7c75976
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -0,0 +1,113 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// measured is the Sensor Log Lite measuring daemon. Runs on any device,
+// sampling data points and writing them to Syncbase. Sampling configuration
+// is read from Syncbase as written by the client.
+package main
+
+import (
+	"flag"
+	"os"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/x/lib/vlog"
+	"v.io/x/ref/lib/signals"
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/measure/runloop"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+	flagSbService = flag.String("service", config.DefaultSbService, "Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.")
+	flagDevId     = flag.String("devid", "", "DevId to be claimed by this measured. Must be specified.")
+	// Flags below are only applied the first time measured is run against a Syncbase service with the same devid.
+	flagAdmin     = flag.String("admin", "", "Blessing of admin user allowed to join the syncgroup. Must be specified.")
+	flagPublishSb = flag.String("publish-sb", "", "Syncbase service to publish the syncgroup at. Must be absolute. Must be specified. The syncgroup is published as '"+config.SyncgroupName("<publish-sb>", "<devid>")+"'.")
+	flagPublishMt = flag.String("publish-mt", "", "Additional mounttable to use for sync rendezvous in addition to namespace roots. Optional.")
+)
+
+func main() {
+	os.Exit(runMain())
+}
+
+func runMain() int {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	if *flagDevId == "" {
+		vlog.Errorf("-devid must be specified")
+		return 1
+	}
+	if *flagAdmin == "" {
+		vlog.Errorf("-admin must be specified")
+		return 1
+	}
+	if !naming.Rooted(*flagPublishSb) {
+		vlog.Errorf("-publish-sb must be rooted")
+		return 1
+	}
+	publishMts := v23.GetNamespace(ctx).Roots()
+	if *flagPublishMt != "" {
+		publishMts = append(publishMts, *flagPublishMt)
+	}
+
+	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MeasuredTables)
+	if err != nil {
+		vlog.Errorf("Failed opening Syncbase db: %v", err)
+		return 1
+	}
+	vlog.VI(0).Infof("measured connected to %s", db.FullName())
+
+	if err := measure.InitSyncgroup(ctx, db, *flagDevId, *flagAdmin, *flagPublishSb, publishMts); err != nil {
+		vlog.Errorf("Failed initializing syncgroup: %v", err)
+		return 1
+	}
+
+	ctx, stop := context.WithCancel(ctx)
+
+	ml := runloop.NewLoop(func(err error) {
+		vlog.Errorf("Measure loop failed: %v", err)
+		stop()
+	})
+
+	writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		return db.Table(key.Table()).Put(ctx, key.Key(), val)
+	}
+
+	waitWatch := util.AsyncRun(func() error {
+		return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+			if val.Enabled {
+				ml.Register(ctx, key, &val.Sampler, writer)
+			} else {
+				ml.Unregister(key)
+			}
+			return nil
+		})
+	}, func(err error) {
+		vlog.Errorf("Watch failed: %v", err)
+		stop()
+	})
+
+	defer func() {
+		_ = waitWatch()
+		ml.WaitAll()
+	}()
+
+	select {
+	case <-signals.ShutdownOnSignals(nil):
+		stop()
+		vlog.VI(0).Infof("Exiting on signal")
+		return 0
+	case <-ctx.Done():
+		vlog.VI(0).Infof("Exiting on error")
+		return 1
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh b/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh
new file mode 100755
index 0000000..e8ca090
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/run_cli_syncbased.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Starts an instance of master device syncbased and required services.
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_IPADDR_PORT, SL_TMPDIR
+function main() {
+  local -r PREFIX="${SL_PREFIX:-sl/client}"
+  local -r DEVID="${SL_DEVID:-main}"
+  local -r NAME="${PREFIX}/${DEVID}"
+  local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8202}"
+  local -r TMPDIR="${SL_TMPDIR:-sltmp}/${NAME}"
+
+  mkdir -p "${TMPDIR}"
+  trap "kill_child_processes; exit 1" ERR EXIT
+  run_mounttabled "${NAME}" "${IPADDR_PORT}"
+  run_syncbased "/${IPADDR_PORT}" "${NAME}" "${TMPDIR}"
+  # Wait for signal.
+  while true; do
+    sleep 10
+  done
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/scripts/run_measured.sh b/go/src/v.io/x/sensorlog/scripts/run_measured.sh
new file mode 100755
index 0000000..f034099
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/run_measured.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Starts an instance of measured and required services.
+#
+# mounttabled is started locally at IPADDR:PORT, mounting itself to the global
+# mounttable at $GLOBAL_MT/users/$USER/$PREFIX/$DEVID. syncbased is started in
+# the local mounttable at $PREFIX/$DEVID/syncbased.
+# measured is started, creating a syncgroup published at the local syncbased as
+# reached through the global mounttable:
+# $GLOBAL_MT/users/$USER/$PREFIX/$DEVID/$PREFIX/$DEVID/syncbased
+# Once the syncgroup is joined, sync is configured to use either the global or
+# the local mounttable (unless the local mounttable IPADDR:PORT changes).
+# measured drops most permissions on prefixes in the syncgroup. Full admin
+# permissions are granted to $ADMIN (as a blessing extension of the same
+# default blessing as the one running this script).
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_ADMIN, SL_IPADDR_PORT, SL_USER, SL_GLOBAL_MT, SL_TMPDIR
+function main() {
+  local -r PREFIX="${SL_PREFIX:-sl/measured}"
+  local -r DEVID="${SL_DEVID:-$(gen_uuid)}"
+  local -r ADMIN="${SL_ADMIN:-sl/client}"
+  local -r NAME="${PREFIX}/${DEVID}"
+  local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8707}"
+  local -r USER="${SL_USER:-$(get_user_email)}"
+  local -r GLOBAL_MT="${SL_GLOBAL_MT:-/ns.dev.v.io:8101}"
+  local -r TMPDIR="${SL_TMPDIR:-sltmp}/${NAME}"
+
+  mkdir -p "${TMPDIR}"
+  trap "kill_child_processes; exit 1" ERR EXIT
+  run_mounttabled "${NAME}" "${IPADDR_PORT}" "${GLOBAL_MT}/users/${USER}"
+  run_syncbased "/${IPADDR_PORT}" "${NAME}" "${TMPDIR}"
+  run_measured "/${IPADDR_PORT}" "${NAME}" "${DEVID}" "${ADMIN}" \
+      "${GLOBAL_MT}/users/${USER}/${NAME}/${NAME}/syncbased" \
+      "${GLOBAL_MT}/users/${USER}"
+  # Wait for signal.
+  while true; do
+    sleep 10
+  done
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
new file mode 100644
index 0000000..c503c91
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
@@ -0,0 +1,131 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Functions for starting the Sensor Log daemon and required services with
+# appropriate blessings. Expected to be run with V23_CREDENTIALS or through
+# an agent. NAME parameters are used both in service names and in blessing
+# extensions.
+
+set -eu
+
+# Default to one week timeout.
+readonly TIMEOUT='168h'
+
+# Kills all child processes of the current process.
+function kill_child_processes() {
+  kill -TERM -- -"${BASHPID}" || true
+  sleep 1
+  kill -KILL -- -"${BASHPID}" || true
+}
+export -f kill_child_processes
+
+# Generates a hex-encoded 16-byte random UUID.
+function gen_uuid() {
+  head -c 256 /dev/urandom | sha256sum | cut -c 1-32
+}
+export -f gen_uuid
+
+readonly BLESSING_CHAIN_SEPARATOR=':'
+
+# Converts name to blessing extension.
+# name_to_blessing NAME
+function name_to_blessing() {
+  sed -e "s,/,${BLESSING_CHAIN_SEPARATOR},g" <<< "$@"
+}
+export -f name_to_blessing
+
+# Gets first default blessing for the principal set in the environment.
+function get_blessing_root() {
+  "${JIRI_ROOT}"/release/go/bin/principal dump -s | cut -d ' ' -f 1 | cut -d ',' -f 1
+}
+export -f get_blessing_root
+
+# Extracts email address from the blessing obtained by get_blessing_root.
+function get_user_email() {
+  get_blessing_root | tr "${BLESSING_CHAIN_SEPARATOR}" '\n' | grep '@' | head -n 1
+}
+export -f get_user_email
+
+# Starts mounttabled at IPADDR:PORT. If $GLOBAL_MOUNT is provided, the
+# mounttable mounts itself under $GLOBAL_MOUNT/$NAME.
+# run_mounttabled NAME IPADDR:PORT [GLOBAL_MOUNT]
+function run_mounttabled() {
+  local -r NAME="$1"
+  local -r IPADDR_PORT="$2"
+  local GLOBAL_MOUNT="${3:-}"
+  if [[ -n "${GLOBAL_MOUNT}" ]]; then
+    GLOBAL_MOUNT="${GLOBAL_MOUNT}/${NAME}"
+  fi
+  # TODO(ivanpi): Lock down mounttable permissions.
+  "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+    -name="$(name_to_blessing "${NAME}/mounttabled")" \
+    "${JIRI_ROOT}"/release/go/bin/mounttabled -v23.tcp.address "${IPADDR_PORT}" \
+    -name="${GLOBAL_MOUNT}" \
+    &
+  sleep 1
+}
+export -f run_mounttabled
+
+# Starts syncbased with permissions other than resolve restricted to
+# <blessing_root>:$NAME.
+# run_syncbased MT NAME TMPDIR
+function run_syncbased() {
+  local -r MT="$1"
+  local -r NAME="$2"
+  local -r TMPDIR="$3"
+  local -r DEF_BLESSING_RUNNER="$(name_to_blessing "$(get_blessing_root)/${NAME}")"
+  local -r PERMISSIONS_LITERAL="{\
+\"Admin\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Read\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Write\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Debug\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
+\"Resolve\":{\"In\":[\"...\"]} \
+}"
+  "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+    -name "$(name_to_blessing "${NAME}/syncbased")" \
+    "${JIRI_ROOT}"/release/go/bin/syncbased -v23.namespace.root "${MT}" -name "${NAME}/syncbased" \
+    -engine leveldb -root-dir "${TMPDIR}/${NAME}/syncbased" \
+    -v23.permissions.literal "${PERMISSIONS_LITERAL}" \
+    &
+  sleep 1
+}
+export -f run_syncbased
+
+# Starts measured that uses $PUBLISH_SB to publish the syncgroup. Expects a
+# syncbase instance to have been started at $MT with the same $NAME. If
+# $PUBLISH_MT is provided, the syncgroup is advertised there in addition to
+# the local mounttable.
+# run_measured MT NAME DEVID ADMIN PUBLISH_SB [PUBLISH_MT]
+function run_measured() {
+  local -r MT="$1"
+  local -r NAME="$2"
+  local -r DEVID="$3"
+  local -r ADMIN="$4"
+  local -r PUBLISH_SB="$5"
+  local -r PUBLISH_MT="${6:-}"
+  local -r DEF_BLESSING_ADMIN="$(name_to_blessing "$(get_blessing_root)/${ADMIN}")"
+  "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+    -name="$(name_to_blessing "${NAME}")" \
+    "${JIRI_ROOT}"/experimental/projects/sensorlog_lite/bin/measured -v23.namespace.root "${MT}" \
+    -service "${NAME}/syncbased" -devid="${DEVID}" -admin="${DEF_BLESSING_ADMIN}" \
+    -publish-sb "${PUBLISH_SB}" -publish-mt="${PUBLISH_MT}" \
+    -alsologtostderr \
+    &
+  sleep 1
+}
+export -f run_measured
+
+# Runs slcli against master Syncbase with specified $NAME.
+# run_slcli MT NAME [args...]
+function run_slcli() {
+  local -r MT="$1"
+  local -r NAME="$2"
+  shift 2
+  "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \
+    -name "$(name_to_blessing "${NAME}")" \
+    "${JIRI_ROOT}"/experimental/projects/sensorlog_lite/bin/slcli -v23.namespace.root "${MT}" \
+    -service "${NAME}/syncbased" "$@"
+}
+export -f run_slcli
diff --git a/go/src/v.io/x/sensorlog/scripts/slcli.sh b/go/src/v.io/x/sensorlog/scripts/slcli.sh
new file mode 100755
index 0000000..04fe39c
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/scripts/slcli.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Copyright 2015 The Vanadium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+# Runs slcli against master device Syncbase.
+
+set -eu
+
+source "${JIRI_ROOT}/experimental/projects/sensorlog_lite/src/v.io/x/sensorlog_lite/scripts/runner_lib.sh"
+
+# Must be run with V23_CREDENTIALS set or through the agent.
+# Optional environment variables: SL_PREFIX, SL_DEVID, SL_IPADDR_PORT
+function main() {
+  local -r PREFIX="${SL_PREFIX:-sl/client}"
+  local -r DEVID="${SL_DEVID:-main}"
+  local -r NAME="${PREFIX}/${DEVID}"
+  local -r IPADDR_PORT="${SL_IPADDR_PORT:-$(dig $(hostname) +short):8202}"
+  run_slcli "/${IPADDR_PORT}" "${NAME}" "$@"
+}
+
+main "$@"
diff --git a/go/src/v.io/x/sensorlog/slcli/device.go b/go/src/v.io/x/sensorlog/slcli/device.go
new file mode 100644
index 0000000..ee658f0
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/device.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli device configuration.
+
+package main
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/lib/v23cmd"
+	"v.io/x/sensorlog_lite/internal/client"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+var cmdSLDevice = &cmdline.Command{
+	Name:  "device",
+	Short: "Manage measuring devices",
+	Long: `
+Add measuring devices.
+
+TODO(ivanpi): List.
+`,
+	Children: []*cmdline.Command{cmdSLDeviceAdd /*, cmdSLDeviceList */},
+}
+
+var cmdSLDeviceAdd = &cmdline.Command{
+	Runner: v23cmd.RunnerFunc(runSLDeviceAdd),
+	Name:   "add",
+	Short:  "Add new measuring device",
+	Long: `
+Adds a new measuring device and outputs its identifier.
+`,
+	ArgsName: "<publish_sb> <device_id> [<device_desc>]",
+	ArgsLong: `
+<publish_sb> is the rooted name of the Syncbase instance where the device
+             syncgroup is published.
+
+<device_id> is the identifier of the device to add.
+
+<device_desc> is a human-readable description of the device.
+              It doesn't need to be unique.
+`,
+}
+
+func runSLDeviceAdd(ctx *context.T, env *cmdline.Env, args []string) error {
+	if len(args) < 2 || len(args) > 3 {
+		return env.UsageErrorf("expects between 2 and 3 arguments")
+	}
+	sgPublishSb := args[0]
+	devId := args[1]
+	desc := ""
+	if len(args) > 2 {
+		desc = args[2]
+	}
+
+	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed opening Syncbase db: %v", err)
+	}
+
+	devKey, err := client.AddDevice(ctx, db, devId, sgPublishSb, desc)
+	if err != nil {
+		return err
+	}
+
+	fmt.Fprintf(env.Stdout, "%s\n", devKey.DevId)
+
+	return nil
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/list.go b/go/src/v.io/x/sensorlog/slcli/list.go
new file mode 100644
index 0000000..3a87fa5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/list.go
@@ -0,0 +1,85 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli measured data listing support.
+
+package main
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/lib/signals"
+	"v.io/x/ref/lib/v23cmd"
+	"v.io/x/sensorlog_lite/internal/client"
+	"v.io/x/sensorlog_lite/internal/config"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var cmdSLList = &cmdline.Command{
+	Runner: v23cmd.RunnerFunc(runSLList),
+	Name:   "list",
+	Short:  "List measured data",
+	Long: `
+Prints all measured data points for the given stream and device, sorted
+chronologically.
+`,
+	ArgsName: "<device_id> <stream_id>",
+	ArgsLong: `
+<device_id> and <stream_id> specify the data stream to print the data points
+from.
+`,
+}
+
+var (
+	flagFollow bool
+)
+
+func init() {
+	cmdSLList.Flags.BoolVar(&flagFollow, "follow", false, "Follow updates until killed, similar to 'tail -f'.")
+}
+
+// TODO(ivanpi): Add time interval querying and aggregation functions.
+func runSLList(ctx *context.T, env *cmdline.Env, args []string) error {
+	if len(args) != 2 {
+		return env.UsageErrorf("expects exactly 2 arguments")
+	}
+	streamKey := &sbmodel.KStreamDef{
+		DevId:    args[0],
+		StreamId: args[1],
+	}
+
+	dataPrintCb := func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
+		return nil
+	}
+
+	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed opening Syncbase db: %v", err)
+	}
+
+	if !flagFollow {
+		return client.ListStreamData(ctx, db, streamKey, dataPrintCb)
+	}
+
+	ctx, stop := context.WithCancel(ctx)
+
+	waitFollow := util.AsyncRun(func() error {
+		return client.FollowStreamData(ctx, db, streamKey, dataPrintCb)
+	}, func(err error) {
+		stop()
+	})
+
+	select {
+	case <-signals.ShutdownOnSignals(nil):
+		stop()
+	case <-ctx.Done():
+	}
+
+	return waitFollow()
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/slcli.go b/go/src/v.io/x/sensorlog/slcli/slcli.go
new file mode 100644
index 0000000..20c65f6
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/slcli.go
@@ -0,0 +1,39 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli is the Sensor Log Lite command line configuration tool and client.
+// Must be run against master device Syncbase.
+// Supports configuring sampling streams by writing configuration to Syncbase,
+// which is then read by measured on the target device.
+// Also supports listing measured data.
+// TODO(ivanpi): Add data querying and graph plotting.
+package main
+
+import (
+	"flag"
+	"regexp"
+
+	"v.io/x/lib/cmdline"
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/sensorlog_lite/internal/config"
+)
+
+var (
+	flagSbService = flag.String("service", config.DefaultSbService, "Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.")
+)
+
+var cmdSensorLogLite = &cmdline.Command{
+	Name:  "slcli",
+	Short: "Sensor Log Lite command line configuration tool and client",
+	Long: `
+Command line interface for Sensor Log Lite, used for listing data and
+manipulating configuration via a master device Syncbase.
+`,
+	Children: []*cmdline.Command{cmdSLDevice, cmdSLStream, cmdSLList},
+}
+
+func main() {
+	cmdline.HideGlobalFlagsExcept(regexp.MustCompile(`^((service)|(v23\.namespace\.root)|(v23\.credentials))$`))
+	cmdline.Main(cmdSensorLogLite)
+}
diff --git a/go/src/v.io/x/sensorlog/slcli/stream.go b/go/src/v.io/x/sensorlog/slcli/stream.go
new file mode 100644
index 0000000..23c75bc
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/slcli/stream.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// slcli stream configuration.
+
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/lib/v23cmd"
+	"v.io/x/sensorlog_lite/internal/client"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+)
+
+var cmdSLStream = &cmdline.Command{
+	Name:  "stream",
+	Short: "Manage sampling streams",
+	Long: `
+Create sampling streams.
+
+TODO(ivanpi): Enable/disable, list.
+`,
+	Children: []*cmdline.Command{cmdSLStreamCreate /*, cmdSLStreamEnable, cmdSLStreamDisable, cmdSLStreamList*/},
+}
+
+var cmdSLStreamCreate = &cmdline.Command{
+	Runner: v23cmd.RunnerFunc(runSLStreamCreate),
+	Name:   "create",
+	Short:  "Create new data stream",
+	Long: `
+Creates a new sampling stream and outputs its identifier.
+
+The sampling script is read from stdin. It must be a bash script that prints
+a single floating point result per invocation and exits with a zero status,
+otherwise an error is logged instead of the data point. The script will be
+run on the specified device at the specified frequency whenever the measuring
+daemon is running on the device, as long as the stream is enabled.
+`,
+	ArgsName: "<device_id> <stream_id> <interval> [<stream_desc>]",
+	ArgsLong: `
+<device_id> is the identifier of the device to use for sampling.
+
+<stream_id> is the identifier of the stream to be created. It must be unique
+            per measuring device.
+
+<interval> is the time interval between two subsequent invocations of the
+           measuring script. Measurements may be skipped if the interval is
+           too short compared to the device and sampling script speed.
+
+<stream_desc> is a human-readable description of the stream.
+              It doesn't need to be unique.
+`,
+}
+
+func runSLStreamCreate(ctx *context.T, env *cmdline.Env, args []string) error {
+	if len(args) < 3 || len(args) > 4 {
+		return env.UsageErrorf("expects between 3 and 4 arguments")
+	}
+	devKey := &sbmodel.KDeviceCfg{
+		DevId: args[0],
+	}
+	streamId := args[1]
+	interval, err := time.ParseDuration(args[2])
+	if err != nil {
+		return fmt.Errorf("failed parsing interval %q: %v", args[2], err)
+	}
+	desc := ""
+	if len(args) > 3 {
+		desc = args[3]
+	}
+
+	script, err := ioutil.ReadAll(env.Stdin)
+	if err != nil {
+		return err
+	}
+
+	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
+	if err != nil {
+		return fmt.Errorf("failed opening Syncbase db: %v", err)
+	}
+
+	streamKey, err := client.CreateStream(ctx, db, devKey, streamId, string(script), interval, desc)
+	if err != nil {
+		return err
+	}
+
+	fmt.Fprintf(env.Stdout, "%s\n", streamKey.StreamId)
+
+	return nil
+}