sensorlog_lite: measured watch for stream configuration changes.

Watch the store for StreamDef changes and call callback on change.
Utility for starting goroutines with error callbacks.

Change-Id: I1bbfc1b8fb7061c9bed36192c1a691197fdeec40
diff --git a/go/src/v.io/x/sensorlog/internal/measure/doc.go b/go/src/v.io/x/sensorlog/internal/measure/doc.go
index e753d44..c7e18f3 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/doc.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/doc.go
@@ -4,5 +4,5 @@
 
 // Package measure implements Sensor Log measured methods, intended to run
 // against the measured device Syncbase. It supports configuring the device
-// syncgroup.
+// syncgroup and watching for stream configuration changes.
 package measure
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
new file mode 100644
index 0000000..0975be5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for configuration change watching.
+
+package measure
+
+import (
+	"fmt"
+
+	"v.io/v23/context"
+	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/services/watch"
+	"v.io/v23/syncbase/nosql"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// RegisterWorker is called with the key and value of a new or modified
+// StreamDef. Thread safety is not required.
+type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
+
+// WatchForStreams watches Syncbase for new and modified stream definitions
+// for the specified measuring device and calls the register callback.
+// If a malformed StreamDef key or value is encountered, or register returns
+// an error, measured exits with an error.
+// WatchForStreams is synchronous and runs until the context is cancelled or
+// an error is encountered.
+func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+	tableName := sbmodel.KStreamDef{}.Table()
+	watchPrefix := devId
+	var resMark watch.ResumeMarker
+
+	// BeginBatch scoped using function with deferred Abort.
+	if err := func() error {
+		bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+		if err != nil {
+			return err
+		}
+		defer bdb.Abort(ctx)
+
+		resMark, err = bdb.GetResumeMarker(ctx)
+		if err != nil {
+			return err
+		}
+
+		// Register samplers for all existing StreamDefs.
+		sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
+		defer sstr.Cancel()
+
+		for sstr.Advance() {
+			key := &sbmodel.KStreamDef{}
+			if err := key.Parse(sstr.Key()); err != nil {
+				return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+			}
+			val := &sbmodel.VStreamDef{}
+			if err := sstr.Value(val); err != nil {
+				return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
+			}
+			if err := register(key, val); err != nil {
+				return err
+			}
+		}
+		return sstr.Err()
+	}(); err != nil {
+		return err
+	}
+
+	// Watch for StreamDef changes and register samplers as needed.
+	ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+	if err != nil {
+		return err
+	}
+	defer ws.Cancel()
+
+	for ws.Advance() {
+		c := ws.Change()
+		key := &sbmodel.KStreamDef{}
+		if err := key.Parse(c.Row); err != nil {
+			return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+		}
+		switch c.ChangeType {
+		case nosql.PutChange:
+			val := &sbmodel.VStreamDef{}
+			if err := c.Value(val); err != nil {
+				return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
+			}
+			if err := register(key, val); err != nil {
+				return err
+			}
+		case nosql.DeleteChange:
+			return fmt.Errorf("StreamDef delete is not supported")
+		}
+	}
+	return ws.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
new file mode 100644
index 0000000..b28cb78
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/syncbase/nosql"
+	_ "v.io/x/ref/runtime/factories/generic"
+	sbtu "v.io/x/ref/services/syncbase/testutil"
+	tu "v.io/x/ref/test/testutil"
+	"v.io/x/sensorlog_lite/internal/measure"
+	"v.io/x/sensorlog_lite/internal/sbmodel"
+	"v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var watchTables = []sbmodel.TableSpec{
+	{Prototype: &sbmodel.KStreamDef{}},
+}
+
+func TestWatchForStreams(t *testing.T) {
+	_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as measured. Keep write permission on StreamDef.
+	db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	devId := "dev1"
+	otherDev := "dev2"
+	expectStreams := []string{"str1", "str2", "str3"}
+	gotStreams := make([]string, 0, len(expectStreams))
+
+	time.Sleep(1 * time.Second)
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[0])
+	putStreamDef(t, ctxMeasured, db, otherDev, "foo")
+
+	// Watch should call the callback for all devId streams, whether they were
+	// written before or after starting watch. Streams for other devices should
+	// not be returned.
+	time.Sleep(1 * time.Second)
+	stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+		if got, want := key.DevId, devId; got != want {
+			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+		}
+		gotStreams = append(gotStreams, val.Desc)
+		return nil
+	})
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[1])
+
+	time.Sleep(1 * time.Second)
+	putStreamDef(t, ctxMeasured, db, otherDev, "bar")
+	putStreamDef(t, ctxMeasured, db, devId, expectStreams[2])
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err != nil {
+		t.Fatalf("watcher exited with error: %v", err)
+	}
+
+	sort.Strings(gotStreams)
+	sort.Strings(expectStreams)
+	if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+		t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+	}
+}
+
+func TestWatchError(t *testing.T) {
+	_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+	defer cleanup()
+
+	// Open app/db (create both) as measured. Keep write permission on StreamDef.
+	db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+	if err != nil {
+		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+	}
+
+	// Watch should return the callback error.
+	devId := "dev1"
+	cbErr := fmt.Errorf("callbackError")
+	stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+		return cbErr
+	})
+	// Put anything to trigger callback.
+	putStreamDef(t, ctxMeasured, db, devId, "foo")
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err != cbErr {
+		t.Errorf("watch should have failed with %v, got: %v", cbErr, err)
+	}
+
+	// Watch should return an error when a malformed key is encountered.
+	devId = "dev2"
+	stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+		return nil
+	})
+	putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo")
+
+	time.Sleep(3 * time.Second)
+	stop()
+	if err := wait(); err == nil {
+		t.Errorf("watch should have failed on malformed key")
+	}
+}
+
+func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+	ctx, stop = context.WithCancel(ctx)
+	wait = util.AsyncRun(func() error {
+		return measure.WatchForStreams(ctx, db, devId, register)
+	}, nil)
+	return stop, wait
+}
+
+func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+	key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+	val := sbmodel.VStreamDef{Desc: desc}
+	if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
+		t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util.go b/go/src/v.io/x/sensorlog/internal/util/util.go
new file mode 100644
index 0000000..e01c63e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for running tasks asynchronously.
+package util
+
+import (
+	"v.io/v23/verror"
+)
+
+type Task func() error
+type ErrorCb func(err error)
+
+// AsyncRun asynchronously starts task and returns a function that can be used
+// to wait for task completion, returning the error if any. In addition, if the
+// task returns a non-nil error, onError is called with the error. onError can
+// be nil. ErrCanceled is ignored (treated as a nil error).
+func AsyncRun(task Task, onError ErrorCb) (wait func() error) {
+	var err error
+	done := make(chan error, 1)
+	go func() {
+		defer close(done)
+		err = task()
+		// Ignore ErrCanceled.
+		if verror.ErrorID(err) == verror.ErrCanceled.ID {
+			err = nil
+		}
+		// Call onError if provided and err is not nil.
+		if err != nil && onError != nil {
+			onError(err)
+		}
+	}()
+	return func() error {
+		<-done
+		return err
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util_test.go b/go/src/v.io/x/sensorlog/internal/util/util_test.go
new file mode 100644
index 0000000..90a820f
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util_test.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+	"v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+	errFoo = errors.New("errFoo")
+	errBar = errors.New("errBar")
+)
+
+func TestAsyncRun(t *testing.T) {
+	wait1 := util.AsyncRun(func() error {
+		time.Sleep(200 * time.Microsecond)
+		return nil
+	}, func(err error) {
+		t.Errorf("unexpected callback with error: %v", err)
+	})
+
+	var ecb2 error
+	wait2 := util.AsyncRun(func() error {
+		time.Sleep(100 * time.Microsecond)
+		return errFoo
+	}, func(err error) {
+		ecb2 = err
+	})
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+	if wgot, cbgot, want := wait2(), ecb2, errFoo; wgot != want || cbgot != want {
+		t.Errorf("wait2 unexpected error: wait got %v, cb got %v, want %v", wgot, cbgot, want)
+	}
+}
+
+func TestAsyncRunNoCallback(t *testing.T) {
+	wait1 := util.AsyncRun(func() error {
+		time.Sleep(200 * time.Microsecond)
+		return nil
+	}, nil)
+
+	wait2 := util.AsyncRun(func() error {
+		time.Sleep(100 * time.Microsecond)
+		return errBar
+	}, nil)
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+	if wgot, want := wait2(), errBar; wgot != want {
+		t.Errorf("wait2 unexpected error: got %v, want %v", wgot, want)
+	}
+}
+
+func TestAsyncRunIgnoreErrCancelled(t *testing.T) {
+	ctx, cancel := context.RootContext()
+
+	wait1 := util.AsyncRun(func() error {
+		<-ctx.Done()
+		return verror.New(verror.ErrCanceled, ctx)
+	}, func(err error) {
+		t.Errorf("unexpected callback with error: %v", err)
+	})
+
+	cancel()
+
+	if wgot, want := wait1(), error(nil); wgot != want {
+		t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+	}
+}
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index 1e442db..105eabb 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -12,6 +12,7 @@
 	"os"
 
 	"v.io/v23"
+	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/lib/signals"
@@ -20,6 +21,7 @@
 	"v.io/x/sensorlog_lite/internal/measure"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
 )
 
 var (
@@ -64,7 +66,27 @@
 		return 1
 	}
 
-	<-signals.ShutdownOnSignals(ctx)
+	ctx, stop := context.WithCancel(ctx)
 
-	return 0
+	waitWatch := util.AsyncRun(func() error {
+		return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+			ctx.Infof("watch: %s, %v", key.Key(), &val)
+			return nil
+		})
+	}, func(err error) {
+		vlog.Errorf("Watch failed: %v", err)
+		stop()
+	})
+
+	defer waitWatch()
+
+	select {
+	case <-signals.ShutdownOnSignals(nil):
+		stop()
+		vlog.VI(0).Infof("Exiting on signal")
+		return 0
+	case <-ctx.Done():
+		vlog.VI(0).Infof("Exiting on error")
+		return 1
+	}
 }