store: change Stream API and key type string->[]byte
Here are logic steps that caused this CL:
1) returning strings/slices that point to buffers allocated on C heap
as a result of Stream.Value() doesn't look like a good idea because
the stream may be cancelled concurrently but the underlying buffers
should not be freed while they are in use, so it's better to copy
buffers from C heap to Go heap;
2) to avoid multiple allocation/deallocation of buffers it should be
possible to reuse buffers when iterating through the stream, to do it
we can pass a slice that would hold key/value;
3) in order to reuse buffers you have to use []bytes, not strings, so
the type of key is changing from string to []byte
4) updating other methods to match this logic.
This change consists of two parts: store/model.go API change and updates
across the codebase to pass the tests.
Change-Id: I8a99e5ccfa36226d389b0d34a3c338da12f0d65a
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index 5c09c13..f2c5fc9 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -83,7 +83,7 @@
// Returns a VDL-compatible error.
// If you need to perform an authorization check, call Get() first.
func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
- if err := st.Delete(l.StKey()); err != nil {
+ if err := st.Delete([]byte(l.StKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -109,7 +109,7 @@
// RPC-oblivious, lower-level get/put
func GetObject(st store.StoreReader, k string, v interface{}) error {
- bytes, err := st.Get(k)
+ bytes, err := st.Get([]byte(k), nil)
if err != nil {
return err
}
@@ -121,5 +121,5 @@
if err != nil {
return err
}
- return st.Put(k, bytes)
+ return st.Put([]byte(k), bytes)
}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index bd251c3..57556c1 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -85,25 +85,25 @@
}
// Scan implements the store.StoreReader interface.
-func (db *DB) Scan(start, end string) (store.Stream, error) {
+func (db *DB) Scan(start, end []byte) (store.Stream, error) {
return newStream(db, start, end, db.readOptions), nil
}
// Get implements the store.StoreReader interface.
-func (db *DB) Get(key string) ([]byte, error) {
- return db.getWithOpts(key, db.readOptions)
+func (db *DB) Get(key, valbuf []byte) ([]byte, error) {
+ return db.getWithOpts(key, valbuf, db.readOptions)
}
// Put implements the store.StoreWriter interface.
-func (db *DB) Put(key string, v []byte) error {
+func (db *DB) Put(key, value []byte) error {
// TODO(rogulenko): improve performance.
return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
- return st.Put(key, v)
+ return st.Put(key, value)
})
}
// Delete implements the store.StoreWriter interface.
-func (db *DB) Delete(key string) error {
+func (db *DB) Delete(key []byte) error {
// TODO(rogulenko): improve performance.
return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
return st.Delete(key)
@@ -112,7 +112,7 @@
// getWithOpts returns the value for the given key.
// cOpts may contain a pointer to a snapshot.
-func (db *DB) getWithOpts(key string, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+func (db *DB) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
var cError *C.char
var valLen C.size_t
cStr, cLen := cSlice(key)
@@ -121,8 +121,8 @@
return nil, err
}
if val == nil {
- return nil, &store.ErrUnknownKey{Key: key}
+ return nil, &store.ErrUnknownKey{Key: string(key)}
}
defer C.leveldb_free(unsafe.Pointer(val))
- return C.GoBytes(unsafe.Pointer(val), C.int(valLen)), nil
+ return copyAll(valbuf, goBytes(val, valLen)), nil
}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 802e67c..eda5802 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -40,11 +40,11 @@
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end string) (store.Stream, error) {
+func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
return newStream(s.db, start, end, s.cOpts), nil
}
// Get implements the store.StoreReader interface.
-func (s *snapshot) Get(key string) ([]byte, error) {
- return s.db.getWithOpts(key, s.cOpts)
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+ return s.db.getWithOpts(key, valbuf, s.cOpts)
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index 1f3baa7..f1bddf7 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -8,6 +8,7 @@
// #include "syncbase_leveldb.h"
import "C"
import (
+ "bytes"
"errors"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -19,7 +20,7 @@
// TODO(rogulenko): ensure thread safety.
type stream struct {
cIter *C.syncbase_leveldb_iterator_t
- end string
+ end []byte
advancedOnce bool
err error
@@ -27,7 +28,7 @@
var _ store.Stream = (*stream)(nil)
-func newStream(db *DB, start, end string, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(db *DB, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
cIter := C.syncbase_leveldb_create_iterator(db.cDb, cOpts, cStr, size)
return &stream{
@@ -53,7 +54,7 @@
} else {
C.syncbase_leveldb_iter_next(it.cIter)
}
- if it.cIter.is_valid != 0 && it.end > it.key() {
+ if it.cIter.is_valid != 0 && bytes.Compare(it.end, it.cKey()) > 0 {
return true
}
@@ -64,20 +65,26 @@
return false
}
-// Value implements the store.Stream interface.
-// The returned values point to buffers allocated on C heap.
-// The data is valid until the next call to Advance or Cancel.
-func (it *stream) Value() store.KeyValue {
+// Key implements the store.Stream interface.
+func (it *stream) Key(keybuf []byte) []byte {
if !it.advancedOnce {
vlog.Fatal("stream has never been advanced")
}
if it.cIter == nil {
vlog.Fatal("illegal state")
}
- return store.KeyValue{
- Key: it.key(),
- Value: it.val(),
+ return copyAll(keybuf, it.cKey())
+}
+
+// Value implements the store.Stream interface.
+func (it *stream) Value(valbuf []byte) []byte {
+ if !it.advancedOnce {
+ vlog.Fatal("stream has never been advanced")
}
+ if it.cIter == nil {
+ vlog.Fatal("illegal state")
+ }
+ return copyAll(valbuf, it.cVal())
}
// Err implements the store.Stream interface.
@@ -90,18 +97,18 @@
if it.cIter == nil {
return
}
- it.err = errors.New("cancelled")
+ it.err = errors.New("canceled")
it.destroyLeveldbIter()
}
-// key returns the key. The key points to buffer allocated on C heap.
+// cKey returns the key. The key points to buffer allocated on C heap.
// The data is valid until the next call to Advance or Cancel.
-func (it *stream) key() string {
- return goString(it.cIter.key, it.cIter.key_len)
+func (it *stream) cKey() []byte {
+ return goBytes(it.cIter.key, it.cIter.key_len)
}
-// val returns the value. The value points to buffer allocated on C heap.
+// cVal returns the value. The value points to buffer allocated on C heap.
// The data is valid until the next call to Advance or Cancel.
-func (it *stream) val() []byte {
+func (it *stream) cVal() []byte {
return goBytes(it.cIter.val, it.cIter.val_len)
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 3a9f4b2..6aa079d 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -69,25 +69,25 @@
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end string) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
return tx.snapshot.Scan(start, end)
}
// Get implements the store.StoreReader interface.
-func (tx *transaction) Get(key string) ([]byte, error) {
- return tx.snapshot.Get(key)
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ return tx.snapshot.Get(key, valbuf)
}
// Put implements the store.StoreWriter interface.
-func (tx *transaction) Put(key string, v []byte) error {
+func (tx *transaction) Put(key, value []byte) error {
cKey, cKeyLen := cSlice(key)
- cVal, cValLen := cSliceFromBytes(v)
+ cVal, cValLen := cSlice(value)
C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
return nil
}
// Delete implements the store.StoreWriter interface.
-func (tx *transaction) Delete(key string) error {
+func (tx *transaction) Delete(key []byte) error {
cKey, cKeyLen := cSlice(key)
C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
return nil
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
index d7b9717..333743f 100644
--- a/services/syncbase/store/leveldb/util.go
+++ b/services/syncbase/store/leveldb/util.go
@@ -22,11 +22,11 @@
return err
}
-// cSlice converts Go string to C string without copying the data.
+// cSlice converts Go []byte to C string without copying the data.
//
// This function behaves similarly to standard Go slice copying or sub-slicing;
// the caller need not worry about ownership or garbage collection.
-func cSlice(str string) (*C.char, C.size_t) {
+func cSlice(str []byte) (*C.char, C.size_t) {
if len(str) == 0 {
return nil, 0
}
@@ -34,26 +34,6 @@
return (*C.char)(data), C.size_t(len(str))
}
-// cSliceFromBytes converts Go []byte to C string without copying the data.
-// This function behaves similarly to cSlice.
-func cSliceFromBytes(bytes []byte) (*C.char, C.size_t) {
- if len(bytes) == 0 {
- return nil, 0
- }
- data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&bytes)).Data)
- return (*C.char)(data), C.size_t(len(bytes))
-}
-
-// goString converts C string to Go string without copying the data.
-// This function behaves similarly to cSlice.
-func goString(str *C.char, size C.size_t) string {
- ptr := unsafe.Pointer(&reflect.StringHeader{
- Data: uintptr(unsafe.Pointer(str)),
- Len: int(size),
- })
- return *(*string)(ptr)
-}
-
// goBytes converts C string to Go []byte without copying the data.
// This function behaves similarly to cSlice.
func goBytes(str *C.char, size C.size_t) []byte {
@@ -64,3 +44,18 @@
})
return *(*[]byte)(ptr)
}
+
+// copyAll copies elements from a source slice into a destination slice.
+// The returned slice may be a sub-slice of dst if dst was large enough to hold
+// src. Otherwise, a newly allocated slice will be returned.
+func copyAll(dst, src []byte) []byte {
+ if cap(dst) < len(src) {
+ newlen := cap(dst)*2 + 2
+ if newlen < len(src) {
+ newlen = len(src)
+ }
+ dst = make([]byte, newlen)
+ }
+ copy(dst, src)
+ return dst[:len(src)]
+}
diff --git a/services/syncbase/store/memstore/memstore.go b/services/syncbase/store/memstore/memstore.go
index ebd9b2a..99c551e 100644
--- a/services/syncbase/store/memstore/memstore.go
+++ b/services/syncbase/store/memstore/memstore.go
@@ -83,43 +83,43 @@
return nil
}
-func (tx *transaction) Scan(start, end string) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
vlog.Fatal("not implemented")
return nil, nil
}
-func (tx *transaction) Get(k string) ([]byte, error) {
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
tx.st.mu.Lock()
defer tx.st.mu.Unlock()
if err := tx.checkError(); err != nil {
return nil, err
}
- v, ok := tx.st.data[k]
+ v, ok := tx.st.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: k}
+ return nil, &store.ErrUnknownKey{Key: string(key)}
}
return v, nil
}
-func (tx *transaction) Put(k string, v []byte) error {
+func (tx *transaction) Put(key, value []byte) error {
tx.st.mu.Lock()
defer tx.st.mu.Unlock()
if err := tx.checkError(); err != nil {
return err
}
- delete(tx.deletes, k)
- tx.puts[k] = v
+ delete(tx.deletes, string(key))
+ tx.puts[string(key)] = value
return nil
}
-func (tx *transaction) Delete(k string) error {
+func (tx *transaction) Delete(key []byte) error {
tx.st.mu.Lock()
defer tx.st.mu.Unlock()
if err := tx.checkError(); err != nil {
return err
}
- delete(tx.puts, k)
- tx.deletes[k] = struct{}{}
+ delete(tx.puts, string(key))
+ tx.deletes[string(key)] = struct{}{}
return nil
}
@@ -163,30 +163,30 @@
////////////////////////////////////////
// memstore methods
-func (st *memstore) Scan(start, end string) (store.Stream, error) {
+func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
vlog.Fatal("not implemented")
return nil, nil
}
-func (st *memstore) Get(k string) ([]byte, error) {
+func (st *memstore) Get(key, valbuf []byte) ([]byte, error) {
st.mu.Lock()
defer st.mu.Unlock()
- v, ok := st.data[k]
+ v, ok := st.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: k}
+ return nil, &store.ErrUnknownKey{Key: string(key)}
}
return v, nil
}
-func (st *memstore) Put(k string, v []byte) error {
+func (st *memstore) Put(key, value []byte) error {
return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(k, v)
+ return st.Put(key, value)
})
}
-func (st *memstore) Delete(k string) error {
+func (st *memstore) Delete(key []byte) error {
return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(k)
+ return st.Delete(key)
})
}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 5fda7e2..ee02ac3 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -9,21 +9,24 @@
// StoreReader reads data from a CRUD-capable storage engine.
type StoreReader interface {
// Scan returns all rows with keys in range [start, end).
- Scan(start, end string) (Stream, error)
+ Scan(start, end []byte) (Stream, error)
- // Get returns the value for the given key.
+ // Get returns the value for the given key. The returned slice may be a
+ // sub-slice of valbuf if valbuf was large enough to hold the entire value.
+ // Otherwise, a newly allocated slice will be returned. It is valid to pass
+ // a nil valbuf.
// Fails if the given key is unknown (ErrUnknownKey).
- Get(key string) ([]byte, error)
+ Get(key, valbuf []byte) ([]byte, error)
}
// StoreWriter writes data to a CRUD-capable storage engine.
type StoreWriter interface {
// Put writes the given value for the given key.
- Put(key string, value []byte) error
+ Put(key, value []byte) error
// Delete deletes the entry for the given key.
// Succeeds (no-op) if the given key is unknown.
- Delete(key string) error
+ Delete(key []byte) error
}
// StoreReadWriter combines StoreReader and StoreWriter.
@@ -83,25 +86,30 @@
Close() error
}
-// KeyValue is a wrapper for the key and value from a single row.
-type KeyValue struct {
- Key string
- Value []byte
-}
-
// Stream is an interface for iterating through a collection of key-value pairs.
type Stream interface {
- // Advance stages an element so the client can retrieve it with Value. Advance
- // returns true iff there is an element to retrieve. The client must call
- // Advance before calling Value. The client must call Cancel if it does not
- // iterate through all elements (i.e. until Advance returns false). Advance
- // may block if an element is not immediately available.
+ // Advance stages an element so the client can retrieve it with Key or Value.
+ // Advance returns true iff there is an element to retrieve. The client must
+ // call Advance before calling Key or Value. The client must call Cancel if it
+ // does not iterate through all elements (i.e. until Advance returns false).
+ // Advance may block if an element is not immediately available.
Advance() bool
- // Value returns the element that was staged by Advance. Value may panic if
- // Advance returned false or was not called at all. Value does not block.
- // The data is valid until the next call to Advance or Cancel.
- Value() KeyValue
+ // Key returns the key of the element that was staged by Advance. The returned
+ // slice may be a sub-slice of keybuf if keybuf was large enough to hold the
+ // entire key. Otherwise, a newly allocated slice will be returned. It is
+ // valid to pass a nil keybuf.
+ // Key may panic if Advance returned false or was not called at all.
+ // Key does not block.
+ Key(keybuf []byte) []byte
+
+ // Value returns the value of the element that was staged by Advance. The
+ // returned slice may be a sub-slice of valbuf if valbuf was large enough to
+ // hold the entire value. Otherwise, a newly allocated slice will be returned.
+ // It is valid to pass a nil valbuf.
+ // Value may panic if Advance returned false or was not called at all.
+ // Value does not block.
+ Value(valbuf []byte) []byte
// Err returns a non-nil error iff the stream encountered any errors. Err does
// not block.
@@ -111,8 +119,8 @@
// Cancel notifies the stream provider that it can stop producing elements.
// The client must call Cancel if it does not iterate through all elements
// (i.e. until Advance returns false). Cancel is idempotent and can be called
- // concurrently with a goroutine that is iterating via Advance/Value. Cancel
- // causes Advance to subsequently return false. Cancel does not block.
+ // concurrently with a goroutine that is iterating via Advance/Key/Value.
+ // Cancel causes Advance to subsequently return false. Cancel does not block.
Cancel()
}
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
index fc80ee9..6f46d50 100644
--- a/services/syncbase/store/test/test.go
+++ b/services/syncbase/store/test/test.go
@@ -82,18 +82,21 @@
// verify ensures that various read operation on store.Store and memtable
// return the same result.
func (s *dbState) verify(t *testing.T, st store.StoreReader) {
+ var key, value []byte
+ var err error
// Verify Get().
for i := 0; i < s.dbSize; i++ {
- key := fmt.Sprintf("%05d", i)
- answer, ok := s.memtable[key]
- value, err := st.Get(key)
+ keystr := fmt.Sprintf("%05d", i)
+ answer, ok := s.memtable[keystr]
+ key = []byte(keystr)
+ value, err = st.Get(key, value)
if ok {
if err != nil || !bytes.Equal(value, answer) {
- t.Fatalf("unexpected get result for %v: got {%#v: %#v}, want {%#v: nil}", key, value, err, answer)
+ t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
}
} else {
- if !reflect.DeepEqual(&store.ErrUnknownKey{Key: key}, err) {
- t.Fatalf("unexpected get error for key %v: %v", key, err)
+ if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
+ t.Fatalf("unexpected get error for key %q: %v", keystr, err)
}
}
}
@@ -104,19 +107,19 @@
start, end = end, start
}
end++
- stream, err := st.Scan(fmt.Sprintf("%05d", start), fmt.Sprintf("%05d", end))
+ stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
if err != nil {
t.Fatalf("can't create stream: %v", err)
}
for stream.Advance() {
start = s.lowerBound(start)
- key := fmt.Sprintf("%05d", start)
- kv := stream.Value()
- if kv.Key != key {
- t.Fatalf("unexpected key during scan: got %s, want %s", kv.Key, key)
+ keystr := fmt.Sprintf("%05d", start)
+ key, value = stream.Key(key), stream.Value(value)
+ if string(key) != keystr {
+ t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
}
- if !bytes.Equal(kv.Value, s.memtable[key]) {
- t.Fatalf("unexpected value during scan: got %s, want %s", kv.Value, s.memtable[key])
+ if !bytes.Equal(value, s.memtable[keystr]) {
+ t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
}
start++
}
@@ -143,12 +146,12 @@
key := fmt.Sprintf("%05d", step.key)
value := randomBytes(s.rnd, 100)
s.memtable[key] = value
- st.Put(key, value)
+ st.Put([]byte(key), value)
case Delete:
key := fmt.Sprintf("%05d", step.key)
if _, ok := s.memtable[key]; ok {
delete(s.memtable, key)
- st.Delete(key)
+ st.Delete([]byte(key))
}
default:
t.Fatalf("invalid test step %v", step)
@@ -197,11 +200,10 @@
// Invariant: value mapped to n stores sum of values of 0..n-1.
// Each of k transactions takes m distinct random values from 0..n-1,
// adds 1 to each and m to value mapped to n.
- // The correctness of sums is checked after all transactions are
- // commited.
+ // The correctness of sums is checked after all transactions are committed.
n, m, k := 10, 3, 100
for i := 0; i <= n; i++ {
- if err := st.Put(fmt.Sprintf("%05d", i), []byte{'0'}); err != nil {
+ if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
t.Fatalf("can't write to database")
}
}
@@ -213,19 +215,20 @@
perm := rnd.Perm(n)
if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
for j := 0; j <= m; j++ {
- var key string
+ var keystr string
if j < m {
- key = fmt.Sprintf("%05d", perm[j])
+ keystr = fmt.Sprintf("%05d", perm[j])
} else {
- key = fmt.Sprintf("%05d", n)
+ keystr = fmt.Sprintf("%05d", n)
}
- val, err := st.Get(key)
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
if err != nil {
- return fmt.Errorf("can't get key %s: %v", key, err)
+ return fmt.Errorf("can't get key %q: %v", key, err)
}
intValue, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
- return fmt.Errorf("can't parse int from %s: %v", val, err)
+ return fmt.Errorf("can't parse int from %q: %v", val, err)
}
var newValue int64
if j < m {
@@ -234,7 +237,7 @@
newValue = intValue + int64(m)
}
if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
- return fmt.Errorf("can't put {%#v: %#v}: %v", key, newValue, err)
+ return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
}
}
return nil
@@ -247,14 +250,15 @@
wg.Wait()
var sum int64
for j := 0; j <= n; j++ {
- key := fmt.Sprintf("%05d", j)
- val, err := st.Get(key)
+ keystr := fmt.Sprintf("%05d", j)
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
if err != nil {
- t.Fatalf("can't get key %s: %v", key, err)
+ t.Fatalf("can't get key %q: %v", key, err)
}
intValue, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
- t.Fatalf("can't parse int from %s: %v", val, err)
+ t.Fatalf("can't parse int from %q: %v", val, err)
}
if j < n {
sum += intValue