sensorlog: Simplify watch and ACLs.

Use watch from empty resume marker and chained ACL construction.
Remove unused ACL tags.

Change-Id: If63997612b48fc782399721b7e9e1a3ac1b5560f
diff --git a/go/src/v.io/x/sensorlog/Makefile b/go/src/v.io/x/sensorlog/Makefile
index 703091c..e8f6bd3 100644
--- a/go/src/v.io/x/sensorlog/Makefile
+++ b/go/src/v.io/x/sensorlog/Makefile
@@ -26,7 +26,6 @@
 .PHONY: test
 test:
 	jiri go test v.io/x/sensorlog/...
-	# TODO(ivanpi): Add jiri command to run integration tests.
 	jiri go test v.io/x/sensorlog/internal/client -run=TestV23* -v23.tests
 
 .PHONY: clean
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index e658247..0a0b4d9 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -12,7 +12,6 @@
 
 	"v.io/v23/context"
 	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/services/watch"
 	"v.io/v23/syncbase/nosql"
 	"v.io/v23/verror"
 	"v.io/x/sensorlog/internal/sbmodel"
@@ -25,8 +24,39 @@
 // in chronological order, calling listCb for each.
 // TODO(ivanpi): Allow specifying time interval.
 func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
-	_, err := listStreamData(ctx, db, streamKey, listCb)
-	return err
+	tableName := sbmodel.KDataPoint{}.Table()
+	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+	if err != nil {
+		return err
+	}
+	defer bdb.Abort(ctx)
+
+	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+	if exists, err := streamRow.Exists(ctx); err != nil {
+		return err
+	} else if !exists {
+		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+	}
+
+	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+	defer sstr.Cancel()
+
+	for sstr.Advance() {
+		key := &sbmodel.KDataPoint{}
+		if err := key.Parse(sstr.Key()); err != nil {
+			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+		}
+		var val sbmodel.VDataPoint
+		if err := sstr.Value(&val); err != nil {
+			return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+		}
+		if err := listCb(key, val); err != nil {
+			return err
+		}
+	}
+	return sstr.Err()
 }
 
 // FollowStreamData lists all data points for the stream specified by streamKey
@@ -37,13 +67,8 @@
 	tableName := sbmodel.KDataPoint{}.Table()
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
-	resMark, err := listStreamData(ctx, db, streamKey, listCb)
-	if err != nil {
-		return err
-	}
-
-	// Watch for new DataPoints.
-	ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+	// Watch for DataPoints, existing followed by new.
+	ws, err := db.Watch(ctx, tableName, dataPrefix, nil)
 	if err != nil {
 		return err
 	}
@@ -82,50 +107,6 @@
 	return ws.Err()
 }
 
-// listStreamData implements listing (scanning over) existing stream data. It
-// also returns the resume marker to allow watching for future data.
-func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
-	var resMark watch.ResumeMarker
-	tableName := sbmodel.KDataPoint{}.Table()
-	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
-
-	bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
-	if err != nil {
-		return resMark, err
-	}
-	defer bdb.Abort(ctx)
-
-	resMark, err = bdb.GetResumeMarker(ctx)
-	if err != nil {
-		return resMark, err
-	}
-
-	streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
-	if exists, err := streamRow.Exists(ctx); err != nil {
-		return resMark, err
-	} else if !exists {
-		return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
-	}
-
-	sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
-	defer sstr.Cancel()
-
-	for sstr.Advance() {
-		key := &sbmodel.KDataPoint{}
-		if err := key.Parse(sstr.Key()); err != nil {
-			return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
-		}
-		var val sbmodel.VDataPoint
-		if err := sstr.Value(&val); err != nil {
-			return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
-		}
-		if err := listCb(key, val); err != nil {
-			return resMark, err
-		}
-	}
-	return resMark, sstr.Err()
-}
-
 type dataPoint struct {
 	Key sbmodel.KDataPoint
 	Val sbmodel.VDataPoint
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 9e5c17d..72278b0 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -27,6 +27,7 @@
 // InitSyncgroup must not be called concurrently for the same devId, or
 // retried with different parameters for the same devId, otherwise behaviour
 // is unspecified.
+// TODO(ivanpi): Remove Resolve permissions when v.io/i/1110 is fixed.
 func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
 	if err := keyutil.ValidateId(devId); err != nil {
 		return fmt.Errorf("invalid devId: %v", err)
@@ -46,8 +47,8 @@
 
 	// Both measured and admin client have full permissions on the syncgroup.
 	sgAcl := access.Permissions{}
-	sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.AllTypicalTags()...)
-	sbutil.AddPermsForPattern(&sgAcl, admin, access.AllTypicalTags()...)
+	sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.Read, access.Admin)
+	sbutil.AddPermsForPattern(&sgAcl, admin, access.Read, access.Admin)
 
 	// Maps all syncgroup prefixes to ACLs.
 	prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
