blob: 7619fc40ae134e5ed25a7b83fa20b4007b2e7dbf [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package nosql
import (
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
)
// tableReq is a per-request object that handles Table RPCs.
type tableReq struct {
// name is the table name; it is returned by Name and used to build the
// table's store key and the key prefix of its rows.
name string
// d is the request object for the database this table is accessed through.
// Its st, batchId, batchReader and batchReadWriter members decide which
// store view each RPC operates on.
d *databaseReq
}
// Compile-time assertions that *tableReq implements both the Table RPC server
// interface and the util.Layer interface.
var (
_ wire.TableServerMethods = (*tableReq)(nil)
_ util.Layer = (*tableReq)(nil)
)
////////////////////////////////////////
// RPC methods
// Create handles the Table.Create RPC. It is rejected with ErrBoundToBatch
// when the database request is bound to a batch. Otherwise, within one store
// transaction, it reads the database's data (which also checks database
// perms), returns ErrExist if table data is already stored for this table,
// and writes a fresh tableData record. A nil perms argument means "use the
// perms read from the databaseData".
func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
	if t.d.batchId != nil {
		return wire.NewErrBoundToBatch(ctx)
	}
	createTable := func(st store.StoreReadWriter) error {
		// Check databaseData perms.
		parent := &databaseData{}
		if err := util.Get(ctx, call, st, t.d, parent); err != nil {
			return err
		}
		// Check for "table already exists": only ErrNoExist lets us proceed.
		probeErr := util.GetWithoutAuth(ctx, call, st, t, &tableData{})
		switch {
		case verror.ErrorID(probeErr) == verror.ErrNoExist.ID:
			// No such table yet; fall through to the write below.
		case probeErr != nil:
			return probeErr
		default:
			// TODO(sadovsky): Should this be ErrExistOrNoAccess, for privacy?
			return verror.New(verror.ErrExist, ctx, t.name)
		}
		// Write new tableData, defaulting to the database's perms.
		tablePerms := perms
		if tablePerms == nil {
			tablePerms = parent.Perms
		}
		record := &tableData{
			Name:  t.name,
			Perms: tablePerms,
		}
		return util.Put(ctx, call, st, t, record)
	}
	return store.RunInTransaction(t.d.st, createTable)
}
// Delete handles the Table.Delete RPC. It is rejected with ErrBoundToBatch
// when the database request is bound to a batch. Otherwise, within one store
// transaction, it reads the tableData via util.Get and then deletes it.
// A table that does not exist is treated as success, so the call is
// idempotent. Rows belonging to the table are not removed here (see TODO).
func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall) error {
	if t.d.batchId != nil {
		return wire.NewErrBoundToBatch(ctx)
	}
	removeTable := func(st store.StoreReadWriter) error {
		// Read-check-delete tableData.
		getErr := util.Get(ctx, call, st, t, &tableData{})
		switch {
		case getErr == nil:
			// TODO(sadovsky): Delete all rows in this table.
			return util.Delete(ctx, call, st, t)
		case verror.ErrorID(getErr) == verror.ErrNoExist.ID:
			return nil // delete is idempotent
		default:
			return getErr
		}
	}
	return store.RunInTransaction(t.d.st, removeTable)
}
// DeleteRowRange handles the Table.DeleteRowRange RPC. After reading the
// tableData via util.Get (the perms check), it scans this table's row keys in
// the range described by start and limit and deletes every key the scan
// yields. Store-level failures are reported as ErrInternal. The work runs on
// the batch's read-writer when the request is bound to a batch, and inside a
// new transaction otherwise.
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
	deleteRows := func(st store.StoreReadWriter) error {
		// Check perms.
		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
			return err
		}
		rows := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
		// keyBuf is handed back to Key on every iteration so it can be reused.
		keyBuf := []byte{}
		for rows.Advance() {
			keyBuf = rows.Key(keyBuf)
			if err := st.Delete(keyBuf); err != nil {
				return verror.New(verror.ErrInternal, ctx, err)
			}
		}
		if err := rows.Err(); err != nil {
			return verror.New(verror.ErrInternal, ctx, err)
		}
		return nil
	}
	// Not bound to a batch: run in a transaction of our own.
	if t.d.batchId == nil {
		return store.RunInTransaction(t.d.st, deleteRows)
	}
	st, err := t.d.batchReadWriter()
	if err != nil {
		return err
	}
	return deleteRows(st)
}
// Scan handles the Table.Scan RPC. After reading the tableData via util.Get
// (the perms check), it scans this table's rows in the range described by
// start and limit and streams each one to the client as a wire.KeyValue whose
// Key is the last part of the stored key.
//
// The scan reads from the batch reader when the request is bound to a batch,
// and from a fresh snapshot (closed on return) otherwise.
//
// Returns the util.Get error if the check fails, the stream's Send error if a
// row cannot be delivered to the client, or ErrInternal if the underlying
// store scan fails.
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
	impl := func(st store.StoreReader) error {
		// Check perms.
		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
			return err
		}
		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
		sender := call.SendStream()
		// key and value are passed back in on each iteration as reusable buffers.
		key, value := []byte{}, []byte{}
		for it.Advance() {
			key, value = it.Key(key), it.Value(value)
			parts := util.SplitKeyParts(string(key))
			// Stop as soon as the client stream fails instead of silently
			// scanning the remaining rows for a receiver that is gone.
			if err := sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value}); err != nil {
				return err
			}
		}
		if err := it.Err(); err != nil {
			return verror.New(verror.ErrInternal, ctx, err)
		}
		return nil
	}
	var st store.StoreReader
	if t.d.batchId != nil {
		st = t.d.batchReader()
	} else {
		sn := t.d.st.NewSnapshot()
		st = sn
		defer sn.Close()
	}
	return impl(st)
}
// SetPermissions handles the Table.SetPermissions RPC. Only the table-level
// permissions (empty prefix) are supported; any other prefix yields
// ErrNotImplemented. The stored tableData is modified through util.Update
// with a callback that overwrites its Perms with perms. The update runs on
// the batch's read-writer when the request is bound to a batch, and inside a
// new transaction otherwise.
func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
	if prefix != "" {
		return verror.NewErrNotImplemented(ctx)
	}
	applyPerms := func(st store.StoreReadWriter) error {
		stored := &tableData{}
		overwrite := func() error {
			stored.Perms = perms
			return nil
		}
		return util.Update(ctx, call, st, t, stored, overwrite)
	}
	// Not bound to a batch: run in a transaction of our own.
	if t.d.batchId == nil {
		return store.RunInTransaction(t.d.st, applyPerms)
	}
	st, err := t.d.batchReadWriter()
	if err != nil {
		return err
	}
	return applyPerms(st)
}
// GetPermissions handles the Table.GetPermissions RPC. Only the table-level
// permissions are supported: a non-empty key yields ErrNotImplemented.
// On success it returns a single PrefixPermissions entry with an empty
// Prefix and the Perms stored in the tableData.
//
// The tableData is read from the batch reader when the request is bound to a
// batch, and from a fresh snapshot (closed on return) otherwise. Any error
// from util.Get is returned unchanged.
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
	if key != "" {
		return nil, verror.NewErrNotImplemented(ctx)
	}
	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
		data := &tableData{}
		// Read through the reader chosen below (batch reader or snapshot),
		// not t.d.st directly, so a batch-bound request reads through its
		// batch and an unbound one reads from the snapshot it created.
		if err := util.Get(ctx, call, st, t, data); err != nil {
			return nil, err
		}
		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
	}
	var st store.StoreReader
	if t.d.batchId != nil {
		st = t.d.batchReader()
	} else {
		sn := t.d.st.NewSnapshot()
		st = sn
		defer sn.Close()
	}
	return impl(st)
}
// DeletePermissions handles the Table.DeletePermissions RPC. It is not
// implemented: every call returns ErrNotImplemented regardless of prefix.
func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
return verror.NewErrNotImplemented(ctx)
}
// GlobChildren__ returns a channel of names matching "*" under this table's
// row prefix, as produced by util.Glob. It first reads the tableData via
// util.Get (the perms check); if that fails the reader is closed and the
// error is returned.
//
// The reader is the batch reader when the request is bound to a batch (with a
// no-op closer), and a fresh snapshot otherwise (with a closer that closes
// it). On the success path the closer is handed to util.Glob rather than
// invoked here.
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
	var (
		reader      store.StoreReader
		closeReader func() error
	)
	if t.d.batchId != nil {
		reader = t.d.batchReader()
		// Nothing to release: the batch reader is not ours to close.
		closeReader = func() error {
			return nil
		}
	} else {
		sn := t.d.st.NewSnapshot()
		reader = sn
		closeReader = func() error {
			return sn.Close()
		}
	}
	// Check perms.
	if err := util.Get(ctx, call, reader, t, &tableData{}); err != nil {
		closeReader()
		return nil, err
	}
	rowPrefix := util.JoinKeyParts(util.RowPrefix, t.name)
	return util.Glob(ctx, call, "*", reader, closeReader, rowPrefix)
}
////////////////////////////////////////
// util.Layer methods
// Name returns the table's name.
func (t *tableReq) Name() string {
return t.name
}
// StKey returns the store key for this table: util.TablePrefix joined with
// the table's key part via util.JoinKeyParts.
func (t *tableReq) StKey() string {
	keyPart := t.stKeyPart()
	return util.JoinKeyParts(util.TablePrefix, keyPart)
}
////////////////////////////////////////
// Internal helpers
// stKeyPart returns this table's component of a store key, which is simply
// the table name.
func (t *tableReq) stKeyPart() string {
return t.name
}