server: implement prefix ACLs
This change implements prefix ACLs as described in the design doc.
It also removes the rpc.ServerCall argument from some functions that
don't use it at all.
Change-Id: I791d21575c6a5b5f93f9fc2d8afd63e8f4e8778d
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
return err
}
// Check for "database already exists".
- if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -172,7 +172,7 @@
info := &dbInfo{
Name: dbName,
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}); err != nil {
return err
}
@@ -192,7 +192,7 @@
// 3. Flip dbInfo.Initialized to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Initialized = true
return nil
})
@@ -234,7 +234,7 @@
// 2. Flip dbInfo.Deleted to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Deleted = true
return nil
})
@@ -251,7 +251,7 @@
}
// 4. Delete dbInfo record.
- if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+ if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
return err
}
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
- "v.io/v23/rpc"
)
type dbInfoLayer struct {
@@ -49,9 +48,9 @@
// getDbInfo reads data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+ if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
return nil, err
}
return info, nil
@@ -59,27 +58,27 @@
// putDbInfo writes data to the storage engine.
// Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+ return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
}
// delDbInfo deletes data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+ return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
}
// updateDbInfo performs a read-modify-write.
// fn should "modify" v, and should return a VDL-compatible error.
// Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
_ = st.(store.Transaction) // panics on failure, as desired
- info, err := a.getDbInfo(ctx, call, st, dbName)
+ info, err := a.getDbInfo(ctx, st, dbName)
if err != nil {
return err
}
if err := fn(info); err != nil {
return err
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 0971923..f2839d6 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -103,7 +103,7 @@
Name: d.name,
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, d.st, d, data); err != nil {
+ if err := util.Put(ctx, d.st, d, data); err != nil {
return nil, err
}
return d, nil
@@ -443,6 +443,7 @@
// key
keyBytes := s.it.Key(nil)
parts := util.SplitKeyParts(string(keyBytes))
+ // TODO(rogulenko): Check access for the key.
s.currKey = parts[len(parts)-1]
// value
valueBytes := s.it.Value(nil)
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
}
// checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
// Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return util.Get(ctx, call, st, r.t, &tableData{})
+ return r.t.checkAccess(ctx, call, st, r.key)
}
// get reads data from the storage engine.
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
package nosql
import (
+ "strings"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/v23/vom"
)
// tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
return err
}
// Check for "table already exists".
- if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -54,7 +57,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, call, st, t, data)
+ return util.Put(ctx, st, t, data)
})
}
@@ -71,20 +74,30 @@
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, call, st, t)
+ return util.Delete(ctx, st, t)
})
}
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
impl := func(st store.StoreReadWriter) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
key := []byte{}
for it.Advance() {
key = it.Key(key)
+ // Check perms.
+ parts := util.SplitKeyParts(string(key))
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ // TODO(rogulenko): Revisit this behavior. Probably we should
+ // delete all rows that we have access to.
+ it.Cancel()
+ return err
+ }
+ // Delete the key-value pair.
if err := st.Delete(key); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -107,8 +120,8 @@
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
impl := func(st store.StoreReader) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
key, value := []byte{}, []byte{}
for it.Advance() {
key, value = it.Key(key), it.Value(value)
+ // Check perms.
parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ it.Cancel()
+ return err
+ }
+ sender.Send(wire.KeyValue{Key: externalKey, Value: value})
}
if err := it.Err(); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
return impl(st)
}
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
- if prefix != "" {
- return verror.NewErrNotImplemented(ctx)
- }
- impl := func(st store.StoreReadWriter) error {
- data := &tableData{}
- return util.Update(ctx, call, st, t, data, func() error {
- data.Perms = perms
- return nil
- })
- }
- if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
- return err
- } else {
- return impl(st)
- }
- } else {
- return store.RunInTransaction(t.d.st, impl)
- }
-}
-
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
- if key != "" {
- return nil, verror.NewErrNotImplemented(ctx)
- }
impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
- data := &tableData{}
- if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+ // Check permissions only at table level.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return nil, err
}
- return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+ // Get the most specific permissions object.
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return nil, err
+ }
+ result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+ // Collect all parent permissions objects all the way up to the table level.
+ for prefix != "" {
+ prefix = prefixPerms.Parent
+ if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+ return nil, err
+ }
+ result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+ }
+ return result, nil
}
var st store.StoreReader
if t.d.batchId != nil {
@@ -179,17 +187,114 @@
return impl(st)
}
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ if prefix == "" {
+ data := &tableData{}
+ return util.Update(ctx, call, st, t, data, func() error {
+ data.Perms = perms
+ return nil
+ })
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ // In case there is no permissions object for the given prefix, we need
+ // to add a new node to the prefix permissions tree. We do it by updating
+ // parents for all children of the prefix to the node corresponding to
+ // the prefix.
+ if parent != prefix {
+ if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+ return err
+ }
+ } else {
+ parent = prefixPerms.Parent
+ }
+ stPrefix := t.prefixPermsKey(prefix)
+ stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+ prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+ // Put the (prefix, perms) pair to the database.
+ if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+ return err
+ }
+ return util.PutObject(st, stPrefixLimit, prefixPerms)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
+}
+
func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
- return verror.NewErrNotImplemented(ctx)
+ if prefix == "" {
+ return verror.New(verror.ErrBadArg, ctx, prefix)
+ }
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ if parent != prefix {
+ // This can happen only if there is no permissions object for the
+ // given prefix. Since DeletePermissions is idempotent, return nil.
+ return nil
+ }
+ // We need to delete the node corresponding to the prefix from the prefix
+ // permissions tree. We do it by updating parents for all children of the
+ // prefix to the parent of the node corresponding to the prefix.
+ if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+ return err
+ }
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ if err := st.Delete(stPrefix); err != nil {
+ return err
+ }
+ return st.Delete(stPrefixLimit)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
// Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
closeStoreReader()
return nil, err
}
+ // TODO(rogulenko): Check prefix permissions for children.
return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
}
var st store.StoreReader
@@ -226,3 +331,145 @@
func (t *tableReq) stKeyPart() string {
return t.name
}
+
+// updateParentRefs updates the parent for all children of the given
+// prefix to newParent.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixStart := append(stPrefix, 0)
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ it := st.Scan(stPrefixStart, stPrefixLimit)
+ var key, value []byte
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ it.Cancel()
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ prefixPerms.Parent = newParent
+ if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+ it.Cancel()
+ return err
+ }
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// lock invalidates all concurrent transactions with ErrConcurrentTransaction
+// that have accessed this table.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return err
+ }
+ return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return err
+ }
+ if prefix != "" {
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ return err
+ }
+ }
+ auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
+ if err := auth.Authorize(ctx, call.Security()); err != nil {
+ return verror.New(verror.ErrNoAccess, ctx, prefix)
+ }
+ return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has
+// associated permissions with its permissions object.
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Virtually we represent all prefixes as a forest T, where each vertex maps to
+// a prefix. A parent for a string is the maximum proper prefix of it that
+// belongs to T. Each prefix P from T is represented as a pair of entries with
+// keys P and P~ with values of type stPrefixPerms (parent + perms).
+// High level of how this function works:
+// 1 iter = db.Scan(K, "")
+// Here last character of iter.Key() is removed automatically if it is '~'
+// 2 if hasPrefix(K, iter.Key()) return iter.Value()
+// 3 return parent(iter.Key())
+// Short proof:
+// iter returned on line 1 points to one of the following:
+// - a string t that is equal to K;
+// - a string t~: if t is not a prefix of K, then K < t < t~ which
+// contradicts with property of returned iterator on line 1 => t is prefix of
+// K; also t is the largest prefix of K, as all larger prefixes of K are
+// less than t~; in this case line 2 returns correct result;
+// - a string t that doesn't end with '~': it can't be a prefix of K, as all
+// proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
+// K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
+// prefix of K; in this case line 3 returns correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+ it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+ if !it.Advance() {
+ prefixPerms, err := t.permsForPrefix(ctx, st, "")
+ return "", prefixPerms, err
+ }
+ defer it.Cancel()
+ parts := util.SplitKeyParts(string(it.Key(nil)))
+ prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+ value := it.Value(nil)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ if strings.HasPrefix(key, prefix) {
+ return prefix, prefixPerms, nil
+ }
+ prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+ return prefixPerms.Parent, prefixPerms, err
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+ if prefix == "" {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return stPrefixPerms{}, err
+ }
+ return stPrefixPerms{Perms: data.Perms}, nil
+ }
+ var prefixPerms stPrefixPerms
+ if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+ return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
diff --git a/services/syncbase/server/nosql/types.vdl b/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/services/syncbase/server/nosql/types.vdl
+++ b/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
Name string
Perms access.Permissions
}
+
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
diff --git a/services/syncbase/server/nosql/types.vdl.go b/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/services/syncbase/server/nosql/types.vdl.go
+++ b/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
}) {
}
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
func init() {
vdl.Register((*databaseData)(nil))
vdl.Register((*tableData)(nil))
+ vdl.Register((*stPrefixPerms)(nil))
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
data := &serviceData{
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, s.st, s, data); err != nil {
+ if err := util.Put(ctx, s.st, s, data); err != nil {
return nil, err
}
if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
return err
}
// Check for "app already exists".
- if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -192,7 +192,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, call, st, a, data)
+ return util.Put(ctx, st, a, data)
}); err != nil {
return err
}
@@ -218,7 +218,7 @@
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, call, st, a)
+ return util.Delete(ctx, st, a)
}); err != nil {
return err
}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
DatabasePrefix = "$database"
DbInfoPrefix = "$dbInfo"
LogPrefix = "$log"
+ PermsPrefix = "$perms"
RowPrefix = "$row"
ServicePrefix = "$service"
SyncPrefix = "$sync"
@@ -27,4 +28,8 @@
BatchSep = ":"
// Separator for parts of storage engine keys.
KeyPartSep = ":"
+ // PrefixRangeLimitSuffix is the suffix of a key which indicates the end of
+ // a prefix range. Should be more than any regular key in the store.
+ // TODO(rogulenko): Change this constant to something out of the UTF8 space.
+ PrefixRangeLimitSuffix = "~"
)
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
// GetWithoutAuth does st.Get(l.StKey(), v), populating v.
// Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
if err := GetObject(st, l.StKey(), v); err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
// Get does GetWithoutAuth followed by an auth check.
// Returns a VDL-compatible error.
func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
- if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+ if err := GetWithoutAuth(ctx, st, l, v); err != nil {
return err
}
auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
// Put does st.Put(l.StKey(), v).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
if err := PutObject(st, l.StKey(), v); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -83,7 +83,7 @@
// Delete does st.Delete(l.StKey()).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
if err := st.Delete([]byte(l.StKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -103,7 +103,7 @@
if err := fn(); err != nil {
return err
}
- return Put(ctx, call, st, l, v)
+ return Put(ctx, st, l, v)
}
////////////////////////////////////////////////////////////