@@ -60,7 +61,7 @@
 	}
 	aclStreamDef := access.Permissions{}
 	sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
-	sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
+	sbutil.AddPermsForPattern(&aclStreamDef, admin, access.Resolve, access.Read, access.Write, access.Admin)
 	prefixSpec[prefixStreamDef] = aclStreamDef
 
 	// DataPoint : <devId>
@@ -71,7 +72,7 @@
 	}
 	aclDataPoint := access.Permissions{}
 	sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
-	sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+	sbutil.AddPermsForPattern(&aclDataPoint, admin, access.Resolve, access.Read, access.Write, access.Admin)
 	prefixSpec[prefixDataPoint] = aclDataPoint
 
 	var prefixes []nosql_wire.TableRow
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
index 6e5f1a2..1522c73 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
@@ -5,10 +5,10 @@
 package measure_test
 
 import (
-	"bytes"
 	"reflect"
 	"testing"
 
+	"v.io/v23/security"
 	"v.io/v23/security/access"
 	_ "v.io/x/ref/runtime/factories/generic"
 	sbtu "v.io/x/ref/services/syncbase/testutil"
@@ -50,16 +50,9 @@
 	}
 
 	// measured should have dropped privileges on <StreamDefTable>/<devId>.
-	expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
-		"Admin":{"In":["root:two"]},
-		"Read":{"In":["root:two", "root:one"]},
-		"Write":{"In":["root:two"]},
-		"Debug":{"In":["root:two"]},
-		"Resolve":{"In":["root:two", "root:one"]}
-	}`))
-	if err != nil {
-		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
-	}
+	expectPerms := access.Permissions{}.
+		Add(security.BlessingPattern("root:one"), string(access.Resolve), string(access.Read)).
+		Add(security.BlessingPattern("root:two"), string(access.Resolve), string(access.Read), string(access.Write), string(access.Admin))
 	sgDataTable := db.Table(sbmodel.KStreamDef{}.Table())
 	if gotPerms, err := sgDataTable.GetPrefixPermissions(ctxMeasured, devId); err != nil {
 		t.Errorf("GetPrefixPermissions failed: %v", err)
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 3340308..3518d91 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -10,8 +10,6 @@
 	"fmt"
 
 	"v.io/v23/context"
-	nosql_wire "v.io/v23/services/syncbase/nosql"
-	"v.io/v23/services/watch"
 	"v.io/v23/syncbase/nosql"
 	"v.io/x/sensorlog/internal/sbmodel"
 	"v.io/x/sensorlog/internal/sbmodel/keyutil"
@@ -21,8 +19,9 @@
 // StreamDef. Thread safety is not required.
 type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
 
-// WatchForStreams watches Syncbase for new and modified stream definitions
-// for the specified measuring device and calls the register callback.
+// WatchForStreams watches Syncbase for existing, new, and modified stream
+// definitions for the specified measuring device and calls the register
+// callback.
 // If a malformed StreamDef key or value is encountered, or register returns
 // an error, measured exits with an error.
 // WatchForStreams is synchronous and runs until the context is cancelled or
@@ -30,45 +29,9 @@
 func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
 	tableName := sbmodel.KStreamDef{}.Table()
 	watchPrefix := keyutil.Join(devId, "")
-	var resMark watch.ResumeMarker
-
-	// BeginBatch scoped using function with deferred Abort.
-	if err := func() error {
-		bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
-		if err != nil {
-			return err
-		}
-		defer bdb.Abort(ctx)
-
-		resMark, err = bdb.GetResumeMarker(ctx)
-		if err != nil {
-			return err
-		}
-
-		// Register samplers for all existing StreamDefs.
-		sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
-		defer sstr.Cancel()
-
-		for sstr.Advance() {
-			key := &sbmodel.KStreamDef{}
-			if err := key.Parse(sstr.Key()); err != nil {
-				return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
-			}
-			val := &sbmodel.VStreamDef{}
-			if err := sstr.Value(val); err != nil {
-				return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
-			}
-			if err := register(key, val); err != nil {
-				return err
-			}
-		}
-		return sstr.Err()
-	}(); err != nil {
-		return err
-	}
 
 	// Watch for StreamDef changes and register samplers as needed.
-	ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+	ws, err := db.Watch(ctx, tableName, watchPrefix, nil)
 	if err != nil {
 		return err
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index 3e0eedd..ff824b1 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -24,13 +24,13 @@
 	// Allow everyone to resolve to allow joining syncgroups.
 	AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
 	// Restrict other permissions to self.
-	AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin, access.Debug)
+	AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin)
 
 	aclReadOnly := access.Permissions{}
 	// Allow everyone to resolve to allow joining syncgroups.
 	AddPermsForPattern(&aclReadOnly, string(security.AllPrincipals), access.Resolve)
 	// Restrict other permissions to self, except Write.
-	AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin, access.Debug)
+	AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin)
 
 	sbs := syncbase.NewService(sbService)
 	app := sbs.App(config.AppName)
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
index e57a228..eb41163 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -5,10 +5,10 @@
 package sbutil_test
 
 import (
-	"bytes"
 	"reflect"
 	"testing"
 
+	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
 	_ "v.io/x/ref/runtime/factories/generic"
@@ -54,16 +54,9 @@
 		t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
 	}
 	// Expect db permissions with full access for owner, resolve only for others.
-	expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
-		"Admin":{"In":["root:one"]},
-		"Read":{"In":["root:one"]},
-		"Write":{"In":["root:one"]},
-		"Debug":{"In":["root:one"]},
-		"Resolve":{"In":["..."]}
-	}`))
-	if err != nil {
-		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
-	}
+	expectPerms := access.Permissions{}.
+		Add(security.AllPrincipals, string(access.Resolve)).
+		Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read), string(access.Write))
 	if perms, _, err := dbOwner.GetPermissions(ctxOwner); err != nil {
 		t.Errorf("GetPermissions should have succeeded, got error: %v", err)
 	} else if got, want := perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
