| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package server |
| |
| import ( |
| "math/rand" |
| "path/filepath" |
| "sync" |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/v23/glob" |
| "v.io/v23/query/engine" |
| ds "v.io/v23/query/engine/datasource" |
| "v.io/v23/query/syncql" |
| "v.io/v23/rpc" |
| "v.io/v23/security" |
| "v.io/v23/security/access" |
| wire "v.io/v23/services/syncbase" |
| pubutil "v.io/v23/syncbase/util" |
| "v.io/v23/verror" |
| "v.io/v23/vom" |
| "v.io/x/lib/vlog" |
| "v.io/x/ref/services/syncbase/common" |
| "v.io/x/ref/services/syncbase/server/interfaces" |
| "v.io/x/ref/services/syncbase/server/util" |
| "v.io/x/ref/services/syncbase/store" |
| storeutil "v.io/x/ref/services/syncbase/store/util" |
| "v.io/x/ref/services/syncbase/store/watchable" |
| "v.io/x/ref/services/syncbase/vsync" |
| sbwatchable "v.io/x/ref/services/syncbase/watchable" |
| ) |
| |
| // database is a per-database singleton (i.e. not per-request) that handles |
| // Database RPCs. |
| // Note: If a database does not exist at the time of a database RPC, the |
| // dispatcher creates a short-lived database object to service that particular |
| // request. |
| type database struct { |
| id wire.Id |
| s *service |
| // The fields below are initialized iff this database exists. |
| exists bool |
| // TODO(sadovsky): Make st point to a store.Store wrapper that handles paging, |
| // and do not actually open the store in NewDatabase. |
| st *watchable.Store // stores all data for a single database |
| |
| // Active snapshots and transactions corresponding to client batches. |
| // TODO(sadovsky): Add timeouts and GC. |
| mu sync.Mutex // protects the fields below |
| sns map[uint64]store.Snapshot |
| txs map[uint64]*transactionState |
| |
| // Active ConflictResolver connection from the app to this database. |
| // NOTE: For now, we assume there's only one open conflict resolution stream |
| // per database (typically, from the app that owns the database). |
| crStream wire.ConflictManagerStartConflictResolverServerCall |
| // Mutex lock to protect concurrent read/write of crStream pointer |
| crMu sync.Mutex |
| } |
| |
| var ( |
| _ wire.DatabaseServerMethods = (*database)(nil) |
| _ interfaces.Database = (*database)(nil) |
| ) |
| |
| // DatabaseOptions configures a database. |
| type DatabaseOptions struct { |
| // Database-level permissions. |
| Perms access.Permissions |
| // Root dir for data storage. This path is relative from the service's RootDir. |
| RootDir string |
| // Storage engine to use. |
| Engine string |
| } |
| |
| type permissionState struct { |
| dataChanged bool |
| permsChanged bool |
| initialPerms access.Permissions |
| finalPerms access.Permissions |
| } |
| |
| type transactionState struct { |
| tx *watchable.Transaction |
| permsChanges map[wire.Id]*permissionState |
| } |
| |
| // Keeps track that this collection had a mutation and the permissions at the time. If no |
| // permissions are yet known for the collection then remember the current permissions as the |
| // initial permissions of the collection. |
| // When the transaction is committed we will know to validate this collection's permissions. |
| func (ts *transactionState) MarkDataChanged(collectionId wire.Id, perms access.Permissions) { |
| state := ts.permsState(collectionId) |
| state.dataChanged = true |
| if state.initialPerms == nil { |
| state.initialPerms = perms |
| } |
| state.finalPerms = perms |
| } |
| |
| // Keeps track that the permissions were changed on this collection and the before and after |
| // permissions. If no permissions are yet known for the collection then remember the current |
| // permissions as the initial permissions of the collection. |
| func (ts *transactionState) MarkPermsChanged(collectionId wire.Id, permsBefore access.Permissions, permsAfter access.Permissions) { |
| state := ts.permsState(collectionId) |
| state.permsChanged = true |
| if state.initialPerms == nil { |
| state.initialPerms = permsBefore |
| } |
| state.finalPerms = permsAfter |
| } |
| |
| // Resets all tracked changes to the collection. Used on collection destroy. Since destroy |
| // cannot happen on a synced collection, the destroy and any updates before it will not be |
| // seen remotely, so validation must start from the implicit permissions in case the |
| // collection is created again. This also allows destroy to not require both write and |
| // admin permissions. |
| func (ts *transactionState) ResetCollectionChanges(collectionId wire.Id) { |
| delete(ts.permsChanges, collectionId) |
| } |
| |
| // validatePermissionChanges performs an auth check on each collection that has a data change or |
| // permission change and returns false if any of the auth checks fail. |
| // TODO(ivanpi): This check should be done against signing blessings at signing time, in |
| // both batch and non-batch cases. |
| func (ts *transactionState) validatePermissionChanges(ctx *context.T, securityCall security.Call) bool { |
| for _, collectionState := range ts.permsChanges { |
| // This collection was modified, make sure that the write acl is either present at |
| // the end or that it had the write acl to begin with. This way we can be sure that |
| // a mutation didn't take place when it appeared that there was no write acl before |
| // and after the transaction. |
| if collectionState.dataChanged { |
| before := hasPermission(ctx, securityCall, collectionState.initialPerms, access.Write) |
| after := hasPermission(ctx, securityCall, collectionState.finalPerms, access.Write) |
| if !after && !before { |
| return false |
| } |
| } |
| |
| // The permissions were changed on the collection, make sure that the admin acl is |
| // present at the beginning. |
| if collectionState.permsChanged { |
| if !hasPermission(ctx, securityCall, collectionState.initialPerms, access.Admin) { |
| return false |
| } |
| } |
| } |
| return true |
| } |
| |
| func (ts *transactionState) permsState(collectionId wire.Id) *permissionState { |
| if ts.permsChanges == nil { |
| ts.permsChanges = make(map[wire.Id]*permissionState) |
| } |
| state, ok := ts.permsChanges[collectionId] |
| if !ok { |
| state = &permissionState{} |
| ts.permsChanges[collectionId] = state |
| } |
| return state |
| } |
| |
| // hasPermission returns true if the caller is authorized for the specific tag based on the |
| // passed in perms. |
| func hasPermission(ctx *context.T, securityCall security.Call, perms access.Permissions, tag access.Tag) bool { |
| permForTag, ok := perms[string(tag)] |
| // Authorize returns either an error or nil, so nil means the caller is authorized. |
| return ok && permForTag.Authorize(ctx, securityCall) == nil |
| } |
| |
| // openDatabase opens a database and returns a *database for it. Designed for |
| // use from within newDatabase and newService. |
| func openDatabase(ctx *context.T, s *service, id wire.Id, opts DatabaseOptions, openOpts storeutil.OpenOptions) (*database, error) { |
| // DatabaseOption's RootDir is relative to the service's RootDir (but for backwards compatibility |
| // s.absRootDir will return any absolute paths as-is). |
| p := s.absRootDir(filepath.Join(opts.RootDir, opts.Engine)) |
| st, err := storeutil.OpenStore(opts.Engine, p, openOpts) |
| if err != nil { |
| return nil, err |
| } |
| wst, err := watchable.Wrap(st, s.vclock, &watchable.Options{ |
| // TODO(ivanpi): Since ManagedPrefixes control what gets synced, they |
| // should be moved to a more visible place (e.g. constants). Also consider |
| // decoupling managed and synced prefixes. |
| ManagedPrefixes: []string{common.CollectionPermsPrefix, common.RowPrefix}, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return &database{ |
| id: id, |
| s: s, |
| exists: true, |
| st: wst, |
| sns: make(map[uint64]store.Snapshot), |
| txs: make(map[uint64]*transactionState), |
| }, nil |
| } |
| |
| // newDatabase creates a new database instance and returns it. |
| // Designed for use from within service.createDatabase. |
| func newDatabase(ctx *context.T, s *service, id wire.Id, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) { |
| if opts.Perms == nil { |
| return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified") |
| } |
| d, err := openDatabase(ctx, s, id, opts, storeutil.OpenOptions{CreateIfMissing: true, ErrorIfExists: true}) |
| if err != nil { |
| return nil, err |
| } |
| data := &DatabaseData{ |
| Perms: opts.Perms, |
| SchemaMetadata: metadata, |
| } |
| if err := store.Put(ctx, d.st, d.stKey(), data); err != nil { |
| return nil, err |
| } |
| |
| // Start a Sync watcher on this newly created database store. |
| vsync.NewSyncDatabase(d).StartStoreWatcher(ctx) |
| |
| return d, nil |
| } |
| |
| //////////////////////////////////////// |
| // RPC methods |
| |
| func (d *database) Create(ctx *context.T, call rpc.ServerCall, metadata *wire.SchemaMetadata, perms access.Permissions) error { |
| if d.exists { |
| return verror.New(verror.ErrExist, ctx, d.id) |
| } |
| // This database does not yet exist; d is just an ephemeral handle that holds |
| // {id wire.Id, s *service}. d.s.createDatabase will create a new database |
| // handle and store it in d.s.dbs[d.id]. |
| return d.s.createDatabase(ctx, call, d.id, perms, metadata) |
| } |
| |
| func (d *database) Destroy(ctx *context.T, call rpc.ServerCall) error { |
| return d.s.destroyDatabase(ctx, call, d.id) |
| } |
| |
| func (d *database) Exists(ctx *context.T, call rpc.ServerCall) (bool, error) { |
| if !d.exists { |
| return false, nil |
| } |
| return util.ErrorToExists(util.GetWithAuth(ctx, call, d.st, d.stKey(), &DatabaseData{})) |
| } |
| |
| var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) |
| |
| func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, opts wire.BatchOptions) (wire.BatchHandle, error) { |
| if !d.exists { |
| return "", verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| var id uint64 |
| var batchType common.BatchType |
| for { |
| id = uint64(rng.Int63()) |
| if opts.ReadOnly { |
| if _, ok := d.sns[id]; !ok { |
| d.sns[id] = d.st.NewSnapshot() |
| batchType = common.BatchTypeSn |
| break |
| } |
| } else { |
| if _, ok := d.txs[id]; !ok { |
| d.txs[id] = &transactionState{tx: d.st.NewWatchableTransaction()} |
| batchType = common.BatchTypeTx |
| break |
| } |
| } |
| } |
| return common.JoinBatchHandle(batchType, id), nil |
| } |
| |
| func (d *database) Commit(ctx *context.T, call rpc.ServerCall, bh wire.BatchHandle) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| if bh == "" { |
| return wire.NewErrNotBoundToBatch(ctx) |
| } |
| _, ts, batchId, err := d.batchLookupInternal(ctx, bh) |
| if err != nil { |
| return err |
| } |
| if ts == nil { |
| return wire.NewErrReadOnlyBatch(ctx) |
| } |
| if !ts.validatePermissionChanges(ctx, call.Security()) { |
| return wire.NewErrInvalidPermissionsChange(ctx) |
| } |
| if err = ts.tx.Commit(); err == nil { |
| d.mu.Lock() |
| delete(d.txs, batchId) |
| d.mu.Unlock() |
| } |
| // TODO(ivanpi): Best effort abort if commit fails? Watchable Commit can fail |
| // before the underlying snapshot is aborted. |
| if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID { |
| return verror.New(wire.ErrConcurrentBatch, ctx, err) |
| } |
| return err |
| } |
| |
| func (d *database) Abort(ctx *context.T, call rpc.ServerCall, bh wire.BatchHandle) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| if bh == "" { |
| return wire.NewErrNotBoundToBatch(ctx) |
| } |
| sn, ts, batchId, err := d.batchLookupInternal(ctx, bh) |
| if err != nil { |
| return err |
| } |
| if ts != nil { |
| d.mu.Lock() |
| delete(d.txs, batchId) |
| d.mu.Unlock() |
| // TODO(ivanpi): If tx.Abort fails, retry later? |
| return ts.tx.Abort() |
| } else { |
| d.mu.Lock() |
| delete(d.sns, batchId) |
| d.mu.Unlock() |
| // TODO(ivanpi): If sn.Abort fails, retry later? |
| return sn.Abort() |
| } |
| } |
| |
| func (d *database) Exec(ctx *context.T, call wire.DatabaseExecServerCall, bh wire.BatchHandle, q string, params []*vom.RawBytes) error { |
| // RunInTransaction() cannot be used here because we may or may not be |
| // creating a transaction. qe.Exec must be called and the statement must be |
| // parsed before we know if a snapshot or a transaction should be created. To |
| // duplicate the semantics of RunInTransaction, if we are not inside a batch, |
| // we attempt the Exec up to 100 times and retry on ErrConcurrentTransaction. |
| // TODO(ivanpi): Refactor query parsing into a separate step, simplify request |
| // handling. Consider separate Query and Exec methods. |
| maxAttempts := 100 |
| attempt := 0 |
| for { |
| err := d.execInternal(ctx, call, bh, q, params) |
| if bh != "" || attempt >= maxAttempts || verror.ErrorID(err) != store.ErrConcurrentTransaction.ID { |
| return err |
| } |
| attempt++ |
| } |
| } |
| |
| func (d *database) execInternal(ctx *context.T, call wire.DatabaseExecServerCall, bh wire.BatchHandle, q string, params []*vom.RawBytes) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| impl := func() error { |
| db := &queryDb{ |
| ctx: ctx, |
| call: call, |
| d: d, |
| bh: bh, |
| sntx: nil, // Filled in later with existing or created sn/tx. |
| ts: nil, // Only filled in if a new batch was created. |
| } |
| st, err := engine.Create(db).PrepareStatement(q) |
| if err != nil { |
| return execCommitOrAbort(db, err) |
| } |
| headers, rs, err := st.Exec(params...) |
| if err != nil { |
| return execCommitOrAbort(db, err) |
| } |
| if rs.Err() != nil { |
| return execCommitOrAbort(db, err) |
| } |
| sender := call.SendStream() |
| // Push the headers first -- the client will retrieve them and return |
| // them separately from the results. |
| var resultHeaders []*vom.RawBytes |
| for _, header := range headers { |
| resultHeaders = append(resultHeaders, vom.RawBytesOf(header)) |
| } |
| sender.Send(resultHeaders) |
| for rs.Advance() { |
| result := rs.Result() |
| if err := sender.Send(result); err != nil { |
| rs.Cancel() |
| return execCommitOrAbort(db, err) |
| } |
| } |
| return execCommitOrAbort(db, rs.Err()) |
| } |
| return impl() |
| } |
| |
| func execCommitOrAbort(qdb *queryDb, err error) error { |
| if qdb.bh != "" { |
| return err // part of an enclosing sn/tx |
| } |
| if err != nil { |
| if qdb.sntx != nil { |
| qdb.sntx.Abort() |
| } |
| return err |
| } else { // err is nil |
| if qdb.ts != nil { |
| return qdb.ts.tx.Commit() |
| } else if qdb.sntx != nil { |
| return qdb.sntx.Abort() |
| } |
| return nil |
| } |
| } |
| |
| func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| return d.s.setDatabasePerms(ctx, call, d.id, perms, version) |
| } |
| |
| func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) { |
| if !d.exists { |
| return nil, "", verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| data := &DatabaseData{} |
| if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), data); err != nil { |
| return nil, "", err |
| } |
| return data.Perms, util.FormatVersion(data.Version), nil |
| } |
| |
| func (d *database) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| impl := func(sntx store.SnapshotOrTransaction) error { |
| // Check perms. |
| if err := util.GetWithAuth(ctx, call, sntx, d.stKey(), &DatabaseData{}); err != nil { |
| return err |
| } |
| return util.GlobChildren(ctx, call, matcher, sntx, common.CollectionPermsPrefix) |
| } |
| return store.RunWithSnapshot(d.st, impl) |
| } |
| |
| // See comment in v.io/v23/services/syncbase/service.vdl for why we can't |
| // implement ListCollections using Glob. |
| func (d *database) ListCollections(ctx *context.T, call rpc.ServerCall, bh wire.BatchHandle) ([]wire.Id, error) { |
| if !d.exists { |
| return nil, verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| var res []wire.Id |
| impl := func(sntx store.SnapshotOrTransaction) error { |
| // Check perms. |
| if err := util.GetWithAuth(ctx, call, sntx, d.stKey(), &DatabaseData{}); err != nil { |
| return err |
| } |
| it := sntx.Scan(common.ScanPrefixArgs(common.CollectionPermsPrefix, "")) |
| keyBytes := []byte{} |
| for it.Advance() { |
| keyBytes = it.Key(keyBytes) |
| id, err := common.ParseCollectionPermsKey(string(keyBytes)) |
| if err != nil { |
| it.Cancel() |
| return verror.New(verror.ErrInternal, ctx, err) |
| } |
| res = append(res, id) |
| } |
| if err := it.Err(); err != nil { |
| return err |
| } |
| return nil |
| } |
| if err := d.runWithExistingBatchOrNewSnapshot(ctx, bh, impl); err != nil { |
| return nil, err |
| } |
| return res, nil |
| } |
| |
| func (d *database) PauseSync(ctx *context.T, call rpc.ServerCall) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| return d.runInTransaction(func(ts *transactionState) error { |
| return sbwatchable.AddDbStateChangeRequestOp(ctx, ts.tx, sbwatchable.StateChangePauseSync) |
| }) |
| } |
| |
| func (d *database) ResumeSync(ctx *context.T, call rpc.ServerCall) error { |
| if !d.exists { |
| return verror.New(verror.ErrNoExist, ctx, d.id) |
| } |
| return d.runInTransaction(func(ts *transactionState) error { |
| return sbwatchable.AddDbStateChangeRequestOp(ctx, ts.tx, sbwatchable.StateChangeResumeSync) |
| }) |
| } |
| |
| //////////////////////////////////////// |
| // interfaces.Database methods |
| |
| func (d *database) St() *watchable.Store { |
| if !d.exists { |
| vlog.Fatalf("database %v does not exist", d.id) |
| } |
| return d.st |
| } |
| |
| func (d *database) Service() interfaces.Service { |
| return d.s |
| } |
| |
| func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error { |
| if !d.exists { |
| vlog.Fatalf("database %v does not exist", d.id) |
| } |
| return util.GetWithAuth(ctx, call, st, d.stKey(), &DatabaseData{}) |
| } |
| |
| func (d *database) Id() wire.Id { |
| return d.id |
| } |
| |
| func (d *database) CrConnectionStream() wire.ConflictManagerStartConflictResolverServerStream { |
| d.crMu.Lock() |
| defer d.crMu.Unlock() |
| return d.crStream |
| } |
| |
| func (d *database) ResetCrConnectionStream() { |
| d.crMu.Lock() |
| defer d.crMu.Unlock() |
| // TODO(jlodhia): figure out a way for the connection to gracefully shutdown |
| // so that the client can get an appropriate error msg. |
| d.crStream = nil |
| } |
| |
| //////////////////////////////////////// |
| // query interface implementations |
| |
| // queryDb implements ds.Database. |
| type queryDb struct { |
| ctx *context.T |
| call rpc.ServerCall |
| d *database |
| bh wire.BatchHandle |
| sntx store.SnapshotOrTransaction |
| ts *transactionState // Will only be set if in a transaction (else nil) |
| } |
| |
| func (qdb *queryDb) GetContext() *context.T { |
| return qdb.ctx |
| } |
| |
| func (qdb *queryDb) GetTable(name string, writeAccessReq bool) (ds.Table, error) { |
| // At this point, when the query package calls GetTable with the |
| // writeAccessReq arg, we know whether or not we need a [writable] transaction |
| // or a snapshot. If batchId is already set, there's nothing to do; but if |
| // not, the writeAccessReq arg dictates whether a snapshot or a transaction is |
| // should be created. |
| // TODO(ivanpi): Allow passing in non-default user blessings. |
| userBlessings, _ := security.RemoteBlessingNames(qdb.ctx, qdb.call.Security()) |
| _, user, err := pubutil.AppAndUserPatternFromBlessings(userBlessings...) |
| if err != nil { |
| return nil, err |
| } |
| qt := &queryTable{ |
| qdb: qdb, |
| cReq: &collectionReq{ |
| id: wire.Id{Blessing: string(user), Name: name}, |
| d: qdb.d, |
| }, |
| } |
| if qt.qdb.bh != "" { |
| var err error |
| if writeAccessReq { |
| // We are in a batch (could be snapshot or transaction) |
| // and Write access is required. Attempt to get a |
| // transaction from the request. |
| qt.qdb.ts, err = qt.qdb.d.batchTransaction(qt.qdb.GetContext(), qt.qdb.bh) |
| if err != nil { |
| if verror.ErrorID(err) == wire.ErrReadOnlyBatch.ID { |
| // We are in a snapshot batch, write access cannot be provided. |
| // Return NotWritable. |
| return nil, syncql.NewErrNotWritable(qt.qdb.GetContext(), pubutil.EncodeId(qt.cReq.id)) |
| } |
| return nil, err |
| } |
| qt.qdb.sntx = qt.qdb.ts.tx |
| } else { |
| qt.qdb.sntx, err = qt.qdb.d.batchReader(qt.qdb.GetContext(), qt.qdb.bh) |
| if err != nil { |
| return nil, err |
| } |
| } |
| } else { |
| // Now that we know if write access is required, create a snapshot |
| // or transaction. |
| if !writeAccessReq { |
| qt.qdb.sntx = qt.qdb.d.st.NewSnapshot() |
| } else { // writeAccessReq |
| qt.qdb.ts = &transactionState{tx: qt.qdb.d.st.NewWatchableTransaction()} |
| qt.qdb.sntx = qt.qdb.ts.tx |
| } |
| } |
| // Now that we have a collection, we need to check permissions. |
| collectionPerms, err := qt.cReq.checkAccess(qdb.ctx, qdb.call, qdb.sntx) |
| if err != nil { |
| return nil, err |
| } |
| if writeAccessReq { |
| qt.qdb.ts.MarkDataChanged(qt.cReq.id, collectionPerms) |
| } |
| return qt, nil |
| } |
| |
| // queryTable implements ds.Table. |
| type queryTable struct { |
| qdb *queryDb |
| cReq *collectionReq |
| } |
| |
| func (t *queryTable) GetIndexFields() []ds.Index { |
| // TODO(jkline): If and when secondary indexes are supported, they |
| // would be supplied here. |
| return []ds.Index{} |
| } |
| |
| func (t *queryTable) Delete(k string) (bool, error) { |
| // Create a rowReq and call delete. Permissions will be checked. |
| rowReq := &rowReq{ |
| key: k, |
| c: t.cReq, |
| } |
| if err := rowReq.delete(t.qdb.GetContext(), t.qdb.call, t.qdb.ts); err != nil { |
| return false, err |
| } |
| return true, nil |
| } |
| |
| func (t *queryTable) Scan(indexRanges ...ds.IndexRanges) (ds.KeyValueStream, error) { |
| streams := []store.Stream{} |
| // Syncbase does not currently support secondary indexes. As such, indexRanges |
| // is guaranteed to be one in size as it will only specify the key ranges; |
| // hence, indexRanges[0] below. |
| for _, keyRange := range *indexRanges[0].StringRanges { |
| // TODO(jkline): For now, acquire all of the streams at once to minimize the |
| // race condition. Need a way to Scan multiple ranges at the same state of |
| // uncommitted changes. |
| streams = append(streams, t.qdb.sntx.Scan(common.ScanRangeArgs(common.JoinKeyParts(common.RowPrefix, t.cReq.stKeyPart()), keyRange.Start, keyRange.Limit))) |
| } |
| return &kvs{ |
| t: t, |
| curr: 0, |
| validRow: false, |
| it: streams, |
| err: nil, |
| }, nil |
| } |
| |
| // kvs implements ds.KeyValueStream. |
| type kvs struct { |
| t *queryTable |
| curr int |
| validRow bool |
| currKey string |
| currValue *vom.RawBytes |
| it []store.Stream // array of store.Streams |
| err error |
| } |
| |
| func (s *kvs) Advance() bool { |
| if s.err != nil { |
| return false |
| } |
| for s.curr < len(s.it) { |
| if s.it[s.curr].Advance() { |
| // key |
| keyBytes := s.it[s.curr].Key(nil) |
| parts := common.SplitNKeyParts(string(keyBytes), 3) |
| // TODO(rogulenko): Check access for the key. |
| s.currKey = parts[2] |
| // value |
| valueBytes := s.it[s.curr].Value(nil) |
| var currValue *vom.RawBytes |
| if err := vom.Decode(valueBytes, &currValue); err != nil { |
| s.validRow = false |
| s.err = err |
| s.Cancel() // to cancel iterators after s.curr |
| return false |
| } |
| s.currValue = currValue |
| s.validRow = true |
| return true |
| } |
| // Advance returned false. It could be an err, or it could |
| // be we've reached the end. |
| if err := s.it[s.curr].Err(); err != nil { |
| s.validRow = false |
| s.err = err |
| s.Cancel() // to cancel iterators after s.curr |
| return false |
| } |
| // We've reached the end of the iterator for this keyRange. |
| // Jump to the next one. |
| s.it[s.curr] = nil |
| s.curr++ |
| s.validRow = false |
| } |
| // There are no more prefixes to scan. |
| return false |
| } |
| |
| func (s *kvs) KeyValue() (string, *vom.RawBytes) { |
| if !s.validRow { |
| return "", nil |
| } |
| return s.currKey, s.currValue |
| } |
| |
| func (s *kvs) Err() error { |
| return s.err |
| } |
| |
| func (s *kvs) Cancel() { |
| if s.it != nil { |
| for i := s.curr; i < len(s.it); i++ { |
| s.it[i].Cancel() |
| } |
| s.it = nil |
| } |
| // set curr to end of keyRanges so Advance will return false |
| s.curr = len(s.it) |
| } |
| |
| //////////////////////////////////////// |
| // Internal helpers |
| |
| func (d *database) stKey() string { |
| return common.DatabasePrefix |
| } |
| |
| func (d *database) runWithExistingBatchOrNewSnapshot(ctx *context.T, bh wire.BatchHandle, fn func(sntx store.SnapshotOrTransaction) error) error { |
| if bh != "" { |
| if sntx, err := d.batchReader(ctx, bh); err != nil { |
| // Batch does not exist. |
| return err |
| } else { |
| return fn(sntx) |
| } |
| } else { |
| return store.RunWithSnapshot(d.st, fn) |
| } |
| } |
| |
| func (d *database) runInExistingBatchOrNewTransaction(ctx *context.T, bh wire.BatchHandle, fn func(ts *transactionState) error) error { |
| if bh != "" { |
| if batch, err := d.batchTransaction(ctx, bh); err != nil { |
| // Batch does not exist or is readonly (snapshot). |
| return err |
| } else { |
| return fn(batch) |
| } |
| } else { |
| return d.runInTransaction(fn) |
| } |
| } |
| |
| func (d *database) batchReader(ctx *context.T, bh wire.BatchHandle) (store.SnapshotOrTransaction, error) { |
| sn, ts, _, err := d.batchLookupInternal(ctx, bh) |
| if err != nil { |
| return nil, err |
| } |
| if sn != nil { |
| return sn, nil |
| } |
| return ts.tx, nil |
| } |
| |
| func (d *database) batchTransaction(ctx *context.T, bh wire.BatchHandle) (*transactionState, error) { |
| sn, ts, _, err := d.batchLookupInternal(ctx, bh) |
| if err != nil { |
| return nil, err |
| } |
| if sn != nil { |
| return nil, wire.NewErrReadOnlyBatch(ctx) |
| } |
| return ts, nil |
| } |
| |
| // batchLookupInternal parses the batch handle and retrieves the corresponding |
| // snapshot or transaction. It returns an error if the handle is malformed or |
| // the batch does not exist. Otherwise, exactly one of sn and ts will be != nil. |
| func (d *database) batchLookupInternal(ctx *context.T, bh wire.BatchHandle) (sn store.Snapshot, ts *transactionState, batchId uint64, _ error) { |
| if bh == "" { |
| return nil, nil, 0, verror.New(verror.ErrInternal, ctx, "batch lookup for empty handle") |
| } |
| bType, bId, err := common.SplitBatchHandle(bh) |
| if err != nil { |
| return nil, nil, 0, err |
| } |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| var found bool |
| switch bType { |
| case common.BatchTypeSn: |
| sn, found = d.sns[bId] |
| case common.BatchTypeTx: |
| ts, found = d.txs[bId] |
| } |
| if !found { |
| return nil, nil, bId, wire.NewErrUnknownBatch(ctx) |
| } |
| return sn, ts, bId, nil |
| } |
| |
| func (d *database) setPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error { |
| if !d.exists { |
| vlog.Fatalf("database %v does not exist", d.id) |
| } |
| if err := common.ValidatePerms(ctx, perms, wire.AllDatabaseTags); err != nil { |
| return err |
| } |
| return store.RunInTransaction(d.st, func(tx store.Transaction) error { |
| data := &DatabaseData{} |
| return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error { |
| if err := util.CheckVersion(ctx, version, data.Version); err != nil { |
| return err |
| } |
| data.Perms = perms |
| data.Version++ |
| return nil |
| }) |
| }) |
| } |
| |
| // runInTransaction runs the given fn in a transaction, managing retries and |
| // commit/abort. |
| func (d *database) runInTransaction(fn func(ts *transactionState) error) error { |
| // TODO(rogulenko): Change the default number of attempts to 3. Currently, |
| // some storage engine tests fail when the number of attempts is that low. |
| return d.runInTransactionWithOpts(&store.TransactionOptions{NumAttempts: 100}, fn) |
| } |
| |
| // runInTransactionWithOpts runs the given fn in a transaction, managing retries |
| // and commit/abort. |
| func (d *database) runInTransactionWithOpts(opts *store.TransactionOptions, fn func(ts *transactionState) error) error { |
| var err error |
| for i := 0; i < opts.NumAttempts; i++ { |
| // TODO(sadovsky): Should NewTransaction return an error? If not, how will |
| // we deal with RPC errors when talking to remote storage engines? (Note, |
| // client-side BeginBatch returns an error.) |
| ts := &transactionState{tx: d.st.NewWatchableTransaction()} |
| if err = fn(ts); err != nil { |
| ts.tx.Abort() |
| return err |
| } |
| // TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC |
| // failure or ErrConcurrentTransaction. Depending on the cause of failure, |
| // it may be desirable to retry the Commit() and/or to call Abort(). |
| if err = ts.tx.Commit(); verror.ErrorID(err) != store.ErrConcurrentTransaction.ID { |
| return err |
| } |
| } |
| return err |
| } |