sensorlog_lite: measured watch for stream configuration changes.
Watch the store for StreamDef changes and call callback on change.
Utility for starting goroutines with error callbacks.
Change-Id: I1bbfc1b8fb7061c9bed36192c1a691197fdeec40
diff --git a/go/src/v.io/x/sensorlog/internal/measure/doc.go b/go/src/v.io/x/sensorlog/internal/measure/doc.go
index e753d44..c7e18f3 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/doc.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/doc.go
@@ -4,5 +4,5 @@
// Package measure implements Sensor Log measured methods, intended to run
// against the measured device Syncbase. It supports configuring the device
-// syncgroup.
+// syncgroup and watching for stream configuration changes.
package measure
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
new file mode 100644
index 0000000..0975be5
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Measured methods for configuration change watching.
+
+package measure
+
+import (
+ "fmt"
+
+ "v.io/v23/context"
+ nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/services/watch"
+ "v.io/v23/syncbase/nosql"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+)
+
+// RegisterWorker is called with the key and value of a new or modified
+// StreamDef. Thread safety is not required.
+type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
+
+// WatchForStreams watches Syncbase for new and modified stream definitions
+// for the specified measuring device and calls the register callback.
+// If a malformed StreamDef key or value is encountered, or register returns
+// an error, measured exits with an error.
+// WatchForStreams is synchronous and runs until the context is cancelled or
+// an error is encountered.
+func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
+ tableName := sbmodel.KStreamDef{}.Table()
+ watchPrefix := devId
+ var resMark watch.ResumeMarker
+
+ // BeginBatch scoped using function with deferred Abort.
+ if err := func() error {
+ bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ return err
+ }
+ defer bdb.Abort(ctx)
+
+ resMark, err = bdb.GetResumeMarker(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Register samplers for all existing StreamDefs.
+ sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KStreamDef{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+ }
+ val := &sbmodel.VStreamDef{}
+ if err := sstr.Value(val); err != nil {
+ return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
+ }
+ if err := register(key, val); err != nil {
+ return err
+ }
+ }
+ return sstr.Err()
+ }(); err != nil {
+ return err
+ }
+
+ // Watch for StreamDef changes and register samplers as needed.
+ ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+ if err != nil {
+ return err
+ }
+ defer ws.Cancel()
+
+ for ws.Advance() {
+ c := ws.Change()
+ key := &sbmodel.KStreamDef{}
+ if err := key.Parse(c.Row); err != nil {
+ return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
+ }
+ switch c.ChangeType {
+ case nosql.PutChange:
+ val := &sbmodel.VStreamDef{}
+ if err := c.Value(val); err != nil {
+ return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
+ }
+ if err := register(key, val); err != nil {
+ return err
+ }
+ case nosql.DeleteChange:
+ return fmt.Errorf("StreamDef delete is not supported")
+ }
+ }
+ return ws.Err()
+}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
new file mode 100644
index 0000000..b28cb78
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package measure_test
+
+import (
+ "fmt"
+ "reflect"
+ "sort"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/syncbase/nosql"
+ _ "v.io/x/ref/runtime/factories/generic"
+ sbtu "v.io/x/ref/services/syncbase/testutil"
+ tu "v.io/x/ref/test/testutil"
+ "v.io/x/sensorlog_lite/internal/measure"
+ "v.io/x/sensorlog_lite/internal/sbmodel"
+ "v.io/x/sensorlog_lite/internal/sbmodel/keyutil"
+ "v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var watchTables = []sbmodel.TableSpec{
+ {Prototype: &sbmodel.KStreamDef{}},
+}
+
+func TestWatchForStreams(t *testing.T) {
+ _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as measured. Keep write permission on StreamDef.
+ db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ devId := "dev1"
+ otherDev := "dev2"
+ expectStreams := []string{"str1", "str2", "str3"}
+ gotStreams := make([]string, 0, len(expectStreams))
+
+ time.Sleep(1 * time.Second)
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[0])
+ putStreamDef(t, ctxMeasured, db, otherDev, "foo")
+
+ // Watch should call the callback for all devId streams, whether they were
+ // written before or after starting watch. Streams for other devices should
+ // not be returned.
+ time.Sleep(1 * time.Second)
+ stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+ if got, want := key.DevId, devId; got != want {
+ return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
+ }
+ gotStreams = append(gotStreams, val.Desc)
+ return nil
+ })
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[1])
+
+ time.Sleep(1 * time.Second)
+ putStreamDef(t, ctxMeasured, db, otherDev, "bar")
+ putStreamDef(t, ctxMeasured, db, devId, expectStreams[2])
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err != nil {
+ t.Fatalf("watcher exited with error: %v", err)
+ }
+
+ sort.Strings(gotStreams)
+ sort.Strings(expectStreams)
+ if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
+ t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
+ }
+}
+
+func TestWatchError(t *testing.T) {
+ _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one/sb", nil)
+ defer cleanup()
+
+ // Open app/db (create both) as measured. Keep write permission on StreamDef.
+ db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchTables)
+ if err != nil {
+ t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
+ }
+
+ // Watch should return the callback error.
+ devId := "dev1"
+ cbErr := fmt.Errorf("callbackError")
+ stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+ return cbErr
+ })
+ // Put anything to trigger callback.
+ putStreamDef(t, ctxMeasured, db, devId, "foo")
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err != cbErr {
+ t.Errorf("watch should have failed with %v, got: %v", cbErr, err)
+ }
+
+ // Watch should return an error when a malformed key is encountered.
+ devId = "dev2"
+ stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
+ return nil
+ })
+ putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo")
+
+ time.Sleep(3 * time.Second)
+ stop()
+ if err := wait(); err == nil {
+ t.Errorf("watch should have failed on malformed key")
+ }
+}
+
+func runWatchForStreams(ctx *context.T, db nosql.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
+ ctx, stop = context.WithCancel(ctx)
+ wait = util.AsyncRun(func() error {
+ return measure.WatchForStreams(ctx, db, devId, register)
+ }, nil)
+ return stop, wait
+}
+
+func putStreamDef(t *testing.T, ctx *context.T, db nosql.DatabaseHandle, devId, desc string) {
+ key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+ val := sbmodel.VStreamDef{Desc: desc}
+ if err := db.Table(key.Table()).Put(ctx, key.Key(), &val); err != nil {
+ t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util.go b/go/src/v.io/x/sensorlog/internal/util/util.go
new file mode 100644
index 0000000..e01c63e
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Utilities for running tasks asynchronously.
+package util
+
+import (
+ "v.io/v23/verror"
+)
+
+type Task func() error
+type ErrorCb func(err error)
+
+// AsyncRun asynchronously starts task and returns a function that can be used
+// to wait for task completion, returning the error if any. In addition, if the
+// task returns a non-nil error, onError is called with the error. onError can
+// be nil. ErrCanceled is ignored (treated as a nil error).
+func AsyncRun(task Task, onError ErrorCb) (wait func() error) {
+ var err error
+ done := make(chan error, 1)
+ go func() {
+ defer close(done)
+ err = task()
+ // Ignore ErrCanceled.
+ if verror.ErrorID(err) == verror.ErrCanceled.ID {
+ err = nil
+ }
+ // Call onError if provided and err is not nil.
+ if err != nil && onError != nil {
+ onError(err)
+ }
+ }()
+ return func() error {
+ <-done
+ return err
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/internal/util/util_test.go b/go/src/v.io/x/sensorlog/internal/util/util_test.go
new file mode 100644
index 0000000..90a820f
--- /dev/null
+++ b/go/src/v.io/x/sensorlog/internal/util/util_test.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/sensorlog_lite/internal/util"
+)
+
+var (
+ errFoo = errors.New("errFoo")
+ errBar = errors.New("errBar")
+)
+
+func TestAsyncRun(t *testing.T) {
+ wait1 := util.AsyncRun(func() error {
+ time.Sleep(200 * time.Microsecond)
+ return nil
+ }, func(err error) {
+ t.Errorf("unexpected callback with error: %v", err)
+ })
+
+ var ecb2 error
+ wait2 := util.AsyncRun(func() error {
+ time.Sleep(100 * time.Microsecond)
+ return errFoo
+ }, func(err error) {
+ ecb2 = err
+ })
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+ if wgot, cbgot, want := wait2(), ecb2, errFoo; wgot != want || cbgot != want {
+ t.Errorf("wait2 unexpected error: wait got %v, cb got %v, want %v", wgot, cbgot, want)
+ }
+}
+
+func TestAsyncRunNoCallback(t *testing.T) {
+ wait1 := util.AsyncRun(func() error {
+ time.Sleep(200 * time.Microsecond)
+ return nil
+ }, nil)
+
+ wait2 := util.AsyncRun(func() error {
+ time.Sleep(100 * time.Microsecond)
+ return errBar
+ }, nil)
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+ if wgot, want := wait2(), errBar; wgot != want {
+ t.Errorf("wait2 unexpected error: got %v, want %v", wgot, want)
+ }
+}
+
+func TestAsyncRunIgnoreErrCancelled(t *testing.T) {
+ ctx, cancel := context.RootContext()
+
+ wait1 := util.AsyncRun(func() error {
+ <-ctx.Done()
+ return verror.New(verror.ErrCanceled, ctx)
+ }, func(err error) {
+ t.Errorf("unexpected callback with error: %v", err)
+ })
+
+ cancel()
+
+ if wgot, want := wait1(), error(nil); wgot != want {
+ t.Errorf("wait1 unexpected error: got %v, want %v", wgot, want)
+ }
+}
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index 1e442db..105eabb 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -12,6 +12,7 @@
"os"
"v.io/v23"
+ "v.io/v23/context"
"v.io/v23/naming"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/signals"
@@ -20,6 +21,7 @@
"v.io/x/sensorlog_lite/internal/measure"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
)
var (
@@ -64,7 +66,27 @@
return 1
}
- <-signals.ShutdownOnSignals(ctx)
+ ctx, stop := context.WithCancel(ctx)
- return 0
+ waitWatch := util.AsyncRun(func() error {
+ return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
+ ctx.Infof("watch: %s, %v", key.Key(), &val)
+ return nil
+ })
+ }, func(err error) {
+ vlog.Errorf("Watch failed: %v", err)
+ stop()
+ })
+
+ defer waitWatch()
+
+ select {
+ case <-signals.ShutdownOnSignals(nil):
+ stop()
+ vlog.VI(0).Infof("Exiting on signal")
+ return 0
+ case <-ctx.Done():
+ vlog.VI(0).Infof("Exiting on error")
+ return 1
+ }
}