@@ -88,25 +81,12 @@
 		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
 	}
 
-	expectPermsFull, err := access.ReadPermissions(bytes.NewBufferString(`{
-		"Admin":{"In":["root:one"]},
-		"Read":{"In":["root:one"]},
-		"Write":{"In":["root:one"]},
-		"Debug":{"In":["root:one"]},
-		"Resolve":{"In":["..."]}
-	}`))
-	if err != nil {
-		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
-	}
-	expectPermsReadOnly, err := access.ReadPermissions(bytes.NewBufferString(`{
-		"Admin":{"In":["root:one"]},
-		"Read":{"In":["root:one"]},
-		"Debug":{"In":["root:one"]},
-		"Resolve":{"In":["..."]}
-	}`))
-	if err != nil {
-		t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
-	}
+	expectPermsFull := access.Permissions{}.
+		Add(security.AllPrincipals, string(access.Resolve)).
+		Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read), string(access.Write))
+	expectPermsReadOnly := access.Permissions{}.
+		Add(security.AllPrincipals, string(access.Resolve)).
+		Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read))
 
 	// Check that all tables have correct permissions (full or readonly).
 	for _, ts := range sbmodel.MeasuredTables {
diff --git a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
index 96f9f0b..62af0c7 100644
--- a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
+++ b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
@@ -80,7 +80,6 @@
 \"Admin\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
 \"Read\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
 \"Write\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
-\"Debug\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
 \"Resolve\":{\"In\":[\"...\"]} \
 }"
   "${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \