sensorlog_lite: Add slcli list -follow.

slcli list now supports -follow flag to continue listing new data
points until killed, similar to 'tail -f'.

Change-Id: I8b6f67387709b9d58877f6b1a198a97c0133810b
diff --git a/go/src/v.io/x/sensorlog/README.md b/go/src/v.io/x/sensorlog/README.md
index 23e403a..82059e7 100644
--- a/go/src/v.io/x/sensorlog/README.md
+++ b/go/src/v.io/x/sensorlog/README.md
@@ -77,6 +77,10 @@
 
     $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list dev1 stream42
 
+To keep listing newly measured data until `slcli` is killed, add `-follow`:
+
+    $ V23_CREDENTIALS=<creds> ./scripts/slcli.sh list -follow dev1 stream42
+
 ### Debugging
 
 The internal state of the master Syncbase can be examined using `sb`:
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 92919a6..e2aedb0 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -8,9 +8,11 @@
 
 import (
 	"fmt"
+	"sort"
 
 	"v.io/v23/context"
 	nosql_wire "v.io/v23/services/syncbase/nosql"
+	"v.io/v23/services/watch"
 	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
@@ -23,20 +25,86 @@
 // in chronological order, calling listCb for each.
 // TODO(ivanpi): Allow specifying time interval.
 func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+	_, err := listStreamData(ctx, db, streamKey, listCb)
+	return err
+}
+
+// FollowStreamData lists all data points for the stream specified by streamKey
+// in chronological order, calling listCb for each. It keeps listing new data
+// points until ctx is cancelled.
+// TODO(ivanpi): Allow specifying time interval.
+func FollowStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
+	tableName := sbmodel.KDataPoint{}.Table()
+	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+	resMark, err := listStreamData(ctx, db, streamKey, listCb)
+	if err != nil {
+		return err
+	}
+
+	// Watch for new DataPoints.
+	ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+	if err != nil {
+		return err
+	}
+	defer ws.Cancel()
+
+	trans := make([]*dataPoint, 0, 16)
+	for ws.Advance() {
+		c := ws.Change()
+		var elem dataPoint
+		if err := elem.Key.Parse(c.Row); err != nil {
+			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+		}
+		switch c.ChangeType {
+		case nosql.PutChange:
+			if err := c.Value(&elem.Val); err != nil {
+				return fmt.Errorf("invalid DataPoint value for key %s: %v", c.Row, err)
+			}
+			// In the absence of overlapping syncgroups or partial ACLs, the order of
+			// writes is preserved by sync. However, several writes may be grouped
+			// into a single batch, and the order is not preserved within a batch.
+			// Each batch is manually sorted before being emitted to the callback.
+			trans = append(trans, &elem)
+			if !c.Continued {
+				sort.Stable(dataPointSort(trans))
+				for _, elem := range trans {
+					if err := listCb(&elem.Key, elem.Val); err != nil {
+						return err
+					}
+				}
+				trans = trans[:0]
+			}
+		case nosql.DeleteChange:
+			// no-op
+		}
+	}
+	return ws.Err()
+}
+
+// listStreamData implements listing (scanning over) existing stream data. It
+// also returns the resume marker to allow watching for future data.
+func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
+	var resMark watch.ResumeMarker
 	tableName := sbmodel.KDataPoint{}.Table()
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
 	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
 	if err != nil {
-		return err
+		return resMark, err
 	}
 	defer bdb.Abort(ctx)
 
+	resMark, err = bdb.GetResumeMarker(ctx)
+	if err != nil {
+		return resMark, err
+	}
+
 	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
 	if exists, err := streamRow.Exists(ctx); err != nil {
-		return err
+		return resMark, err
 	} else if !exists {
-		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+		return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
 	}
 
 	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
@@ -45,15 +113,27 @@
 	for sstr.Advance() {
 		key := &sbmodel.KDataPoint{}
 		if err := key.Parse(sstr.Key()); err != nil {
-			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+			return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
 		}
 		var val sbmodel.VDataPoint
 		if err := sstr.Value(&val); err != nil {
-			return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+			return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
 		}
 		if err := listCb(key, val); err != nil {
-			return err
+			return resMark, err
 		}
 	}
-	return sstr.Err()
+	return resMark, sstr.Err()
 }
+
+type dataPoint struct {
+	Key sbmodel.KDataPoint
+	Val sbmodel.VDataPoint
+}
+
+// dataPointSort implements sorting a dataPoint slice by timestamp.
+type dataPointSort []*dataPoint
+
+func (s dataPointSort) Len() int           { return len(s) }
+func (s dataPointSort) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s dataPointSort) Less(i, j int) bool { return s[i].Key.Timestamp.Before(s[j].Key.Timestamp) }
diff --git a/go/src/v.io/x/sensorlog/slcli/list.go b/go/src/v.io/x/sensorlog/slcli/list.go
index d65e1e6..3a87fa5 100644
--- a/go/src/v.io/x/sensorlog/slcli/list.go
+++ b/go/src/v.io/x/sensorlog/slcli/list.go
@@ -11,11 +11,13 @@
 
 	"v.io/v23/context"
 	"v.io/x/lib/cmdline"
+	"v.io/x/ref/lib/signals"
 	"v.io/x/ref/lib/v23cmd"
 	"v.io/x/sensorlog_lite/internal/client"
 	"v.io/x/sensorlog_lite/internal/config"
 	"v.io/x/sensorlog_lite/internal/sbmodel"
 	"v.io/x/sensorlog_lite/internal/sbutil"
+	"v.io/x/sensorlog_lite/internal/util"
 )
 
 var cmdSLList = &cmdline.Command{
@@ -33,6 +35,14 @@
 `,
 }
 
+var (
+	flagFollow bool
+)
+
+func init() {
+	cmdSLList.Flags.BoolVar(&flagFollow, "follow", false, "Follow updates until killed, similar to 'tail -f'.")
+}
+
 // TODO(ivanpi): Add time interval querying and aggregation functions.
 func runSLList(ctx *context.T, env *cmdline.Env, args []string) error {
 	if len(args) != 2 {
@@ -43,13 +53,33 @@
 		StreamId: args[1],
 	}
 
+	dataPrintCb := func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
+		fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
+		return nil
+	}
+
 	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MasterTables)
 	if err != nil {
 		return fmt.Errorf("failed opening Syncbase db: %v", err)
 	}
 
-	return client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
-		fmt.Fprintf(env.Stdout, "%s %v\n", key.Timestamp.Format(config.TimeOutputFormat), val.Interface())
-		return nil
+	if !flagFollow {
+		return client.ListStreamData(ctx, db, streamKey, dataPrintCb)
+	}
+
+	ctx, stop := context.WithCancel(ctx)
+
+	waitFollow := util.AsyncRun(func() error {
+		return client.FollowStreamData(ctx, db, streamKey, dataPrintCb)
+	}, func(err error) {
+		stop()
 	})
+
+	select {
+	case <-signals.ShutdownOnSignals(nil):
+		stop()
+	case <-ctx.Done():
+	}
+
+	return waitFollow()
 }