blob: 276fd2cc5ac15a0133730673aeb4d524dff0db1d [file] [log] [blame]
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"errors"
"net/http"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
pb "google.golang.org/cloud/internal/datastore"
)
// ErrConcurrentTransaction is returned when a transaction is rolled back due
// to a conflict with a concurrent transaction.
var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")
var errExpiredTransaction = errors.New("datastore: transaction expired")
// A TransactionOption configures the Transaction returned by NewTransaction.
type TransactionOption interface {
apply(*pb.BeginTransactionRequest)
}
type isolation struct {
level pb.BeginTransactionRequest_IsolationLevel
}
func (i isolation) apply(req *pb.BeginTransactionRequest) {
req.IsolationLevel = i.level.Enum()
}
var (
// Snapshot causes the transaction to enforce a snapshot isolation level.
Snapshot TransactionOption = isolation{pb.BeginTransactionRequest_SNAPSHOT}
// Serializable causes the transaction to enforce a serializable isolation level.
Serializable TransactionOption = isolation{pb.BeginTransactionRequest_SERIALIZABLE}
)
// Transaction represents a set of datastore operations to be committed atomically.
//
// Operations are enqueued by calling the Put and Delete methods on Transaction
// (or their Multi-equivalents). These operations are only committed when the
// Commit method is invoked. To ensure consistency, reads must be performed by
// using Transaction's Get method or by using the Transaction method when
// building a query.
//
// A Transaction must be committed or rolled back exactly once.
type Transaction struct {
id []byte
ctx context.Context
mutation *pb.Mutation // The mutations to apply.
pending []*PendingKey // Incomplete keys pending transaction completion.
}
// NewTransaction starts a new transaction.
func NewTransaction(ctx context.Context, opts ...TransactionOption) (*Transaction, error) {
req, resp := &pb.BeginTransactionRequest{}, &pb.BeginTransactionResponse{}
for _, o := range opts {
o.apply(req)
}
if err := call(ctx, "beginTransaction", req, resp); err != nil {
return nil, err
}
return &Transaction{
id: resp.Transaction,
ctx: ctx,
mutation: &pb.Mutation{},
}, nil
}
// Commit applies the enqueued operations atomically.
func (t *Transaction) Commit() (*Commit, error) {
if t.id == nil {
return nil, errExpiredTransaction
}
req := &pb.CommitRequest{
Transaction: t.id,
Mutation: t.mutation,
Mode: pb.CommitRequest_TRANSACTIONAL.Enum(),
}
t.id = nil
resp := &pb.CommitResponse{}
if err := call(t.ctx, "commit", req, resp); err != nil {
if e, ok := err.(*errHTTP); ok && e.StatusCode == http.StatusConflict {
// TODO(jbd): Make sure that we explicitly handle the case where response
// has an HTTP 409 and the error message indicates that it's an concurrent
// transaction error.
return nil, ErrConcurrentTransaction
}
return nil, err
}
// Copy any newly minted keys into the returned keys.
if len(t.pending) != len(resp.MutationResult.InsertAutoIdKey) {
return nil, errors.New("datastore: internal error: server returned the wrong number of keys")
}
commit := &Commit{}
for i, p := range t.pending {
p.key = protoToKey(resp.MutationResult.InsertAutoIdKey[i])
p.commit = commit
}
return commit, nil
}
// Rollback abandons a pending transaction.
func (t *Transaction) Rollback() error {
if t.id == nil {
return errExpiredTransaction
}
id := t.id
t.id = nil
return call(t.ctx, "rollback", &pb.RollbackRequest{Transaction: id}, &pb.RollbackResponse{})
}
// Get is the transaction-specific version of the package function Get.
// All reads performed during the transaction will come from a single consistent
// snapshot. Furthermore, if the transaction is set to a serializable isolation
// level, another transaction cannot concurrently modify the data that is read
// or modified by this transaction.
func (t *Transaction) Get(key *Key, dst interface{}) error {
err := get(t.ctx, []*Key{key}, []interface{}{dst}, &pb.ReadOptions{Transaction: t.id})
if me, ok := err.(MultiError); ok {
return me[0]
}
return err
}
// GetMulti is a batch version of Get.
func (t *Transaction) GetMulti(keys []*Key, dst interface{}) error {
if t.id == nil {
return errExpiredTransaction
}
return get(t.ctx, keys, dst, &pb.ReadOptions{Transaction: t.id})
}
// Put is the transaction-specific version of the package function Put.
//
// Put returns a PendingKey which can be resolved into a Key using the
// return value from a successful Commit. If key is an incomplete key, the
// returned pending key will resolve to a unique key generated by the
// datastore.
func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
h, err := t.PutMulti([]*Key{key}, []interface{}{src})
if err != nil {
if me, ok := err.(MultiError); ok {
return nil, me[0]
}
return nil, err
}
return h[0], nil
}
// PutMulti is a batch version of Put. One PendingKey is returned for each
// element of src in the same order.
func (t *Transaction) PutMulti(keys []*Key, src interface{}) ([]*PendingKey, error) {
if t.id == nil {
return nil, errExpiredTransaction
}
mutation, err := putMutation(keys, src)
if err != nil {
return nil, err
}
proto.Merge(t.mutation, mutation)
// Prepare the returned handles, pre-populating where possible.
ret := make([]*PendingKey, len(keys))
for _, key := range keys {
h := &PendingKey{}
if key.Incomplete() {
// This key will be in the final commit result.
t.pending = append(t.pending, h)
} else {
h.key = key
}
ret = append(ret, h)
}
return ret, nil
}
// Delete is the transaction-specific version of the package function Delete.
// Delete enqueues the deletion of the entity for the given key, to be
// committed atomically upon calling Commit.
func (t *Transaction) Delete(key *Key) error {
err := t.DeleteMulti([]*Key{key})
if me, ok := err.(MultiError); ok {
return me[0]
}
return err
}
// DeleteMulti is a batch version of Delete.
func (t *Transaction) DeleteMulti(keys []*Key) error {
if t.id == nil {
return errExpiredTransaction
}
mutation, err := deleteMutation(keys)
if err != nil {
return err
}
proto.Merge(t.mutation, mutation)
return nil
}
// Commit represents the result of a committed transaction.
type Commit struct{}
// Key resolves a pending key handle into a final key.
func (c *Commit) Key(p *PendingKey) *Key {
if c != p.commit {
panic("PendingKey was not created by corresponding transaction")
}
return p.key
}
// PendingKey represents the key for newly-inserted entity. It can be
// resolved into a Key by calling the Key method of Commit.
type PendingKey struct {
key *Key
commit *Commit
}