blob: 67f253cc5b875f8b85133856f568e0311b8129cc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -07005// Package leveldb provides a LevelDB-based implementation of store.Store.
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -07006package leveldb
7
Sergey Rogulenko120385a2015-05-18 14:47:55 -07008// #cgo LDFLAGS: -lleveldb -lsnappy
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -07009// #include <stdlib.h>
10// #include "leveldb/c.h"
11// #include "syncbase_leveldb.h"
12import "C"
13import (
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070014 "container/list"
15 "fmt"
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070016 "sync"
17 "unsafe"
18
19 "v.io/syncbase/x/ref/services/syncbase/store"
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070020 "v.io/v23/verror"
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070021)
22
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070023// db is a wrapper around LevelDB that implements the store.Store interface.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070024type db struct {
Sergey Rogulenkodef3b302015-05-20 17:33:24 -070025 // mu protects the state of the db.
26 mu sync.RWMutex
Sergey Rogulenko95baa662015-05-22 15:07:06 -070027 node *store.ResourceNode
Sergey Rogulenkodef3b302015-05-20 17:33:24 -070028 cDb *C.leveldb_t
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070029 // Default read/write options.
30 readOptions *C.leveldb_readoptions_t
31 writeOptions *C.leveldb_writeoptions_t
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070032 err error
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070033
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070034 // txmu protects the transaction-related variables below, and is also held
35 // during transaction commits. It must always be acquired before mu.
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070036 txmu sync.Mutex
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070037 // txEvents is a queue of create/commit transaction events.
38 txEvents *list.List
39 txSequenceNumber uint64
40 // txTable is a set of keys written by recent transactions. This set
41 // includes all write sets of transactions committed after the oldest living
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070042 // (in-flight) transaction.
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070043 txTable *trie
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070044}
45
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070046var _ store.Store = (*db)(nil)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070047
48// Open opens the database located at the given path, creating it if it doesn't
49// exist.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070050func Open(path string) (store.Store, error) {
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070051 var cError *C.char
52 cPath := C.CString(path)
53 defer C.free(unsafe.Pointer(cPath))
54
55 cOpts := C.leveldb_options_create()
56 C.leveldb_options_set_create_if_missing(cOpts, 1)
57 C.leveldb_options_set_paranoid_checks(cOpts, 1)
58 defer C.leveldb_options_destroy(cOpts)
59
60 cDb := C.leveldb_open(cOpts, cPath, &cError)
61 if err := goError(cError); err != nil {
62 return nil, err
63 }
64 readOptions := C.leveldb_readoptions_create()
65 C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070066 return &db{
Sergey Rogulenko95baa662015-05-22 15:07:06 -070067 node: store.NewResourceNode(),
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070068 cDb: cDb,
69 readOptions: readOptions,
70 writeOptions: C.leveldb_writeoptions_create(),
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070071 txEvents: list.New(),
72 txTable: newTrie(),
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070073 }, nil
74}
75
76// Close implements the store.Store interface.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070077func (d *db) Close() error {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070078 d.mu.Lock()
79 defer d.mu.Unlock()
80 if d.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070081 return convertError(d.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070082 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -070083 d.node.Close()
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070084 C.leveldb_close(d.cDb)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070085 d.cDb = nil
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070086 C.leveldb_readoptions_destroy(d.readOptions)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070087 d.readOptions = nil
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070088 C.leveldb_writeoptions_destroy(d.writeOptions)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070089 d.writeOptions = nil
Adam Sadovsky8db74432015-05-29 17:37:32 -070090 d.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070091 return nil
92}
93
94// Destroy removes all physical data of the database located at the given path.
95func Destroy(path string) error {
96 var cError *C.char
97 cPath := C.CString(path)
98 defer C.free(unsafe.Pointer(cPath))
99 cOpts := C.leveldb_options_create()
100 defer C.leveldb_options_destroy(cOpts)
101 C.leveldb_destroy_db(cOpts, cPath, &cError)
102 return goError(cError)
103}
104
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700105// Get implements the store.StoreReader interface.
106func (d *db) Get(key, valbuf []byte) ([]byte, error) {
107 return d.getWithOpts(key, valbuf, d.readOptions)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700108}
109
110// Scan implements the store.StoreReader interface.
Adam Sadovskyf437f332015-05-19 23:03:22 -0700111func (d *db) Scan(start, limit []byte) store.Stream {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700112 d.mu.RLock()
113 defer d.mu.RUnlock()
114 if d.err != nil {
115 return &store.InvalidStream{d.err}
116 }
Sergey Rogulenkodef3b302015-05-20 17:33:24 -0700117 return newStream(d, d.node, start, limit, d.readOptions)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700118}
119
120// Put implements the store.StoreWriter interface.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700121func (d *db) Put(key, value []byte) error {
John Kline18834bd2015-06-26 10:07:46 -0700122 write := store.WriteOp{
123 T: store.PutOp,
124 Key: key,
125 Value: value,
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700126 }
John Kline18834bd2015-06-26 10:07:46 -0700127 return d.write([]store.WriteOp{write}, d.writeOptions)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700128}
129
130// Delete implements the store.StoreWriter interface.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700131func (d *db) Delete(key []byte) error {
John Kline18834bd2015-06-26 10:07:46 -0700132 write := store.WriteOp{
133 T: store.DeleteOp,
134 Key: key,
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700135 }
John Kline18834bd2015-06-26 10:07:46 -0700136 return d.write([]store.WriteOp{write}, d.writeOptions)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700137}
138
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700139// NewTransaction implements the store.Store interface.
140func (d *db) NewTransaction() store.Transaction {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700141 d.txmu.Lock()
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700142 defer d.txmu.Unlock()
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700143 d.mu.RLock()
144 defer d.mu.RUnlock()
145 if d.err != nil {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700146 return &store.InvalidTransaction{d.err}
147 }
Sergey Rogulenkodef3b302015-05-20 17:33:24 -0700148 return newTransaction(d, d.node)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700149}
150
151// NewSnapshot implements the store.Store interface.
152func (d *db) NewSnapshot() store.Snapshot {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700153 d.mu.RLock()
154 defer d.mu.RUnlock()
155 if d.err != nil {
156 return &store.InvalidSnapshot{d.err}
157 }
Sergey Rogulenkodef3b302015-05-20 17:33:24 -0700158 return newSnapshot(d, d.node)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700159}
160
Adam Sadovsky8db74432015-05-29 17:37:32 -0700161// write writes a batch and adds all written keys to txTable.
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700162// TODO(rogulenko): remove this method.
John Kline18834bd2015-06-26 10:07:46 -0700163func (d *db) write(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700164 d.txmu.Lock()
165 defer d.txmu.Unlock()
166 return d.writeLocked(batch, cOpts)
167}
168
169// writeLocked is like write(), but it assumes txmu is held.
John Kline18834bd2015-06-26 10:07:46 -0700170func (d *db) writeLocked(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700171 d.mu.Lock()
172 defer d.mu.Unlock()
173 if d.err != nil {
174 return d.err
175 }
176 cBatch := C.leveldb_writebatch_create()
177 defer C.leveldb_writebatch_destroy(cBatch)
178 for _, write := range batch {
John Kline18834bd2015-06-26 10:07:46 -0700179 switch write.T {
180 case store.PutOp:
181 cKey, cKeyLen := cSlice(write.Key)
182 cVal, cValLen := cSlice(write.Value)
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700183 C.leveldb_writebatch_put(cBatch, cKey, cKeyLen, cVal, cValLen)
John Kline18834bd2015-06-26 10:07:46 -0700184 case store.DeleteOp:
185 cKey, cKeyLen := cSlice(write.Key)
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700186 C.leveldb_writebatch_delete(cBatch, cKey, cKeyLen)
187 default:
John Kline18834bd2015-06-26 10:07:46 -0700188 panic(fmt.Sprintf("unknown write operation type: %v", write.T))
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700189 }
190 }
191 var cError *C.char
192 C.leveldb_write(d.cDb, cOpts, cBatch, &cError)
193 if err := goError(cError); err != nil {
194 return err
195 }
196 if d.txEvents.Len() == 0 {
197 return nil
198 }
199 d.trackBatch(batch)
200 return nil
201}
202
Adam Sadovsky8db74432015-05-29 17:37:32 -0700203// trackBatch writes the batch to txTable and adds a commit event to txEvents.
John Kline18834bd2015-06-26 10:07:46 -0700204func (d *db) trackBatch(batch []store.WriteOp) {
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700205 // TODO(rogulenko): do GC.
206 d.txSequenceNumber++
207 seq := d.txSequenceNumber
208 var keys [][]byte
209 for _, write := range batch {
John Kline18834bd2015-06-26 10:07:46 -0700210 d.txTable.add(write.Key, seq)
211 keys = append(keys, write.Key)
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700212 }
213 tx := &commitedTransaction{
214 seq: seq,
215 batch: keys,
216 }
217 d.txEvents.PushBack(tx)
218}
219
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700220// getWithOpts returns the value for the given key.
221// cOpts may contain a pointer to a snapshot.
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700222func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700223 d.mu.RLock()
224 defer d.mu.RUnlock()
225 if d.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700226 return valbuf, convertError(d.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700227 }
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700228 var cError *C.char
229 var valLen C.size_t
230 cStr, cLen := cSlice(key)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700231 val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700232 if err := goError(cError); err != nil {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700233 return valbuf, err
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700234 }
235 if val == nil {
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -0700236 return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700237 }
238 defer C.leveldb_free(unsafe.Pointer(val))
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700239 return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700240}