server: implement prefix ACLs

This change implements prefix ACLs as described in the design doc.
It also removes the rpc.ServerCall argument from some functions that
don't use it at all.

Change-Id: I791d21575c6a5b5f93f9fc2d8afd63e8f4e8778d
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
 			return err
 		}
 		// Check for "database already exists".
-		if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -172,7 +172,7 @@
 		info := &dbInfo{
 			Name: dbName,
 		}
-		return a.putDbInfo(ctx, call, st, dbName, info)
+		return a.putDbInfo(ctx, st, dbName, info)
 	}); err != nil {
 		return err
 	}
@@ -192,7 +192,7 @@
 
 	// 3. Flip dbInfo.Initialized to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Initialized = true
 			return nil
 		})
@@ -234,7 +234,7 @@
 
 	// 2. Flip dbInfo.Deleted to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Deleted = true
 			return nil
 		})
@@ -251,7 +251,7 @@
 	}
 
 	// 4. Delete dbInfo record.
-	if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+	if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
 		return err
 	}
 
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
-	"v.io/v23/rpc"
 )
 
 type dbInfoLayer struct {
@@ -49,9 +48,9 @@
 
 // getDbInfo reads data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
 	info := &dbInfo{}
-	if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+	if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
 		return nil, err
 	}
 	return info, nil
@@ -59,27 +58,27 @@
 
 // putDbInfo writes data to the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
-	return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+	return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
 }
 
 // delDbInfo deletes data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
-	return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+	return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
 }
 
 // updateDbInfo performs a read-modify-write.
 // fn should "modify" v, and should return a VDL-compatible error.
 // Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
 	_ = st.(store.Transaction) // panics on failure, as desired
-	info, err := a.getDbInfo(ctx, call, st, dbName)
+	info, err := a.getDbInfo(ctx, st, dbName)
 	if err != nil {
 		return err
 	}
 	if err := fn(info); err != nil {
 		return err
 	}
-	return a.putDbInfo(ctx, call, st, dbName, info)
+	return a.putDbInfo(ctx, st, dbName, info)
 }
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 0971923..f2839d6 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -103,7 +103,7 @@
 		Name:  d.name,
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, d.st, d, data); err != nil {
+	if err := util.Put(ctx, d.st, d, data); err != nil {
 		return nil, err
 	}
 	return d, nil
@@ -443,6 +443,7 @@
 			// key
 			keyBytes := s.it.Key(nil)
 			parts := util.SplitKeyParts(string(keyBytes))
+			// TODO(rogulenko): Check access for the key.
 			s.currKey = parts[len(parts)-1]
 			// value
 			valueBytes := s.it.Value(nil)
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
 }
 
 // checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
 // Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
 func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return util.Get(ctx, call, st, r.t, &tableData{})
+	return r.t.checkAccess(ctx, call, st, r.key)
 }
 
 // get reads data from the storage engine.
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
 package nosql
 
 import (
+	"strings"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 )
 
 // tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
 			return err
 		}
 		// Check for "table already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -54,7 +57,7 @@
 			Name:  t.name,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, t, data)
+		return util.Put(ctx, st, t, data)
 	})
 }
 
@@ -71,20 +74,30 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all rows in this table.
-		return util.Delete(ctx, call, st, t)
+		return util.Delete(ctx, st, t)
 	})
 }
 
 func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReadWriter) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		key := []byte{}
 		for it.Advance() {
 			key = it.Key(key)
+			// Check perms.
+			parts := util.SplitKeyParts(string(key))
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				// TODO(rogulenko): Revisit this behavior. Probably we should
+				// delete all rows that we have access to.
+				it.Cancel()
+				return err
+			}
+			// Delete the key-value pair.
 			if err := st.Delete(key); err != nil {
 				return verror.New(verror.ErrInternal, ctx, err)
 			}
@@ -107,8 +120,8 @@
 
 func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReader) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
 		key, value := []byte{}, []byte{}
 		for it.Advance() {
 			key, value = it.Key(key), it.Value(value)
+			// Check perms.
 			parts := util.SplitKeyParts(string(key))
-			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				it.Cancel()
+				return err
+			}
+			sender.Send(wire.KeyValue{Key: externalKey, Value: value})
 		}
 		if err := it.Err(); err != nil {
 			return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
 	return impl(st)
 }
 
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
-	if prefix != "" {
-		return verror.NewErrNotImplemented(ctx)
-	}
-	impl := func(st store.StoreReadWriter) error {
-		data := &tableData{}
-		return util.Update(ctx, call, st, t, data, func() error {
-			data.Perms = perms
-			return nil
-		})
-	}
-	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
-			return err
-		} else {
-			return impl(st)
-		}
-	} else {
-		return store.RunInTransaction(t.d.st, impl)
-	}
-}
-
 func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
-	if key != "" {
-		return nil, verror.NewErrNotImplemented(ctx)
-	}
 	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
-		data := &tableData{}
-		if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+		// Check permissions only at table level.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return nil, err
 		}
-		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+		// Get the most specific permissions object.
+		prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+		if err != nil {
+			return nil, err
+		}
+		result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+		// Collect all parent permissions objects all the way up to the table level.
+		for prefix != "" {
+			prefix = prefixPerms.Parent
+			if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+				return nil, err
+			}
+			result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+		}
+		return result, nil
 	}
 	var st store.StoreReader
 	if t.d.batchId != nil {
@@ -179,17 +187,114 @@
 	return impl(st)
 }
 
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		if prefix == "" {
+			data := &tableData{}
+			return util.Update(ctx, call, st, t, data, func() error {
+				data.Perms = perms
+				return nil
+			})
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		// In case there is no permissions object for the given prefix, we need
+		// to add a new node to the prefix permissions tree. We do it by updating
+		// parents for all children of the prefix to the node corresponding to
+		// the prefix.
+		if parent != prefix {
+			if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+				return err
+			}
+		} else {
+			parent = prefixPerms.Parent
+		}
+		stPrefix := t.prefixPermsKey(prefix)
+		stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+		prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+		// Put the (prefix, perms) pair to the database.
+		if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+			return err
+		}
+		return util.PutObject(st, stPrefixLimit, prefixPerms)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
+}
+
 func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
-	return verror.NewErrNotImplemented(ctx)
+	if prefix == "" {
+		return verror.New(verror.ErrBadArg, ctx, prefix)
+	}
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		if parent != prefix {
+			// This can happen only if there is no permissions object for the
+			// given prefix. Since DeletePermissions is idempotent, return nil.
+			return nil
+		}
+		// We need to delete the node corresponding to the prefix from the prefix
+		// permissions tree. We do it by updating parents for all children of the
+		// prefix to the parent of the node corresponding to the prefix.
+		if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+			return err
+		}
+		stPrefix := []byte(t.prefixPermsKey(prefix))
+		stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+		if err := st.Delete(stPrefix); err != nil {
+			return err
+		}
+		return st.Delete(stPrefixLimit)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
 func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
 		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			closeStoreReader()
 			return nil, err
 		}
+		// TODO(rogulenko): Check prefix permissions for children.
 		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
 	var st store.StoreReader
@@ -226,3 +331,145 @@
 func (t *tableReq) stKeyPart() string {
 	return t.name
 }
