blob: b8093f6cc263873a1783b7d796c4ff113d2d2aa8 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package leveldb
// #include "leveldb/c.h"
import "C"
import (
"bytes"
"container/list"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/verror"
"v.io/x/lib/vlog"
)
// commitedTransaction is only used as an element of db.txEvents.
type commitedTransaction struct {
seq uint64
batch [][]byte
}
// transaction is a wrapper around LevelDB WriteBatch that implements
// the store.Transaction interface.
type transaction struct {
// mu protects the state of the transaction.
mu sync.Mutex
node *store.ResourceNode
d *db
seq uint64
event *list.Element // pointer to element of db.txEvents
snapshot store.Snapshot
reads store.ReadSet
writes []store.WriteOp
cOpts *C.leveldb_writeoptions_t
err error
}
var _ store.Transaction = (*transaction)(nil)
func newTransaction(d *db, parent *store.ResourceNode) *transaction {
node := store.NewResourceNode()
snapshot := newSnapshot(d, node)
tx := &transaction{
node: node,
d: d,
snapshot: snapshot,
seq: d.txSequenceNumber,
cOpts: d.writeOptions,
}
tx.event = d.txEvents.PushFront(tx)
parent.AddChild(tx.node, func() {
tx.Abort()
})
return tx
}
// close frees allocated C objects and releases acquired locks.
// Assumes mu is held.
func (tx *transaction) close() {
tx.removeEvent()
tx.node.Close()
if tx.cOpts != tx.d.writeOptions {
C.leveldb_writeoptions_destroy(tx.cOpts)
}
tx.cOpts = nil
}
// removeEvent removes this transaction from the db.txEvents queue.
// Assumes mu is held.
func (tx *transaction) removeEvent() {
// This can happen if the transaction was committed, since Commit()
// explicitly calls removeEvent().
if tx.event == nil {
return
}
tx.d.txmu.Lock()
tx.d.txEvents.Remove(tx.event)
tx.d.txmu.Unlock()
tx.event = nil
}
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return valbuf, convertError(tx.err)
}
tx.reads.Keys = append(tx.reads.Keys, key)
// Reflect the state of the transaction: the "writes" (puts and
// deletes) override the values in the transaction snapshot.
// Find the last "writes" entry for this key, if one exists.
// Note: this step could be optimized by using maps (puts and
// deletes) instead of an array.
for i := len(tx.writes) - 1; i >= 0; i-- {
op := &tx.writes[i]
if bytes.Equal(op.Key, key) {
if op.T == store.PutOp {
return op.Value, nil
}
return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
}
}
return tx.snapshot.Get(key, valbuf)
}
// Scan implements the store.StoreReader interface.
func (tx *transaction) Scan(start, limit []byte) store.Stream {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return &store.InvalidStream{Error: tx.err}
}
tx.reads.Ranges = append(tx.reads.Ranges, store.ScanRange{
Start: start,
Limit: limit,
})
// Return a stream which merges the snaphot stream with the uncommitted changes.
return store.MergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
}
// Put implements the store.StoreWriter interface.
func (tx *transaction) Put(key, value []byte) error {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return convertError(tx.err)
}
tx.writes = append(tx.writes, store.WriteOp{
T: store.PutOp,
Key: key,
Value: value,
})
return nil
}
// Delete implements the store.StoreWriter interface.
func (tx *transaction) Delete(key []byte) error {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return convertError(tx.err)
}
tx.writes = append(tx.writes, store.WriteOp{
T: store.DeleteOp,
Key: key,
})
return nil
}
// validateReadSet returns true iff the read set of this transaction has not
// been invalidated by other transactions.
// Assumes tx.d.txmu is held.
func (tx *transaction) validateReadSet() bool {
for _, key := range tx.reads.Keys {
if tx.d.txTable.get(key) > tx.seq {
vlog.VI(3).Infof("key conflict: %q", key)
return false
}
}
for _, r := range tx.reads.Ranges {
if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
return false
}
}
return true
}
// Commit implements the store.Transaction interface.
func (tx *transaction) Commit() error {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return convertError(tx.err)
}
tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
// Explicitly remove this transaction from the event queue. If this was the
// only active transaction, the event queue becomes empty and writeLocked will
// not add this transaction's write set to txTable.
tx.removeEvent()
defer tx.close()
tx.d.txmu.Lock()
defer tx.d.txmu.Unlock()
if !tx.validateReadSet() {
return store.NewErrConcurrentTransaction(nil)
}
return tx.d.writeLocked(tx.writes, tx.cOpts)
}
// Abort implements the store.Transaction interface.
func (tx *transaction) Abort() error {
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
return convertError(tx.err)
}
tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
tx.close()
return nil
}