syncbase: Expose Collection ACLs in Watch.
Implement API from v.io/c/23042 .
Collection creations and permissions changes are propagated to the
client with appropriate CollectionInfo, as well as collection destroys.
Collection updates were hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.
Also factored out collection perms key parsing.
MultiPart: 2/2
Change-Id: I83499fa5b9e24f6a2665db5a18ffe676830620f1
diff --git a/services/syncbase/common/key_util.go b/services/syncbase/common/key_util.go
index 45038f3..976052e 100644
--- a/services/syncbase/common/key_util.go
+++ b/services/syncbase/common/key_util.go
@@ -73,11 +73,29 @@
func ParseRowKeyOrDie(key string) (collection wire.Id, row string) {
collection, row, err := ParseRowKey(key)
if err != nil {
+ // TODO(ivanpi): Get rid of *OrDie() functions. Always log internal errors.
vlog.Fatal(err)
}
return collection, row
}
+// ParseCollectionPermsKey extracts the collection id from the given storage
+// engine key for a collection perms entry. Returns an error if the given key
+// is not a storage engine key for a collection perms entry.
+func ParseCollectionPermsKey(key string) (collection wire.Id, err error) {
+ // TODO(rdaoud,ivanpi): See hack in collection.go.
+ parts := SplitNKeyParts(key, 3)
+ pfx := parts[0]
+ if len(parts) < 3 || pfx != CollectionPermsPrefix || parts[2] != "" {
+ return wire.Id{}, fmt.Errorf("ParseCollectionPermsKey: invalid key %q", key)
+ }
+ cxId, err := util.DecodeId(parts[1])
+ if err != nil {
+ return wire.Id{}, fmt.Errorf("ParseCollectionPermsKey: invalid collection %q: %v", parts[1], err)
+ }
+ return cxId, nil
+}
+
// ScanPrefixArgs returns args for sn.Scan() for the specified prefix.
func ScanPrefixArgs(stKeyPrefix, prefix string) ([]byte, []byte) {
return ScanRangeArgs(stKeyPrefix, util.PrefixRangeStart(prefix), util.PrefixRangeLimit(prefix))
diff --git a/services/syncbase/common/key_util_test.go b/services/syncbase/common/key_util_test.go
index 3b2e45c..4c61d7e 100644
--- a/services/syncbase/common/key_util_test.go
+++ b/services/syncbase/common/key_util_test.go
@@ -162,6 +162,36 @@
}
}
+func TestParseCollectionPermsKey(t *testing.T) {
+ tests := []struct {
+ key string
+ collection wire.Id
+ err bool
+ }{
+ {common.CollectionPermsPrefix + "\xfeu,c\xfe", wire.Id{"u", "c"}, false},
+ {common.CollectionPermsPrefix + "\xfeu,\xfe", wire.Id{"u", ""}, false},
+ {common.CollectionPermsPrefix + "\xfe,c\xfe", wire.Id{"", "c"}, false},
+ {common.CollectionPermsPrefix + "\xfe,\xfe", wire.Id{"", ""}, false},
+ {"\xfeu,c\xfe", wire.Id{}, true},
+ {"pfx\xfeu,c\xfe", wire.Id{}, true},
+ {common.RowPrefix + "\xfeu,c\xfe", wire.Id{}, true},
+ {common.CollectionPermsPrefix + "\xfe", wire.Id{}, true},
+ {common.CollectionPermsPrefix + "\xfe\xfe", wire.Id{}, true},
+ {common.CollectionPermsPrefix + "\xfeu,c\xferow", wire.Id{}, true},
+ {common.CollectionPermsPrefix + "\xfeu,c", wire.Id{}, true},
+ {common.CollectionPermsPrefix + "\xfe", wire.Id{}, true},
+ }
+ for _, test := range tests {
+ collection, err := common.ParseCollectionPermsKey(test.key)
+ if !reflect.DeepEqual(collection, test.collection) {
+ t.Errorf("%q: got %v, want %v", test.key, collection, test.collection)
+ }
+ if !reflect.DeepEqual(err != nil, test.err) {
+ t.Errorf("%q: got %v, want %v", test.key, err != nil, test.err)
+ }
+ }
+}
+
func TestScanPrefixArgs(t *testing.T) {
tests := []struct {
stKeyPrefix, prefix, wantStart, wantLimit string
diff --git a/services/syncbase/server/database.go b/services/syncbase/server/database.go
index 8211194..eb7de69 100644
--- a/services/syncbase/server/database.go
+++ b/services/syncbase/server/database.go
@@ -460,8 +460,7 @@
keyBytes := []byte{}
for it.Advance() {
keyBytes = it.Key(keyBytes)
- parts := common.SplitNKeyParts(string(keyBytes), 3)
- id, err := pubutil.DecodeId(parts[1])
+ id, err := common.ParseCollectionPermsKey(string(keyBytes))
if err != nil {
it.Cancel()
return verror.New(verror.ErrInternal, ctx, err)
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index 991bfd1..0d8a6cd 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -10,6 +10,7 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
+ "v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/services/watch"
pubutil "v.io/v23/syncbase/util"
@@ -17,6 +18,7 @@
"v.io/v23/vom"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/server/filter"
+ "v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/store/watchable"
)
@@ -110,6 +112,8 @@
// scanInitialState sends the initial state of all matching and accessible
// collections and rows in the database. Checks access on collections, but
// not on database.
+// TODO(ivanpi): Assumes Read perms on database. Careful if supporting RPCs
+// requiring only Resolve (e.g. WatchGlob).
// TODO(ivanpi): Abstract out multi-scan for scan and possibly query support.
// TODO(ivanpi): Use watch pattern prefixes to optimize scan ranges.
func (d *database) scanInitialState(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, sntx store.SnapshotOrTransaction, watchFilter filter.CollectionRowFilter) error {
@@ -117,13 +121,12 @@
// TODO(ivanpi): Collection scan order not alphabetical.
cxIt := sntx.Scan(common.ScanPrefixArgs(common.CollectionPermsPrefix, ""))
defer cxIt.Cancel()
- cxKey := []byte{}
+ cxKey, cxPermsValue := []byte{}, []byte{}
for cxIt.Advance() {
- cxKey = cxIt.Key(cxKey)
- // See comment in util/constants.go for why we use SplitNKeyParts.
- // TODO(rdaoud,ivanpi): See hack in collection.go.
- cxParts := common.SplitNKeyParts(string(cxKey), 3)
- cxId, err := pubutil.DecodeId(cxParts[1])
+ cxKey, cxPermsValue = cxIt.Key(cxKey), cxIt.Value(cxPermsValue)
+ // Database permissions for Watch ensure that the user is always allowed
+ // to know that a collection exists.
+ cxId, err := common.ParseCollectionPermsKey(string(cxKey))
if err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -131,21 +134,43 @@
if !watchFilter.CollectionMatches(cxId) {
continue
}
+ // Send collection info.
+ var cxPerms interfaces.CollectionPerms
+ if err := vom.Decode(cxPermsValue, &cxPerms); err != nil {
+ return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+ }
+ cxInfo := collectionInfoFromPerms(ctx, call, cxPerms.GetPerms())
+ cxInfoAsRawBytes, err := vom.RawBytesFromValue(cxInfo)
+ if err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ if err := sender.addChange(
+ pubutil.EncodeId(cxId),
+ watch.Exists,
+ &wire.StoreChange{
+ Value: cxInfoAsRawBytes,
+ // Note: FromSync cannot be reconstructed from scan.
+ FromSync: false,
+ }); err != nil {
+ return err
+ }
+ // Check permissions for row access.
+ // TODO(ivanpi): Collection scan already gets perms, optimize?
c := &collectionReq{
id: cxId,
d: d,
}
- // Check permissions for row access.
- // TODO(ivanpi): Collection scan already gets perms, optimize?
if _, err := c.checkAccess(ctx, call, sntx); err != nil {
if verror.ErrorID(err) == verror.ErrNoAccess.ID {
- // TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
- // this to caller.
+ // Skip sending rows if the collection is inaccessible. Caller can see
+ // from collection info that they have no read access and may therefore
+ // have missing rows.
+ // TODO(ivanpi): If read access is regained, should skipped rows be sent
+ // retroactively?
continue
}
return err
}
- // TODO(ivanpi): Send collection info.
// Send matching rows.
if err := c.scanInitialState(ctx, call, sender, sntx, watchFilter); err != nil {
return err
@@ -246,6 +271,8 @@
// Note: Since the governing ACL for each change is no longer tracked, the
// permissions check uses the ACLs in effect at the time processLogBatch is
// called.
+// TODO(ivanpi): Assumes Read perms on database. Careful if supporting RPCs
+// requiring only Resolve (e.g. WatchGlob).
func (d *database) processLogBatch(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, watchFilter filter.CollectionRowFilter, logs []*watchable.LogEntry) error {
sn := d.st.NewSnapshot()
defer sn.Abort()
@@ -266,73 +293,145 @@
default:
continue
}
- // TODO(rogulenko): Currently we only process rows, i.e. keys of the form
- // <RowPrefix>:xxx:yyy. Consider processing other keys.
- if !common.IsRowKey(opKey) {
- continue
- }
- cxId, row, err := common.ParseRowKey(opKey)
- if err != nil {
- return verror.NewErrInternal(ctx) // no detailed error before access check
- }
- // Filter out unnecessary rows.
- if !watchFilter.RowMatches(cxId, row) {
- continue
- }
- c := &collectionReq{
- id: cxId,
- d: d,
- }
- // Filter out rows that we can't access.
- // TODO(ivanpi): Check only once per collection per batch.
- if _, err := c.checkAccess(ctx, call, sn); err != nil {
- if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
- // Note, the collection may not exist anymore, in which case permissions
- // cannot be retrieved. This case is treated the same as ErrNoAccess, by
- // skipping the row.
- // TODO(ivanpi): Consider using the implicit ACL instead for nonexistent
- // collections.
- // TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
- // this to caller.
+ // TODO(rogulenko,ivanpi): Currently we only process rows and collection
+ // perms. Consider making watchable and processing other keys.
+ switch common.FirstKeyPart(opKey) {
+ case common.RowPrefix:
+ cxId, row, err := common.ParseRowKey(opKey)
+ if err != nil {
+ return verror.NewErrInternal(ctx) // no detailed error before access check
+ }
+ // Filter out unnecessary rows.
+ if !watchFilter.RowMatches(cxId, row) {
continue
}
- return err
- }
- switch op := op.(type) {
- case *watchable.PutOp:
- // Note, valueBytes is reused on each iteration, so the reference must not
- // be used beyond this case block. The code below is safe since only the
- // VOM-decoded copy is used after the call to vom.Decode.
- if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
+ // Filter out rows that we can't access.
+ // TODO(ivanpi): Check only once per collection per batch.
+ c := &collectionReq{
+ id: cxId,
+ d: d,
}
- var rowValueAsRawBytes *vom.RawBytes
- if err := vom.Decode(valueBytes, &rowValueAsRawBytes); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- if err := sender.addChange(
- naming.Join(pubutil.EncodeId(cxId), row),
- watch.Exists,
- &wire.StoreChange{
- Value: rowValueAsRawBytes,
- FromSync: logEntry.FromSync,
- }); err != nil {
+ if _, err := c.checkAccess(ctx, call, sn); err != nil {
+ if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
+ // Skip sending rows if the collection is inaccessible. Caller can see
+ // from collection info that they have no read access and may therefore
+ // have missing rows.
+ // Note, the collection may not exist anymore, in which case permissions
+ // cannot be retrieved. This case is treated the same as ErrNoAccess, by
+ // skipping the row.
+ // TODO(ivanpi): Consider using the implicit ACL instead for nonexistent
+ // collections.
+ // TODO(ivanpi): If read access is regained, should skipped rows be sent
+ // retroactively?
+ continue
+ }
return err
}
- case *watchable.DeleteOp:
- if err := sender.addChange(
- naming.Join(pubutil.EncodeId(cxId), row),
- watch.DoesNotExist,
- &wire.StoreChange{
- FromSync: logEntry.FromSync,
- }); err != nil {
- return err
+ switch op := op.(type) {
+ case *watchable.PutOp:
+ // Note, valueBytes is reused on each iteration, so the reference must not
+ // be used beyond this case block. The code below is safe since only the
+ // VOM-decoded copy is used after the call to vom.Decode.
+ if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ var rowValueAsRawBytes *vom.RawBytes
+ if err := vom.Decode(valueBytes, &rowValueAsRawBytes); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ if err := sender.addChange(
+ naming.Join(pubutil.EncodeId(cxId), row),
+ watch.Exists,
+ &wire.StoreChange{
+ Value: rowValueAsRawBytes,
+ FromSync: logEntry.FromSync,
+ }); err != nil {
+ return err
+ }
+ case *watchable.DeleteOp:
+ if err := sender.addChange(
+ naming.Join(pubutil.EncodeId(cxId), row),
+ watch.DoesNotExist,
+ &wire.StoreChange{
+ FromSync: logEntry.FromSync,
+ }); err != nil {
+ return err
+ }
}
+
+ case common.CollectionPermsPrefix:
+ // Database permissions for Watch ensure that the user is always allowed
+ // to know that a collection exists.
+ cxId, err := common.ParseCollectionPermsKey(opKey)
+ if err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ // Filter out unnecessary collections.
+ if !watchFilter.CollectionMatches(cxId) {
+ continue
+ }
+ switch op := op.(type) {
+ case *watchable.PutOp:
+ if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
+ return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+ }
+ var cxPerms interfaces.CollectionPerms
+ if err := vom.Decode(valueBytes, &cxPerms); err != nil {
+ return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+ }
+ cxInfo := collectionInfoFromPerms(ctx, call, cxPerms.GetPerms())
+ cxInfoAsRawBytes, err := vom.RawBytesFromValue(cxInfo)
+ if err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ if err := sender.addChange(
+ pubutil.EncodeId(cxId),
+ watch.Exists,
+ &wire.StoreChange{
+ Value: cxInfoAsRawBytes,
+ FromSync: logEntry.FromSync,
+ }); err != nil {
+ return err
+ }
+ case *watchable.DeleteOp:
+ if err := sender.addChange(
+ pubutil.EncodeId(cxId),
+ watch.DoesNotExist,
+ &wire.StoreChange{
+ FromSync: logEntry.FromSync,
+ }); err != nil {
+ return err
+ }
+ }
+
+ default:
+ continue
}
}
return nil
}
+// collectionInfoFromPerms converts a collection permissions object into a
+// StoreChangeCollectionInfo tailored to the caller. The returned collection
+// info is safe to send to the caller, assuming the caller is allowed to know
+// the collection exists. It includes a set listing all access tags that the
+// caller has on the collection. The collection permissions object itself is
+// included only if the caller is allowed to see it (has Admin permissions).
+func collectionInfoFromPerms(ctx *context.T, call rpc.ServerCall, cxPerms access.Permissions) *wire.StoreChangeCollectionInfo {
+ ci := &wire.StoreChangeCollectionInfo{
+ Allowed: make(map[access.Tag]struct{}),
+ }
+ for tag, acl := range cxPerms {
+ if acl.Authorize(ctx, call.Security()) == nil {
+ ci.Allowed[access.Tag(tag)] = struct{}{}
+ }
+ }
+ if _, isAdmin := ci.Allowed[access.Admin]; isAdmin {
+ ci.Perms = cxPerms
+ }
+ return ci
+}
+
// watchBatchSender sends a sequence of watch changes forming a batch, delaying
// sends to allow setting the Continued flag on the last change.
type watchBatchSender struct {
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 875f97b..22f9a6f 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -197,10 +197,11 @@
}
// A WatchChangeTest is a syncbase.WatchChange that has a public ValueBytes
-// field, to allow tests to set it.
+// and CollectionInfo field, to allow tests to set them.
type WatchChangeTest struct {
syncbase.WatchChange
- ValueBytes *vom.RawBytes
+ ValueBytes *vom.RawBytes
+ CollectionInfo *wire.StoreChangeCollectionInfo
}
func WatchChangeTestRootPut(resumeMarker watch.ResumeMarker) WatchChangeTest {
@@ -214,6 +215,42 @@
}
}
+func WatchChangeTestCollectionPut(cxId wire.Id, allowedTags []access.Tag, perms access.Permissions, resumeMarker watch.ResumeMarker) WatchChangeTest {
+ allowedTagsSet := make(map[access.Tag]struct{})
+ if len(allowedTags) == 0 {
+ allowedTagsSet = nil
+ } else {
+ for _, tag := range allowedTags {
+ allowedTagsSet[tag] = struct{}{}
+ }
+ }
+ return WatchChangeTest{
+ WatchChange: syncbase.WatchChange{
+ EntityType: syncbase.EntityCollection,
+ Collection: cxId,
+ ChangeType: syncbase.PutChange,
+ ResumeMarker: resumeMarker,
+ Continued: (resumeMarker == nil),
+ },
+ CollectionInfo: &wire.StoreChangeCollectionInfo{
+ Allowed: allowedTagsSet,
+ Perms: perms,
+ },
+ }
+}
+
+func WatchChangeTestCollectionDelete(cxId wire.Id, resumeMarker watch.ResumeMarker) WatchChangeTest {
+ return WatchChangeTest{
+ WatchChange: syncbase.WatchChange{
+ EntityType: syncbase.EntityCollection,
+ Collection: cxId,
+ ChangeType: syncbase.DeleteChange,
+ ResumeMarker: resumeMarker,
+ Continued: (resumeMarker == nil),
+ },
+ }
+}
+
func WatchChangeTestRowPut(cxId wire.Id, rowKey string, value interface{}, resumeMarker watch.ResumeMarker) WatchChangeTest {
return WatchChangeTest{
WatchChange: syncbase.WatchChange{
@@ -262,6 +299,8 @@
wantErr := want.ValueBytes.ToValue(&wantValue)
eq = ((gotErr == nil) == (wantErr == nil)) &&
reflect.DeepEqual(gotValue, wantValue)
+ case syncbase.EntityCollection:
+ eq = reflect.DeepEqual(got.CollectionInfo(), want.CollectionInfo)
default:
eq = true
}
@@ -278,7 +317,11 @@
Fatalf(t, "wstream.Advance() reached the end: %v", wstream.Err())
}
if got := wstream.Change(); !WatchChangeEq(&got, &want) {
- Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
+ if got.ChangeType == syncbase.PutChange && got.EntityType == syncbase.EntityCollection {
+ Fatalf(t, "unexpected watch change: got %+v, want %+v (cx info: got %+v, want %+v)", got, want, got.CollectionInfo(), want.CollectionInfo)
+ } else {
+ Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
+ }
}
}
}