blob: 558a1c1bcec7843c7e5d9bc0a1d8198d1582a5cd [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package nosql
import (
"strings"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
"v.io/v23/vom"
)
// tableReq is a per-request object that handles Table RPCs.
type tableReq struct {
	// name is the table name; it is stored in tableData.Name and used to
	// build this table's storage keys.
	name string
	// d is the request object for the database that owns this table. It
	// supplies the store handle, batch state and schema-version check.
	d *databaseReq
}
// Compile-time check that *tableReq satisfies wire.TableServerMethods.
var _ wire.TableServerMethods = (*tableReq)(nil)
////////////////////////////////////////
// RPC methods

// Create creates this table. It fails with ErrBoundToBatch when the request is
// bound to a batch, with the schema-version error when schemaVersion does not
// match, and with ErrExist when the table is already present. The caller must
// pass the authorization check performed while reading databaseData. If perms
// is nil, the table is created with the database's permissions.
func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, schemaVersion int32, perms access.Permissions) error {
	if t.d.batchId != nil {
		return wire.NewErrBoundToBatch(ctx)
	}
	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
		return err
	}
	return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
		// Check databaseData perms.
		dData := &databaseData{}
		if err := util.GetWithAuth(ctx, call, tx, t.d.stKey(), dData); err != nil {
			return err
		}
		// Check for "table already exists". Any outcome other than
		// ErrNoExist (including a successful read) stops the creation.
		if err := util.Get(ctx, tx, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
			if err != nil {
				return err
			}
			// TODO(sadovsky): Should this be ErrExistOrNoAccess, for privacy?
			return verror.New(verror.ErrExist, ctx, t.name)
		}
		// Resolve the effective perms into a local instead of assigning to
		// the captured perms argument. If this closure is executed more than
		// once (presumably store.RunInTransaction retries on conflicts --
		// confirm), a default taken on an earlier attempt must not leak into
		// a later one; each attempt has to use the databaseData it just read.
		tablePerms := perms
		if tablePerms == nil {
			tablePerms = dData.Perms
		}
		// Write new tableData.
		data := &tableData{
			Name:  t.name,
			Perms: tablePerms,
		}
		return util.Put(ctx, tx, t.stKey(), data)
	})
}
// Delete removes this table's tableData entry. It is rejected when the request
// is bound to a batch or when schemaVersion does not match. Deleting a table
// that does not exist succeeds, so the call is idempotent.
func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
	if t.d.batchId != nil {
		return wire.NewErrBoundToBatch(ctx)
	}
	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
		return err
	}
	removeTable := func(tx store.Transaction) error {
		// The authorized read doubles as the existence check.
		readErr := util.GetWithAuth(ctx, call, tx, t.stKey(), &tableData{})
		switch {
		case readErr == nil:
			// TODO(sadovsky): Delete all rows in this table.
			return util.Delete(ctx, tx, t.stKey())
		case verror.ErrorID(readErr) == verror.ErrNoExist.ID:
			return nil // delete is idempotent
		default:
			return readErr
		}
	}
	return store.RunInTransaction(t.d.st, removeTable)
}
// Exists reports whether this table exists, after validating schemaVersion.
// The tableData entry is read with an authorization check directly from the
// database store, and the outcome of that read is translated by
// util.ErrorToExists.
func (t *tableReq) Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error) {
	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
		return false, err
	}
	readErr := util.GetWithAuth(ctx, call, t.d.st, t.stKey(), &tableData{})
	return util.ErrorToExists(readErr)
}
// DeleteRowRange deletes every row of this table whose key lies in the range
// described by start and limit. Access is checked once at table level and then
// again for each row; the first row the caller may not access aborts the whole
// operation. The work runs inside the batch transaction when the request is
// bound to a batch, and inside a fresh transaction otherwise.
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
	deleteRows := func(tx store.Transaction) error {
		// Table-level access comes first so that unauthorized callers never
		// trigger a scan.
		if err := t.checkAccess(ctx, call, tx, ""); err != nil {
			return err
		}
		// The client's schema version must match the database's.
		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
			return err
		}
		rowPrefix := util.JoinKeyParts(util.RowPrefix, t.name)
		rows := tx.Scan(util.ScanRangeArgs(rowPrefix, string(start), string(limit)))
		keyBuf := []byte{}
		for rows.Advance() {
			keyBuf = rows.Key(keyBuf)
			// The last key part is the row key as seen by clients.
			keyParts := util.SplitKeyParts(string(keyBuf))
			rowKey := keyParts[len(keyParts)-1]
			if err := t.checkAccess(ctx, call, tx, rowKey); err != nil {
				// TODO(rogulenko): Revisit this behavior. Probably we should
				// delete all rows that we have access to.
				rows.Cancel()
				return err
			}
			if err := tx.Delete(keyBuf); err != nil {
				return verror.New(verror.ErrInternal, ctx, err)
			}
		}
		if err := rows.Err(); err != nil {
			return verror.New(verror.ErrInternal, ctx, err)
		}
		return nil
	}
	if t.d.batchId == nil {
		return store.RunInTransaction(t.d.st, deleteRows)
	}
	tx, err := t.d.batchTransaction()
	if err != nil {
		return err
	}
	return deleteRows(tx)
}
// Scan streams to the client every key-value pair of this table whose key lies
// in the range described by start and limit. Access is checked at table level
// before scanning and then per row; the first row the caller may not access
// ends the scan with that error. A failure to send on the stream also ends the
// scan and is returned to the caller. The scan reads from the batch when the
// request is bound to one, and from a fresh snapshot otherwise.
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, schemaVersion int32, start, limit []byte) error {
	impl := func(sntx store.SnapshotOrTransaction) error {
		// Check for table-level access before doing a scan.
		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
			return err
		}
		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
			return err
		}
		it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
		sender := call.SendStream()
		key, value := []byte{}, []byte{}
		for it.Advance() {
			key, value = it.Key(key), it.Value(value)
			// Check perms. The last key part is the client-visible row key.
			parts := util.SplitKeyParts(string(key))
			externalKey := parts[len(parts)-1]
			if err := t.checkAccess(ctx, call, sntx, externalKey); err != nil {
				it.Cancel()
				return err
			}
			// Do not keep scanning once the stream is broken: stop the
			// iterator and surface the send error instead of reporting
			// success for data the client never received.
			if err := sender.Send(wire.KeyValue{Key: externalKey, Value: value}); err != nil {
				it.Cancel()
				return err
			}
		}
		if err := it.Err(); err != nil {
			return verror.New(verror.ErrInternal, ctx, err)
		}
		return nil
	}
	if t.d.batchId != nil {
		return impl(t.d.batchReader())
	}
	sntx := t.d.st.NewSnapshot()
	defer sntx.Abort()
	return impl(sntx)
}
// GetPermissions returns the chain of permissions objects that apply to key,
// ordered from the most specific prefix up to the table level (prefix ""). The
// caller is authorized at table level only. The data is read from the batch
// when the request is bound to one, and from a fresh snapshot otherwise.
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, key string) ([]wire.PrefixPermissions, error) {
	collect := func(sntx store.SnapshotOrTransaction) ([]wire.PrefixPermissions, error) {
		// Only table-level access is required here.
		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
			return nil, err
		}
		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
			return nil, err
		}
		// Start from the most specific permissions object for the key.
		current, node, err := t.permsForKey(ctx, sntx, key)
		if err != nil {
			return nil, err
		}
		chain := []wire.PrefixPermissions{{Prefix: current, Perms: node.Perms}}
		// Follow parent links until the table-level entry has been appended.
		for current != "" {
			current = node.Parent
			node, err = t.permsForPrefix(ctx, sntx, current)
			if err != nil {
				return nil, err
			}
			chain = append(chain, wire.PrefixPermissions{Prefix: current, Perms: node.Perms})
		}
		return chain, nil
	}
	if t.d.batchId != nil {
		return collect(t.d.batchReader())
	}
	snapshot := t.d.st.NewSnapshot()
	defer snapshot.Abort()
	return collect(snapshot)
}
// SetPermissions sets the permissions for the given prefix. An empty prefix
// updates the table-level permissions stored in tableData. A non-empty prefix
// is stored as a pair of entries (the prefix key and the same key with the
// range-limit suffix) carrying the parent link and the new perms; if no
// permissions object existed for the prefix, a new node is inserted into the
// prefix permissions tree. The work runs inside the batch transaction when the
// request is bound to a batch, and inside a fresh transaction otherwise.
func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string, perms access.Permissions) error {
	setPerms := func(tx store.Transaction) error {
		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
			return err
		}
		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
			return err
		}
		// Concurrent transactions that touch this table should fail with
		// ErrConcurrentTransaction when this transaction commits.
		if err := t.lock(ctx, tx); err != nil {
			return err
		}
		if prefix == "" {
			// Table-level perms live directly in tableData.
			var table tableData
			return util.UpdateWithAuth(ctx, call, tx, t.stKey(), &table, func() error {
				table.Perms = perms
				return nil
			})
		}
		// Find the most specific existing permissions object.
		nearest, node, err := t.permsForKey(ctx, tx, prefix)
		if err != nil {
			return err
		}
		parent := nearest
		if nearest == prefix {
			// A node for this prefix already exists: keep its parent link.
			parent = node.Parent
		} else if err := t.updateParentRefs(ctx, tx, prefix, prefix); err != nil {
			// There was no node for this prefix, so a new one is inserted
			// below `nearest`; the children of the prefix must now point at
			// the new node, and that update failed.
			return err
		}
		updated := stPrefixPerms{Parent: parent, Perms: perms}
		permsKey := t.prefixPermsKey(prefix)
		// Store the (prefix, perms) pair under both keys.
		if err := util.Put(ctx, tx, permsKey, updated); err != nil {
			return err
		}
		return util.Put(ctx, tx, permsKey+util.PrefixRangeLimitSuffix, updated)
	}
	if t.d.batchId == nil {
		return store.RunInTransaction(t.d.st, setPerms)
	}
	tx, err := t.d.batchTransaction()
	if err != nil {
		return err
	}
	return setPerms(tx)
}
// DeletePermissions removes the permissions object for the given prefix. An
// empty prefix is rejected with ErrBadArg. Deleting a prefix that has no
// permissions object succeeds, so the call is idempotent. The work runs inside
// the batch transaction when the request is bound to a batch, and inside a
// fresh transaction otherwise.
func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string) error {
	if prefix == "" {
		return verror.New(verror.ErrBadArg, ctx, prefix)
	}
	deletePerms := func(tx store.Transaction) error {
		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
			return err
		}
		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
			return err
		}
		// Concurrent transactions that touch this table should fail with
		// ErrConcurrentTransaction when this transaction commits.
		if err := t.lock(ctx, tx); err != nil {
			return err
		}
		// Find the most specific existing permissions object.
		nearest, node, err := t.permsForKey(ctx, tx, prefix)
		if err != nil {
			return err
		}
		if nearest != prefix {
			// No permissions object exists for this exact prefix; since
			// DeletePermissions is idempotent there is nothing to do.
			return nil
		}
		// Remove the node from the prefix permissions tree: first re-point
		// the children of the prefix at the node's own parent.
		if err := t.updateParentRefs(ctx, tx, prefix, node.Parent); err != nil {
			return err
		}
		// Then drop both entries that represent the node.
		permsKey := t.prefixPermsKey(prefix)
		if err := tx.Delete([]byte(permsKey)); err != nil {
			return err
		}
		return tx.Delete([]byte(permsKey + util.PrefixRangeLimitSuffix))
	}
	if t.d.batchId == nil {
		return store.RunInTransaction(t.d.st, deletePerms)
	}
	tx, err := t.d.batchTransaction()
	if err != nil {
		return err
	}
	return deletePerms(tx)
}
// GlobChildren__ checks table-level access and then delegates to util.Glob
// with the "*" pattern over this table's row key prefix. The reader is the
// batch when the request is bound to one (with a no-op close function), or a
// fresh snapshot that is aborted on close. If the access check fails the close
// function is invoked before returning.
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
	var (
		reader      store.SnapshotOrTransaction
		closeReader func() error
	)
	if t.d.batchId != nil {
		reader = t.d.batchReader()
		closeReader = func() error {
			return nil
		}
	} else {
		sn := t.d.st.NewSnapshot()
		reader, closeReader = sn, sn.Abort
	}
	// Check perms.
	if err := t.checkAccess(ctx, call, reader, ""); err != nil {
		closeReader()
		return nil, err
	}
	// TODO(rogulenko): Check prefix permissions for children.
	rowPrefix := util.JoinKeyParts(util.RowPrefix, t.name)
	return util.Glob(ctx, call, "*", reader, closeReader, rowPrefix)
}
////////////////////////////////////////
// Internal helpers

// stKey returns the storage key of this table's tableData entry: the table
// key prefix joined with this table's key part.
func (t *tableReq) stKey() string {
	keyPart := t.stKeyPart()
	return util.JoinKeyParts(util.TablePrefix, keyPart)
}
// stKeyPart returns this table's component of a storage key, which is simply
// the table name.
func (t *tableReq) stKeyPart() string {
	return t.name
}
// updateParentRefs updates the parent for all children of the given
// prefix to newParent.
//
// It scans the permissions entries whose keys lie strictly between the key of
// prefix and that key with the range-limit suffix, i.e. every entry below
// prefix. Only direct children are rewritten: an entry whose current parent is
// itself strictly below prefix belongs to a deeper node and keeps its parent,
// so that each node's parent stays the longest proper prefix present in the
// tree. Decode and iteration failures are reported as ErrInternal.
func (t *tableReq) updateParentRefs(ctx *context.T, tx store.Transaction, prefix, newParent string) error {
	// Build each bound from the string key so the two slices never share a
	// backing array. Appending twice to one []byte whose capacity exceeds its
	// length makes the second append overwrite the byte written by the first,
	// which collapses the scan range to nothing.
	stPrefix := t.prefixPermsKey(prefix)
	stPrefixStart := []byte(stPrefix + "\x00")
	stPrefixLimit := []byte(stPrefix + util.PrefixRangeLimitSuffix)
	it := tx.Scan(stPrefixStart, stPrefixLimit)
	var key, value []byte
	for it.Advance() {
		key, value = it.Key(key), it.Value(value)
		var prefixPerms stPrefixPerms
		if err := vom.Decode(value, &prefixPerms); err != nil {
			it.Cancel()
			return verror.New(verror.ErrInternal, ctx, err)
		}
		// Skip entries that hang off a deeper node: their parent is a strict
		// extension of prefix and is unaffected by inserting or removing the
		// node for prefix.
		if len(prefixPerms.Parent) > len(prefix) && strings.HasPrefix(prefixPerms.Parent, prefix) {
			continue
		}
		prefixPerms.Parent = newParent
		if err := util.Put(ctx, tx, string(key), prefixPerms); err != nil {
			it.Cancel()
			return err
		}
	}
	if err := it.Err(); err != nil {
		return verror.New(verror.ErrInternal, ctx, err)
	}
	return nil
}
// lock invalidates all in-flight transactions that have touched this table,
// such that any subsequent tx.Commit() will return ErrConcurrentTransaction.
//
// lock() must be called whenever prefix permissions are updated, so that the
// snapshots seen by all transactions reflect up-to-date permissions. Every
// public function that touches this table reads the table-level permissions
// object, so it is enough to put that object's key into the write set of the
// current transaction; this is done by reading tableData and writing it back
// unchanged.
//
// TODO(rogulenko): Revisit this behavior to provide more granularity.
// One option is to add a prefix and its parent to the write set of the current
// transaction when the permissions object for that prefix is updated.
func (t *tableReq) lock(ctx *context.T, tx store.Transaction) error {
	tableKey := t.stKey()
	var table tableData
	if err := util.Get(ctx, tx, tableKey, &table); err != nil {
		return err
	}
	return util.Put(ctx, tx, tableKey, table)
}
// checkAccess checks that this table exists in the database, and performs
// an authorization check. The access is checked at table level and at the
// level of the most specific prefix for the given key.
//
// When the most specific permissions object is the table-level one (prefix
// ""), a single authorization against it covers both levels. Otherwise the
// table-level check is done first through an authorized read of tableData.
// A caller rejected by the prefix-level authorizer gets ErrNoAccess; a failure
// to construct the authorizer is reported as ErrInternal.
// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
// access check to be a check for "Resolve", i.e. also check access to
// service, app and database.
func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, key string) error {
	prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
	if err != nil {
		return err
	}
	if prefix != "" {
		if err := util.GetWithAuth(ctx, call, sntx, t.stKey(), &tableData{}); err != nil {
			return err
		}
	}
	// Do not discard the constructor's error: using the returned authorizer
	// after a failure could dereference nil.
	auth, err := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
	if err != nil {
		return verror.New(verror.ErrInternal, ctx, err)
	}
	if err := auth.Authorize(ctx, call.Security()); err != nil {
		return verror.New(verror.ErrNoAccess, ctx, prefix)
	}
	return nil
}
// permsForKey returns the longest prefix of the given key that has
// associated permissions, along with its permissions object.
// permsForKey doesn't perform an authorization check.
// If no prefix permissions entry lies at or after the key, the table-level
// permissions are returned with prefix "". Scan and decode failures are
// reported as ErrInternal.
//
// Effectively, we represent all prefixes as a forest T, where each vertex maps
// to a prefix. A parent for a string is the maximum proper prefix of it that
// belongs to T. Each prefix P from T is represented as a pair of entries with
// keys P and P~ with values of type stPrefixPerms (parent + perms). High level
// explanation of how this function works:
// 1   iter = db.Scan(K, "")
//     Here last character of iter.Key() is removed automatically if it is '~'
// 2   if hasPrefix(K, iter.Key()) return iter.Value()
// 3   return parent(iter.Key())
// Short proof:
// iter returned on line 1 points to one of the following:
// - a string t that is equal to K;
// - a string t~: if t is not a prefix of K, then K < t < t~ which
//   contradicts with property of returned iterator on line 1 => t is prefix of
//   K; also t is the largest prefix of K, as all larger prefixes of K are
//   less than t~; in this case line 2 returns correct result;
// - a string t that doesn't end with '~': it can't be a prefix of K, as all
//   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
//   K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
//   prefix of K; in this case line 3 returns correct result.
func (t *tableReq) permsForKey(ctx *context.T, sntx store.SnapshotOrTransaction, key string) (string, stPrefixPerms, error) {
	it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
	if !it.Advance() {
		// Advance() returns false both when the range is empty and when the
		// scan failed. A failed scan must not silently fall back to the
		// table-level permissions.
		if err := it.Err(); err != nil {
			return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
		}
		prefixPerms, err := t.permsForPrefix(ctx, sntx, "")
		return "", prefixPerms, err
	}
	defer it.Cancel()
	// The last key part is the stored prefix, possibly carrying the
	// range-limit suffix of the second entry of the pair.
	parts := util.SplitKeyParts(string(it.Key(nil)))
	prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
	value := it.Value(nil)
	var prefixPerms stPrefixPerms
	if err := vom.Decode(value, &prefixPerms); err != nil {
		return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
	}
	if strings.HasPrefix(key, prefix) {
		return prefix, prefixPerms, nil
	}
	// The entry found is not a prefix of key, so the answer is its parent.
	// Capture the parent's name before loading the parent's object: the
	// loaded object's own Parent field names the grandparent, not the prefix
	// the returned permissions belong to.
	parent := prefixPerms.Parent
	parentPerms, err := t.permsForPrefix(ctx, sntx, parent)
	return parent, parentPerms, err
}
// permsForPrefix returns the permissions object associated with the
// provided prefix. For the empty prefix the table-level permissions from
// tableData are returned (with no parent); a failure of that read is returned
// as is. For any other prefix the stored stPrefixPerms entry is returned, and
// a failure to read it is reported as ErrInternal.
func (t *tableReq) permsForPrefix(ctx *context.T, sntx store.SnapshotOrTransaction, prefix string) (stPrefixPerms, error) {
	if prefix != "" {
		var node stPrefixPerms
		if err := util.Get(ctx, sntx, t.prefixPermsKey(prefix), &node); err != nil {
			return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
		}
		return node, nil
	}
	var table tableData
	if err := util.Get(ctx, sntx, t.stKey(), &table); err != nil {
		return stPrefixPerms{}, err
	}
	return stPrefixPerms{Perms: table.Perms}, nil
}
// prefixPermsKey returns the storage key under which the permissions object
// for the given prefix of this table is kept: the perms key prefix, the table
// name and the prefix, joined as key parts.
func (t *tableReq) prefixPermsKey(prefix string) string {
	permsKey := util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
	return permsKey
}