blob: 0643bae6661238d571c43797985bc31e6911e895 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TODO(rogulenko): add more tests.
package test
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"strconv"
"sync"
"testing"
"v.io/syncbase/x/ref/services/syncbase/store"
)
type operation int
const (
Put operation = 0
Delete operation = 1
)
type testStep struct {
op operation
key int
}
func randomBytes(rnd *rand.Rand, length int) []byte {
var res []byte
for i := 0; i < length; i++ {
res = append(res, '0'+byte(rnd.Intn(10)))
}
return res
}
// storeState is the in-memory representation of the store state.
type storeState struct {
// We assume that the database has keys [0..size).
size int
rnd *rand.Rand
memtable map[string][]byte
}
func newStoreState(size int) *storeState {
return &storeState{
size,
rand.New(rand.NewSource(239017)),
make(map[string][]byte),
}
}
func (s *storeState) clone() *storeState {
other := &storeState{
s.size,
s.rnd,
make(map[string][]byte),
}
for k, v := range s.memtable {
other.memtable[k] = v
}
return other
}
// nextKey returns the smallest key in the store that is not less than the
// provided key. If there is no such key, returns size.
func (s *storeState) lowerBound(key int) int {
for key < s.size {
if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
return key
}
key++
}
return key
}
// verify checks that various read operations on store.Store and memtable return
// the same results.
func (s *storeState) verify(t *testing.T, st store.StoreReader) {
var key, value []byte
var err error
// Verify Get().
for i := 0; i < s.size; i++ {
keystr := fmt.Sprintf("%05d", i)
answer, ok := s.memtable[keystr]
key = []byte(keystr)
value, err = st.Get(key, value)
if ok {
if err != nil || !bytes.Equal(value, answer) {
t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
}
} else {
if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
t.Fatalf("unexpected get error for key %q: %v", keystr, err)
}
}
}
// Verify 10 random Scan() calls.
for i := 0; i < 10; i++ {
start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
if start > end {
start, end = end, start
}
end++
stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
if err != nil {
t.Fatalf("can't create stream: %v", err)
}
for stream.Advance() {
start = s.lowerBound(start)
keystr := fmt.Sprintf("%05d", start)
key, value = stream.Key(key), stream.Value(value)
if string(key) != keystr {
t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
}
if !bytes.Equal(value, s.memtable[keystr]) {
t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
}
start++
}
if start = s.lowerBound(start); start < end {
t.Fatalf("stream ended unexpectedly")
}
}
}
// runReadWriteTest verifies read/write/snapshot operations.
func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
s := newStoreState(size)
// We verify database state no more than ~100 times to prevent the test from
// being slow.
frequency := (len(steps) + 99) / 100
var states []*storeState
var snapshots []store.Snapshot
for i, step := range steps {
if step.key < 0 || step.key >= s.size {
t.Fatalf("invalid test step %v", step)
}
switch step.op {
case Put:
key := fmt.Sprintf("%05d", step.key)
value := randomBytes(s.rnd, 100)
s.memtable[key] = value
st.Put([]byte(key), value)
case Delete:
key := fmt.Sprintf("%05d", step.key)
if _, ok := s.memtable[key]; ok {
delete(s.memtable, key)
st.Delete([]byte(key))
}
default:
t.Fatalf("invalid test step %v", step)
}
if i%frequency == 0 {
s.verify(t, st)
states = append(states, s.clone())
snapshots = append(snapshots, st.NewSnapshot())
}
}
s.verify(t, st)
for i := 0; i < len(states); i++ {
states[i].verify(t, snapshots[i])
snapshots[i].Close()
}
}
// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
// snapshots.
func RunReadWriteBasicTest(t *testing.T, st store.Store) {
runReadWriteTest(t, st, 3, []testStep{
testStep{Put, 1},
testStep{Put, 2},
testStep{Delete, 1},
testStep{Put, 1},
testStep{Put, 2},
})
}
// RunReadWriteRandomTest runs a random-generated test that verifies reads,
// writes and snapshots.
func RunReadWriteRandomTest(t *testing.T, st store.Store) {
rnd := rand.New(rand.NewSource(239017))
var testcase []testStep
size := 50
for i := 0; i < 10000; i++ {
testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
}
runReadWriteTest(t, st, size, testcase)
}
// RunTransactionsWithGetTest tests transactions that use Put and Get
// operations.
// NOTE: consider setting GOMAXPROCS to something greater than 1.
func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
// Invariant: value mapped to n is sum of values of 0..n-1.
// Each of k transactions takes m distinct random values from 0..n-1, adds 1
// to each and m to value mapped to n.
// The correctness of sums is checked after all transactions have been
// committed.
n, m, k := 10, 3, 100
for i := 0; i <= n; i++ {
if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
t.Fatalf("can't write to database")
}
}
var wg sync.WaitGroup
wg.Add(k)
// TODO(sadovsky): This configuration creates huge resource contention.
// Perhaps we should add some random sleep's to reduce the contention.
for i := 0; i < k; i++ {
go func() {
rnd := rand.New(rand.NewSource(239017 * int64(i)))
perm := rnd.Perm(n)
if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
for j := 0; j <= m; j++ {
var keystr string
if j < m {
keystr = fmt.Sprintf("%05d", perm[j])
} else {
keystr = fmt.Sprintf("%05d", n)
}
key := []byte(keystr)
val, err := st.Get(key, nil)
if err != nil {
return fmt.Errorf("can't get key %q: %v", key, err)
}
intValue, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
return fmt.Errorf("can't parse int from %q: %v", val, err)
}
var newValue int64
if j < m {
newValue = intValue + 1
} else {
newValue = intValue + int64(m)
}
if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
}
}
return nil
}); err != nil {
panic(fmt.Errorf("can't commit transaction: %v", err))
}
wg.Done()
}()
}
wg.Wait()
var sum int64
for j := 0; j <= n; j++ {
keystr := fmt.Sprintf("%05d", j)
key := []byte(keystr)
val, err := st.Get(key, nil)
if err != nil {
t.Fatalf("can't get key %q: %v", key, err)
}
intValue, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
t.Fatalf("can't parse int from %q: %v", val, err)
}
if j < n {
sum += intValue
} else {
if intValue != int64(m*k) {
t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
}
}
}
if sum != int64(m*k) {
t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
}
}