sensorlog: Simplify watch and ACLs.
Use watch from empty resume marker and chained ACL construction.
Remove unused ACL tags.
Change-Id: If63997612b48fc782399721b7e9e1a3ac1b5560f
diff --git a/go/src/v.io/x/sensorlog/Makefile b/go/src/v.io/x/sensorlog/Makefile
index 703091c..e8f6bd3 100644
--- a/go/src/v.io/x/sensorlog/Makefile
+++ b/go/src/v.io/x/sensorlog/Makefile
@@ -26,7 +26,6 @@
.PHONY: test
test:
jiri go test v.io/x/sensorlog/...
- # TODO(ivanpi): Add jiri command to run integration tests.
jiri go test v.io/x/sensorlog/internal/client -run=TestV23* -v23.tests
.PHONY: clean
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index e658247..0a0b4d9 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -12,7 +12,6 @@
"v.io/v23/context"
nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/sbmodel"
@@ -25,8 +24,39 @@
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
- _, err := listStreamData(ctx, db, streamKey, listCb)
- return err
+ tableName := sbmodel.KDataPoint{}.Table()
+ dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
+
+ bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ return err
+ }
+ defer bdb.Abort(ctx)
+
+ streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
+ if exists, err := streamRow.Exists(ctx); err != nil {
+ return err
+ } else if !exists {
+ return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
+ }
+
+ sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KDataPoint{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ var val sbmodel.VDataPoint
+ if err := sstr.Value(&val); err != nil {
+ return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+ }
+ if err := listCb(key, val); err != nil {
+ return err
+ }
+ }
+ return sstr.Err()
}
// FollowStreamData lists all data points for the stream specified by streamKey
@@ -37,13 +67,8 @@
tableName := sbmodel.KDataPoint{}.Table()
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
- resMark, err := listStreamData(ctx, db, streamKey, listCb)
- if err != nil {
- return err
- }
-
- // Watch for new DataPoints.
- ws, err := db.Watch(ctx, tableName, dataPrefix, resMark)
+ // Watch for DataPoints, existing followed by new.
+ ws, err := db.Watch(ctx, tableName, dataPrefix, nil)
if err != nil {
return err
}
@@ -82,50 +107,6 @@
return ws.Err()
}
-// listStreamData implements listing (scanning over) existing stream data. It
-// also returns the resume marker to allow watching for future data.
-func listStreamData(ctx *context.T, db nosql.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) (watch.ResumeMarker, error) {
- var resMark watch.ResumeMarker
- tableName := sbmodel.KDataPoint{}.Table()
- dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
-
- bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
- if err != nil {
- return resMark, err
- }
- defer bdb.Abort(ctx)
-
- resMark, err = bdb.GetResumeMarker(ctx)
- if err != nil {
- return resMark, err
- }
-
- streamRow := bdb.Table(streamKey.Table()).Row(streamKey.Key())
- if exists, err := streamRow.Exists(ctx); err != nil {
- return resMark, err
- } else if !exists {
- return resMark, verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
- }
-
- sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(dataPrefix))
- defer sstr.Cancel()
-
- for sstr.Advance() {
- key := &sbmodel.KDataPoint{}
- if err := key.Parse(sstr.Key()); err != nil {
- return resMark, fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
- }
- var val sbmodel.VDataPoint
- if err := sstr.Value(&val); err != nil {
- return resMark, fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
- }
- if err := listCb(key, val); err != nil {
- return resMark, err
- }
- }
- return resMark, sstr.Err()
-}
-
type dataPoint struct {
Key sbmodel.KDataPoint
Val sbmodel.VDataPoint
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 9e5c17d..72278b0 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -27,6 +27,7 @@
// InitSyncgroup must not be called concurrently for the same devId, or
// retried with different parameters for the same devId, otherwise behaviour
// is unspecified.
+// TODO(ivanpi): Remove Resolve permissions when v.io/i/1110 is fixed.
func InitSyncgroup(ctx *context.T, db nosql.Database, devId, admin, sgPublishSb string, sgMountTables []string) error {
if err := keyutil.ValidateId(devId); err != nil {
return fmt.Errorf("invalid devId: %v", err)
@@ -46,8 +47,8 @@
// Both measured and admin client have full permissions on the syncgroup.
sgAcl := access.Permissions{}
- sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.AllTypicalTags()...)
- sbutil.AddPermsForPattern(&sgAcl, admin, access.AllTypicalTags()...)
+ sbutil.AddPermsForPrincipal(&sgAcl, v23.GetPrincipal(ctx), access.Read, access.Admin)
+ sbutil.AddPermsForPattern(&sgAcl, admin, access.Read, access.Admin)
// Maps all syncgroup prefixes to ACLs.
prefixSpec := make(map[nosql_wire.TableRow]access.Permissions)
@@ -60,7 +61,7 @@
}
aclStreamDef := access.Permissions{}
sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
- sbutil.AddPermsForPattern(&aclStreamDef, admin, access.AllTypicalTags()...)
+ sbutil.AddPermsForPattern(&aclStreamDef, admin, access.Resolve, access.Read, access.Write, access.Admin)
prefixSpec[prefixStreamDef] = aclStreamDef
// DataPoint : <devId>
@@ -71,7 +72,7 @@
}
aclDataPoint := access.Permissions{}
sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
- sbutil.AddPermsForPattern(&aclDataPoint, admin, access.AllTypicalTags()...)
+ sbutil.AddPermsForPattern(&aclDataPoint, admin, access.Resolve, access.Read, access.Write, access.Admin)
prefixSpec[prefixDataPoint] = aclDataPoint
var prefixes []nosql_wire.TableRow
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
index 6e5f1a2..1522c73 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup_test.go
@@ -5,10 +5,10 @@
package measure_test
import (
- "bytes"
"reflect"
"testing"
+ "v.io/v23/security"
"v.io/v23/security/access"
_ "v.io/x/ref/runtime/factories/generic"
sbtu "v.io/x/ref/services/syncbase/testutil"
@@ -50,16 +50,9 @@
}
// measured should have dropped privileges on <StreamDefTable>/<devId>.
- expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
- "Admin":{"In":["root:two"]},
- "Read":{"In":["root:two", "root:one"]},
- "Write":{"In":["root:two"]},
- "Debug":{"In":["root:two"]},
- "Resolve":{"In":["root:two", "root:one"]}
- }`))
- if err != nil {
- t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
- }
+ expectPerms := access.Permissions{}.
+ Add(security.BlessingPattern("root:one"), string(access.Resolve), string(access.Read)).
+ Add(security.BlessingPattern("root:two"), string(access.Resolve), string(access.Read), string(access.Write), string(access.Admin))
sgDataTable := db.Table(sbmodel.KStreamDef{}.Table())
if gotPerms, err := sgDataTable.GetPrefixPermissions(ctxMeasured, devId); err != nil {
t.Errorf("GetPrefixPermissions failed: %v", err)
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index 3340308..3518d91 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -10,8 +10,6 @@
"fmt"
"v.io/v23/context"
- nosql_wire "v.io/v23/services/syncbase/nosql"
- "v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
@@ -21,8 +19,9 @@
// StreamDef. Thread safety is not required.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
-// WatchForStreams watches Syncbase for new and modified stream definitions
-// for the specified measuring device and calls the register callback.
+// WatchForStreams watches Syncbase for existing, new, and modified stream
+// definitions for the specified measuring device and calls the register
+// callback.
// If a malformed StreamDef key or value is encountered, or register returns
// an error, measured exits with an error.
// WatchForStreams is synchronous and runs until the context is cancelled or
@@ -30,45 +29,9 @@
func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
tableName := sbmodel.KStreamDef{}.Table()
watchPrefix := keyutil.Join(devId, "")
- var resMark watch.ResumeMarker
-
- // BeginBatch scoped using function with deferred Abort.
- if err := func() error {
- bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
- if err != nil {
- return err
- }
- defer bdb.Abort(ctx)
-
- resMark, err = bdb.GetResumeMarker(ctx)
- if err != nil {
- return err
- }
-
- // Register samplers for all existing StreamDefs.
- sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
- defer sstr.Cancel()
-
- for sstr.Advance() {
- key := &sbmodel.KStreamDef{}
- if err := key.Parse(sstr.Key()); err != nil {
- return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
- }
- val := &sbmodel.VStreamDef{}
- if err := sstr.Value(val); err != nil {
- return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
- }
- if err := register(key, val); err != nil {
- return err
- }
- }
- return sstr.Err()
- }(); err != nil {
- return err
- }
// Watch for StreamDef changes and register samplers as needed.
- ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
+ ws, err := db.Watch(ctx, tableName, watchPrefix, nil)
if err != nil {
return err
}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index 3e0eedd..ff824b1 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -24,13 +24,13 @@
// Allow everyone to resolve to allow joining syncgroups.
AddPermsForPattern(&aclFull, string(security.AllPrincipals), access.Resolve)
// Restrict other permissions to self.
- AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin, access.Debug)
+ AddPermsForPrincipal(&aclFull, v23.GetPrincipal(ctx), access.Read, access.Write, access.Admin)
aclReadOnly := access.Permissions{}
// Allow everyone to resolve to allow joining syncgroups.
AddPermsForPattern(&aclReadOnly, string(security.AllPrincipals), access.Resolve)
// Restrict other permissions to self, except Write.
- AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin, access.Debug)
+ AddPermsForPrincipal(&aclReadOnly, v23.GetPrincipal(ctx), access.Read, access.Admin)
sbs := syncbase.NewService(sbService)
app := sbs.App(config.AppName)
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
index e57a228..eb41163 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -5,10 +5,10 @@
package sbutil_test
import (
- "bytes"
"reflect"
"testing"
+ "v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/verror"
_ "v.io/x/ref/runtime/factories/generic"
@@ -54,16 +54,9 @@
t.Errorf("CreateOrOpenDB should have succeeded, got error: %v", err)
}
// Expect db permissions with full access for owner, resolve only for others.
- expectPerms, err := access.ReadPermissions(bytes.NewBufferString(`{
- "Admin":{"In":["root:one"]},
- "Read":{"In":["root:one"]},
- "Write":{"In":["root:one"]},
- "Debug":{"In":["root:one"]},
- "Resolve":{"In":["..."]}
- }`))
- if err != nil {
- t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
- }
+ expectPerms := access.Permissions{}.
+ Add(security.AllPrincipals, string(access.Resolve)).
+ Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read), string(access.Write))
if perms, _, err := dbOwner.GetPermissions(ctxOwner); err != nil {
t.Errorf("GetPermissions should have succeeded, got error: %v", err)
} else if got, want := perms.Normalize(), expectPerms.Normalize(); !reflect.DeepEqual(got, want) {
@@ -88,25 +81,12 @@
t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
}
- expectPermsFull, err := access.ReadPermissions(bytes.NewBufferString(`{
- "Admin":{"In":["root:one"]},
- "Read":{"In":["root:one"]},
- "Write":{"In":["root:one"]},
- "Debug":{"In":["root:one"]},
- "Resolve":{"In":["..."]}
- }`))
- if err != nil {
- t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
- }
- expectPermsReadOnly, err := access.ReadPermissions(bytes.NewBufferString(`{
- "Admin":{"In":["root:one"]},
- "Read":{"In":["root:one"]},
- "Debug":{"In":["root:one"]},
- "Resolve":{"In":["..."]}
- }`))
- if err != nil {
- t.Fatalf("ReadPermissions should have succeeded, got error: %v", err)
- }
+ expectPermsFull := access.Permissions{}.
+ Add(security.AllPrincipals, string(access.Resolve)).
+ Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read), string(access.Write))
+ expectPermsReadOnly := access.Permissions{}.
+ Add(security.AllPrincipals, string(access.Resolve)).
+ Add(security.BlessingPattern("root:one"), string(access.Admin), string(access.Read))
// Check that all tables have correct permissions (full or readonly).
for _, ts := range sbmodel.MeasuredTables {
diff --git a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
index 96f9f0b..62af0c7 100644
--- a/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
+++ b/go/src/v.io/x/sensorlog/scripts/runner_lib.sh
@@ -80,7 +80,6 @@
\"Admin\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
\"Read\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
\"Write\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
-\"Debug\":{\"In\":[\"${DEF_BLESSING_RUNNER}\"]}, \
\"Resolve\":{\"In\":[\"...\"]} \
}"
"${JIRI_ROOT}"/release/go/bin/vbecome -duration="${TIMEOUT}" \