+
+// updateParentRefs updates the parent for all children of the given
+// prefix to newParent.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+	stPrefix := []byte(t.prefixPermsKey(prefix))
+	stPrefixStart := append(stPrefix, 0)
+	stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+	it := st.Scan(stPrefixStart, stPrefixLimit)
+	var key, value []byte
+	for it.Advance() {
+		key, value = it.Key(key), it.Value(value)
+		var prefixPerms stPrefixPerms
+		if err := vom.Decode(value, &prefixPerms); err != nil {
+			it.Cancel()
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		prefixPerms.Parent = newParent
+		if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+			it.Cancel()
+			return err
+		}
+	}
+	if err := it.Err(); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// lock invalidates all concurrent transactions with ErrConcurrentTransaction
+// that have accessed this table.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+	var data tableData
+	if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+		return err
+	}
+	return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+	prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+	if err != nil {
+		return err
+	}
+	if prefix != "" {
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			return err
+		}
+	}
+	auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
+	if err := auth.Authorize(ctx, call.Security()); err != nil {
+		return verror.New(verror.ErrNoAccess, ctx, prefix)
+	}
+	return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has
+// associated permissions with its permissions object.
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Virtually we represent all prefixes as a forest T, where each vertex maps to
+// a prefix. A parent for a string is the maximum proper prefix of it that
+// belongs to T. Each prefix P from T is represented as a pair of entries with
+// keys P and P~ with values of type stPrefixPerms (parent + perms).
+// High level of how this function works:
+// 1	iter = db.Scan(K, "")
+// 		Here last character of iter.Key() is removed automatically if it is '~'
+// 2	if hasPrefix(K, iter.Key()) return iter.Value()
+// 3	return parent(iter.Key())
+// Short proof:
+// iter returned on line 1 points to one of the following:
+// - a string t that is equal to K;
+// - a string t~: if t is not a prefix of K, then K < t < t~ which
+//   contradicts with property of returned iterator on line 1 => t is prefix of
+//   K; also t is the largest prefix of K, as all larger prefixes of K are
+//   less than t~; in this case line 2 returns correct result;
+// - a string t that doesn't end with '~': it can't be a prefix of K, as all
+//   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
+//   K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
+//   prefix of K; in this case line 3 returns correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+	it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+	if !it.Advance() {
+		prefixPerms, err := t.permsForPrefix(ctx, st, "")
+		return "", prefixPerms, err
+	}
+	defer it.Cancel()
+	parts := util.SplitKeyParts(string(it.Key(nil)))
+	prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+	value := it.Value(nil)
+	var prefixPerms stPrefixPerms
+	if err := vom.Decode(value, &prefixPerms); err != nil {
+		return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	if strings.HasPrefix(key, prefix) {
+		return prefix, prefixPerms, nil
+	}
+	prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+	return prefixPerms.Parent, prefixPerms, err
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+	if prefix == "" {
+		var data tableData
+		if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+			return stPrefixPerms{}, err
+		}
+		return stPrefixPerms{Perms: data.Perms}, nil
+	}
+	var prefixPerms stPrefixPerms
+	if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+		return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+	return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
diff --git a/services/syncbase/server/nosql/types.vdl b/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/services/syncbase/server/nosql/types.vdl
+++ b/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
 	Name  string
 	Perms access.Permissions
 }
+
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
diff --git a/services/syncbase/server/nosql/types.vdl.go b/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/services/syncbase/server/nosql/types.vdl.go
+++ b/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
 }) {
 }
 
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
 func init() {
 	vdl.Register((*databaseData)(nil))
 	vdl.Register((*tableData)(nil))
+	vdl.Register((*stPrefixPerms)(nil))
 }
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
 	data := &serviceData{
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, s.st, s, data); err != nil {
+	if err := util.Put(ctx, s.st, s, data); err != nil {
 		return nil, err
 	}
 	if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
 			return err
 		}
 		// Check for "app already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -192,7 +192,7 @@
 			Name:  appName,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, a, data)
+		return util.Put(ctx, st, a, data)
 	}); err != nil {
 		return err
 	}
@@ -218,7 +218,7 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all databases in this app.
-		return util.Delete(ctx, call, st, a)
+		return util.Delete(ctx, st, a)
 	}); err != nil {
 		return err
 	}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
 	DatabasePrefix = "$database"
 	DbInfoPrefix   = "$dbInfo"
 	LogPrefix      = "$log"
+	PermsPrefix    = "$perms"
 	RowPrefix      = "$row"
 	ServicePrefix  = "$service"
 	SyncPrefix     = "$sync"
@@ -27,4 +28,8 @@
 	BatchSep = ":"
 	// Separator for parts of storage engine keys.
 	KeyPartSep = ":"
+	// PrefixRangeLimitSuffix is the suffix of a key which indicates the end of
+	// a prefix range. Should be more than any regular key in the store.
+	// TODO(rogulenko): Change this constant to something out of the UTF8 space.
+	PrefixRangeLimitSuffix = "~"
 )
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
 
 // GetWithoutAuth does st.Get(l.StKey(), v), populating v.
 // Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
 	if err := GetObject(st, l.StKey(), v); err != nil {
 		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
 // Get does GetWithoutAuth followed by an auth check.
 // Returns a VDL-compatible error.
 func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
-	if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+	if err := GetWithoutAuth(ctx, st, l, v); err != nil {
 		return err
 	}
 	auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
 // Put does st.Put(l.StKey(), v).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
 	if err := PutObject(st, l.StKey(), v); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -83,7 +83,7 @@
 // Delete does st.Delete(l.StKey()).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
 	if err := st.Delete([]byte(l.StKey())); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -103,7 +103,7 @@
 	if err := fn(); err != nil {
 		return err
 	}
-	return Put(ctx, call, st, l, v)
+	return Put(ctx, st, l, v)
 }
 
 ////////////////////////////////////////////////////////////