| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package leveldb provides a LevelDB-based implementation of store.Store. |
| package leveldb |
| |
| // #cgo LDFLAGS: -lleveldb -lsnappy |
| // #include <stdlib.h> |
| // #include "leveldb/c.h" |
| // #include "syncbase_leveldb.h" |
| import "C" |
| import ( |
| "container/list" |
| "fmt" |
| "sync" |
| "unsafe" |
| |
| "v.io/syncbase/x/ref/services/syncbase/store" |
| "v.io/v23/verror" |
| ) |
| |
| // db is a wrapper around LevelDB that implements the store.Store interface. |
| type db struct { |
| // mu protects the state of the db. |
| mu sync.RWMutex |
| node *store.ResourceNode |
| cDb *C.leveldb_t |
| // Default read/write options. |
| readOptions *C.leveldb_readoptions_t |
| writeOptions *C.leveldb_writeoptions_t |
| err error |
| |
| // txmu protects the transaction-related variables below, and is also held |
| // during transaction commits. It must always be acquired before mu. |
| txmu sync.Mutex |
| // txEvents is a queue of create/commit transaction events. |
| txEvents *list.List |
| txSequenceNumber uint64 |
| // txTable is a set of keys written by recent transactions. This set |
| // includes all write sets of transactions committed after the oldest living |
| // (in-flight) transaction. |
| txTable *trie |
| } |
| |
| var _ store.Store = (*db)(nil) |
| |
// OpenOptions controls how Open treats the database at the given path.
type OpenOptions struct {
	// CreateIfMissing is passed to LevelDB's create_if_missing option and
	// ErrorIfExists to its error_if_exists option.
	CreateIfMissing, ErrorIfExists bool
}
| |
| // Open opens the database located at the given path. |
| func Open(path string, opts OpenOptions) (store.Store, error) { |
| var cError *C.char |
| cPath := C.CString(path) |
| defer C.free(unsafe.Pointer(cPath)) |
| |
| var cOptsCreateIfMissing, cOptsErrorIfExists C.uchar |
| if opts.CreateIfMissing { |
| cOptsCreateIfMissing = 1 |
| } |
| if opts.ErrorIfExists { |
| cOptsErrorIfExists = 1 |
| } |
| |
| cOpts := C.leveldb_options_create() |
| C.leveldb_options_set_create_if_missing(cOpts, cOptsCreateIfMissing) |
| C.leveldb_options_set_error_if_exists(cOpts, cOptsErrorIfExists) |
| C.leveldb_options_set_paranoid_checks(cOpts, 1) |
| defer C.leveldb_options_destroy(cOpts) |
| |
| cDb := C.leveldb_open(cOpts, cPath, &cError) |
| if err := goError(cError); err != nil { |
| return nil, err |
| } |
| readOptions := C.leveldb_readoptions_create() |
| C.leveldb_readoptions_set_verify_checksums(readOptions, 1) |
| return &db{ |
| node: store.NewResourceNode(), |
| cDb: cDb, |
| readOptions: readOptions, |
| writeOptions: C.leveldb_writeoptions_create(), |
| txEvents: list.New(), |
| txTable: newTrie(), |
| }, nil |
| } |
| |
| // Close implements the store.Store interface. |
| func (d *db) Close() error { |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| if d.err != nil { |
| return convertError(d.err) |
| } |
| d.node.Close() |
| C.leveldb_close(d.cDb) |
| d.cDb = nil |
| C.leveldb_readoptions_destroy(d.readOptions) |
| d.readOptions = nil |
| C.leveldb_writeoptions_destroy(d.writeOptions) |
| d.writeOptions = nil |
| d.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore) |
| return nil |
| } |
| |
| // Destroy removes all physical data of the database located at the given path. |
| func Destroy(path string) error { |
| var cError *C.char |
| cPath := C.CString(path) |
| defer C.free(unsafe.Pointer(cPath)) |
| cOpts := C.leveldb_options_create() |
| defer C.leveldb_options_destroy(cOpts) |
| C.leveldb_destroy_db(cOpts, cPath, &cError) |
| return goError(cError) |
| } |
| |
| // Get implements the store.StoreReader interface. |
| func (d *db) Get(key, valbuf []byte) ([]byte, error) { |
| return d.getWithOpts(key, valbuf, d.readOptions) |
| } |
| |
| // Scan implements the store.StoreReader interface. |
| func (d *db) Scan(start, limit []byte) store.Stream { |
| d.mu.RLock() |
| defer d.mu.RUnlock() |
| if d.err != nil { |
| return &store.InvalidStream{d.err} |
| } |
| return newStream(d, d.node, start, limit, d.readOptions) |
| } |
| |
| // Put implements the store.StoreWriter interface. |
| func (d *db) Put(key, value []byte) error { |
| write := store.WriteOp{ |
| T: store.PutOp, |
| Key: key, |
| Value: value, |
| } |
| return d.write([]store.WriteOp{write}, d.writeOptions) |
| } |
| |
| // Delete implements the store.StoreWriter interface. |
| func (d *db) Delete(key []byte) error { |
| write := store.WriteOp{ |
| T: store.DeleteOp, |
| Key: key, |
| } |
| return d.write([]store.WriteOp{write}, d.writeOptions) |
| } |
| |
| // NewTransaction implements the store.Store interface. |
| func (d *db) NewTransaction() store.Transaction { |
| d.txmu.Lock() |
| defer d.txmu.Unlock() |
| d.mu.RLock() |
| defer d.mu.RUnlock() |
| if d.err != nil { |
| return &store.InvalidTransaction{d.err} |
| } |
| return newTransaction(d, d.node) |
| } |
| |
| // NewSnapshot implements the store.Store interface. |
| func (d *db) NewSnapshot() store.Snapshot { |
| d.mu.RLock() |
| defer d.mu.RUnlock() |
| if d.err != nil { |
| return &store.InvalidSnapshot{Error: d.err} |
| } |
| return newSnapshot(d, d.node) |
| } |
| |
| // write writes a batch and adds all written keys to txTable. |
| // TODO(rogulenko): remove this method. |
| func (d *db) write(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error { |
| d.txmu.Lock() |
| defer d.txmu.Unlock() |
| return d.writeLocked(batch, cOpts) |
| } |
| |
| // writeLocked is like write(), but it assumes txmu is held. |
| func (d *db) writeLocked(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error { |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| if d.err != nil { |
| return d.err |
| } |
| cBatch := C.leveldb_writebatch_create() |
| defer C.leveldb_writebatch_destroy(cBatch) |
| for _, write := range batch { |
| switch write.T { |
| case store.PutOp: |
| cKey, cKeyLen := cSlice(write.Key) |
| cVal, cValLen := cSlice(write.Value) |
| C.leveldb_writebatch_put(cBatch, cKey, cKeyLen, cVal, cValLen) |
| case store.DeleteOp: |
| cKey, cKeyLen := cSlice(write.Key) |
| C.leveldb_writebatch_delete(cBatch, cKey, cKeyLen) |
| default: |
| panic(fmt.Sprintf("unknown write operation type: %v", write.T)) |
| } |
| } |
| var cError *C.char |
| C.leveldb_write(d.cDb, cOpts, cBatch, &cError) |
| if err := goError(cError); err != nil { |
| return err |
| } |
| if d.txEvents.Len() == 0 { |
| return nil |
| } |
| d.trackBatch(batch) |
| return nil |
| } |
| |
| // trackBatch writes the batch to txTable and adds a commit event to txEvents. |
| func (d *db) trackBatch(batch []store.WriteOp) { |
| // TODO(rogulenko): do GC. |
| d.txSequenceNumber++ |
| seq := d.txSequenceNumber |
| var keys [][]byte |
| for _, write := range batch { |
| d.txTable.add(write.Key, seq) |
| keys = append(keys, write.Key) |
| } |
| tx := &commitedTransaction{ |
| seq: seq, |
| batch: keys, |
| } |
| d.txEvents.PushBack(tx) |
| } |
| |
| // getWithOpts returns the value for the given key. |
| // cOpts may contain a pointer to a snapshot. |
| func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) { |
| d.mu.RLock() |
| defer d.mu.RUnlock() |
| if d.err != nil { |
| return valbuf, convertError(d.err) |
| } |
| var cError *C.char |
| var valLen C.size_t |
| cStr, cLen := cSlice(key) |
| val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError) |
| if err := goError(cError); err != nil { |
| return valbuf, err |
| } |
| if val == nil { |
| return valbuf, verror.New(store.ErrUnknownKey, nil, string(key)) |
| } |
| defer C.leveldb_free(unsafe.Pointer(val)) |
| return store.CopyBytes(valbuf, goBytes(val, valLen)), nil |
| } |