sensorlog_lite: Add slcli list -follow.
slcli list now supports a -follow flag that keeps listing new data
points until killed, similar to 'tail -f'.
Change-Id: I8b6f67387709b9d58877f6b1a198a97c0133810b
diff --git a/go/src/v.io/x/sensorlog/README.md b/go/src/v.io/x/sensorlog/README.md
index 23e403a..82059e7 100644
--- a/go/src/v.io/x/sensorlog/README.md
+++ b/go/src/v.io/x/sensorlog/README.md
@@ -77,6 +77,10 @@
$ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list dev1 stream42
+To keep listing newly measured data until `slcli` is killed, add `-follow`:
+
+ $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list -follow dev1 stream42
+
### Debugging
The internal state of the master Syncbase can be examined using `sb`:
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 92919a6..e2aedb0 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -8,9 +8,11 @@
import (
"fmt"
+ "sort"
"v.io/v23/context"
nosql_wire "v.io/v23/services/syncbase/nosql"
+ "v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
"v.io/v23/verror"
"v.io/x/sensorlog_lite/internal/sbmodel"
@@ -23,20 +25,86 @@
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+ _, err := listStreamData(ctx, db, streamKey, listCb)
+ return err
+}
+
+// FollowStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each. It keeps listing new data
+// points until ctx is canceled.
+// TODO(ivanpi): Allow specifying time interval.
+func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+ tableName := sbmodel.KDataPoint{}.Table()
+ dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+ resMark, err := listStreamData(ctx, db, streamKey, listCb)
+ if err != nil {
+ return err
+ }
+
+ // Watch for new DataPoints.
+ ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+ if err != nil {
+ return err
+ }
+ defer ws.Cancel()
+
+ trans := make([]*dataPoint, 0, 16)
+ for ws.Advance() {
+ c := ws.Change()
+ var elem dataPoint
+ if err := elem.Key.Parse(c.Row); err != nil {
+ return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ switch c.ChangeType {
+ case nosql.PutChange:
+ if err := c.Value(&elem.Val); err != nil {
+ return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
+ }
+ // In the absence of overlapping syncgroups or partial ACLs, the order of
+ // writes is preserved by sync. However, several writes may be grouped
+ // into a single batch, and the order is not preserved within a batch.
+ // Each batch is manually sorted before being emitted to the callback.
+ trans = append(trans, &elem)
+ if !c.Continued {
+ sort.Stable(dataPointSort(trans))
+ for _, elem := range trans {
+ if err := listCb(&elem.Key, elem.Val); err != nil {
+ return err
+ }
+ }
+ trans = trans[:0]
+ }
+		case nosql.DeleteChange:
+			// Deletions are ignored; previously emitted data points are not retracted.
+ }
+ }
+ return ws.Err()
+}
+
+// listStreamData implements listing (scanning over) existing stream data. It
+// also returns the resume marker to allow watching for future data.
+func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
+ var resMark watch.ResumeMarker
tableName := sbmodel.KDataPoint{}.Table()
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
if err != nil {
- return err
+ return resMark, err
}
defer bdb.Abort(ctx)
+ resMark, err = bdb.GetResumeMarker(ctx)
+ if err != nil {
+ return resMark, err
+ }
+
streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
if exists, err := streamRow.Exists(ctx); err != nil {
- return err
+ return resMark, err
} else if !exists {
- return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+ return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
}
sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
@@ -45,15 +113,27 @@
for sstr.Advance() {
key := &sbmodel.KDataPoint{}
if err := key.Parse(sstr.Key()); err != nil {
- return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
}
var val sbmodel.VDataPoint
if err := sstr.Value(&val); err != nil {
- return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+ return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
}
if err := listCb(key, val); err != nil {
- return err
+ return resMark, err
}
}
- return sstr.Err()
+ return resMark, sstr.Err()
}
+
+type dataPoint struct {
+ Key sbmodel.KDataPoint
+ Val sbmodel.VDataPoint
+}
+
+// dataPointSort implements sorting a dataPoint slice by timestamp.
+type dataPointSort []*dataPoint
+
+func (s dataPointSort) Len() int { return len(s) }
+func (s dataPointSort) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
diff --git a/go/src/v.io/x/sensorlog/slcli/list.go b/go/src/v.io/x/sensorlog/slcli/list.go
index d65e1e6..3a87fa5 100644
--- a/go/src/v.io/x/sensorlog/slcli/list.go
+++ b/go/src/v.io/x/sensorlog/slcli/list.go
@@ -11,11 +11,13 @@
"v.io/v23/context"
"v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
"v.io/x/sensorlog_lite/internal/client"
"v.io/x/sensorlog_lite/internal/config"
"v.io/x/sensorlog_lite/internal/sbmodel"
"v.io/x/sensorlog_lite/internal/sbutil"
+ "v.io/x/sensorlog_lite/internal/util"
)
var cmdSLList = &cmdline.Command{
@@ -33,6 +35,14 @@
`,
}
+var (
+ flagFollow bool
+)
+
+func init() {
+ cmdSLList.Flags.BoolVar(&flagFollow, "follow", false, "Follow updates until killed, similar to 'tail -f'.")
+}
+
// TODO(ivanpi): Add time interval querying and aggregation functions.
func runSLList(ctx *context.T, env *cmdline.Env, args []string) error {
if len(args) != 2 {
@@ -43,13 +53,33 @@
StreamId: args[1],
}
+ dataPrintCb := func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+ fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
+ return nil
+ }
+
db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
if err != nil {
return fmt.Errorf("failed opening Syncbase db: %v", err)
}
- return client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
- fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
- return nil
+ if !flagFollow {
+ return client.ListStreamData(ctx, db, streamKey, dataPrintCb)
+ }
+
+ ctx, stop := context.WithCancel(ctx)
+
+ waitFollow := util.AsyncRun(func() error {
+ return client.FollowStreamData(ctx, db, streamKey, dataPrintCb)
+ }, func(err error) {
+ stop()
})
+
+ select {
+ case <-signals.ShutdownOnSignals(nil):
+ stop()
+ case <-ctx.Done():
+ }
+
+ return waitFollow()
}