TBR syncbase: fix mem ownership bug in watchable txn
(Submitting TBR because Ivan +1'ed but didn't +2 before his vacation.)
watchable.Transaction should've made defensive copies of mutable inputs since it
holds (and dereferences) references to them after the various methods (get, put,
etc.) return.
Change-Id: Icd025a2ae4a4121b919ba8b1ae14c431b9683ef7
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
index 5ae2562..07b73c4 100644
--- a/services/syncbase/server/watchable/store_test.go
+++ b/services/syncbase/server/watchable/store_test.go
@@ -61,14 +61,8 @@
runTest(t, nil, test.RunTransactionsWithGetTest)
}
-// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
-// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
-// relatively expensive since the entire data map is copied. LevelDB snapshots
-// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
-const useMemstore = false
-
func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
- st, destroy := createStore(useMemstore)
+ st, destroy := createStore()
defer destroy()
st, err := Wrap(st, &Options{ManagedPrefixes: mp})
if err != nil {
diff --git a/services/syncbase/server/watchable/test_util.go b/services/syncbase/server/watchable/test_util.go
index 713b4e1..7db1755 100644
--- a/services/syncbase/server/watchable/test_util.go
+++ b/services/syncbase/server/watchable/test_util.go
@@ -19,13 +19,18 @@
// This file provides utility methods for tests related to watchable store.
-/////// Functions related to creation/cleanup of store instances ///////
+////////////////////////////////////////////////////////////
+// Functions for store creation/cleanup
// createStore returns a store along with a function to destroy the store
// once it is no longer needed.
-func createStore(useMemstore bool) (store.Store, func()) {
+func createStore() (store.Store, func()) {
var st store.Store
- if useMemstore {
+ // With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+ // every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+ // relatively expensive since the entire data map is copied. LevelDB snapshots
+ // are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+ if false {
st = memstore.New()
return st, func() {
st.Close()
@@ -61,7 +66,8 @@
}
}
-/////// Functions related to watchable store ///////
+////////////////////////////////////////////////////////////
+// Functions related to watchable store
func getSeq(st Store) uint64 {
wst := st.(*wstore)
@@ -73,23 +79,23 @@
wst.clock.SetSystemClock(mockClock)
}
-// LogEntryReader provides a stream-like interface to scan over the log entries
+// logEntryReader provides a stream-like interface to scan over the log entries
// of a single batch, starting for a given sequence number. It opens a stream
// that scans the log from the sequence number given. It stops after reading
// the last entry in that batch (indicated by a false Continued flag).
-type LogEntryReader struct {
+type logEntryReader struct {
stream store.Stream // scan stream on the store Database
done bool // true after reading the last batch entry
key string // key of most recent log entry read
entry LogEntry // most recent log entry read
}
-func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
+func newLogEntryReader(st store.Store, seq uint64) *logEntryReader {
stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
- return &LogEntryReader{stream: stream}
+ return &logEntryReader{stream: stream}
}
-func (ler *LogEntryReader) Advance() bool {
+func (ler *logEntryReader) Advance() bool {
if ler.done {
return false
}
@@ -110,29 +116,29 @@
return false
}
-func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
+func (ler *logEntryReader) GetEntry() (string, LogEntry) {
return ler.key, ler.entry
}
-/////// Clock related utility code ///////
+////////////////////////////////////////////////////////////
+// Clock related utility code
-// Mock Implementation for SystemClock
-type MockSystemClock struct {
+type mockSystemClock struct {
time time.Time // current time returned by call to Now()
increment time.Duration // how much to increment the clock by for subsequent calls to Now()
}
-func NewMockSystemClock(firstTimestamp time.Time, increment time.Duration) *MockSystemClock {
- return &MockSystemClock{
+func newMockSystemClock(firstTimestamp time.Time, increment time.Duration) *mockSystemClock {
+ return &mockSystemClock{
time: firstTimestamp,
increment: increment,
}
}
-func (sc *MockSystemClock) Now() time.Time {
+func (sc *mockSystemClock) Now() time.Time {
now := sc.time
sc.time = sc.time.Add(sc.increment)
return now
}
-var _ clock.SystemClock = (*MockSystemClock)(nil)
+var _ clock.SystemClock = (*mockSystemClock)(nil)
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 963121e..12f5e1f 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -30,6 +30,22 @@
var _ store.Transaction = (*transaction)(nil)
+func cp(src []byte) []byte {
+ dst := make([]byte, len(src))
+ for i := 0; i < len(src); i++ {
+ dst[i] = src[i]
+ }
+ return dst
+}
+
+func cpStrings(src []string) []string {
+ dst := make([]string, len(src))
+ for i := 0; i < len(src); i++ {
+ dst[i] = src[i]
+ }
+ return dst
+}
+
func newTransaction(st *wstore) *transaction {
return &transaction{
itx: st.ist.NewTransaction(),
@@ -49,7 +65,7 @@
valbuf, err = tx.itx.Get(key, valbuf)
} else {
valbuf, err = getVersioned(tx.itx, key, valbuf)
- tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+ tx.ops = append(tx.ops, &OpGet{GetOp{Key: cp(key)}})
}
return valbuf, err
}
@@ -66,7 +82,7 @@
it = tx.itx.Scan(start, limit)
} else {
it = newStreamVersioned(tx.itx, start, limit)
- tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+ tx.ops = append(tx.ops, &OpScan{ScanOp{Start: cp(start), Limit: cp(limit)}})
}
return it
}
@@ -85,7 +101,7 @@
if err != nil {
return err
}
- tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: cp(key), Version: version}})
return nil
}
@@ -104,7 +120,7 @@
if err != nil {
return err
}
- tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: cp(key)}})
return nil
}
@@ -168,7 +184,8 @@
if wtx.err != nil {
return convertError(wtx.err)
}
- wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	// Make a defensive copy of the prefixes slice.
+ wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: cpStrings(prefixes), Remove: remove}})
return nil
}
@@ -186,10 +203,9 @@
return convertError(wtx.err)
}
if !wtx.st.managesKey(key) {
- return verror.New(verror.ErrInternal, ctx,
- fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+ return verror.New(verror.ErrInternal, ctx, fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
}
- wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+ wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: cp(key), Version: cp(version)}})
return nil
}
@@ -282,7 +298,7 @@
if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
return err
}
- wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
return nil
}
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
index 3625896..43b7b94 100644
--- a/services/syncbase/server/watchable/transaction_test.go
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -7,18 +7,14 @@
import (
"bytes"
"fmt"
+ "reflect"
+ "runtime/debug"
"testing"
"time"
"v.io/syncbase/x/ref/services/syncbase/store"
)
-// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
-// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
-// relatively expensive since the entire data map is copied. LevelDB snapshots
-// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
-const useMemstoreForTest = false
-
type testData struct {
key string
createVal string
@@ -37,16 +33,47 @@
updateVal: "val-b2",
}
+func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+	// check and update the given test data
+ keyBytes := []byte(data.key)
+ val, err := st.Get(keyBytes, nil)
+ if err != nil {
+ return fmt.Errorf("can't get key %q: %v", data.key, err)
+ }
+ if !bytes.Equal(val, []byte(data.createVal)) {
+ return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
+ }
+ if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
+ }
+ return nil
+}
+
+func verifyCommitLog(t *testing.T, st store.Store, seq uint64, wantNumEntries int, wantTimestamp time.Time) {
+ ler := newLogEntryReader(st, seq)
+ numEntries := 0
+ for ler.Advance() {
+ _, entry := ler.GetEntry()
+ numEntries++
+ if entry.CommitTimestamp != wantTimestamp.UnixNano() {
+ t.Errorf("Unexpected timestamp found for entry: got %v, want %v", entry.CommitTimestamp, wantTimestamp.UnixNano())
+ }
+ }
+ if numEntries != wantNumEntries {
+ t.Errorf("Unexpected number of log entries: got %v, want %v", numEntries, wantNumEntries)
+ }
+}
+
func TestLogEntryTimestamps(t *testing.T) {
- stImpl, destroy := createStore(useMemstoreForTest)
+ ist, destroy := createStore()
defer destroy()
t1 := time.Now()
inc := time.Duration(1) * time.Second
- var mockClock *MockSystemClock = NewMockSystemClock(t1, inc)
+ mockClock := newMockSystemClock(t1, inc)
- wst1, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+ wst1, err := Wrap(ist, &Options{ManagedPrefixes: nil})
if err != nil {
- t.Errorf("Failed to wrap store for create")
+ t.Fatalf("Wrap failed: %v", err)
}
seqForCreate := getSeq(wst1)
setMockSystemClock(wst1, mockClock)
@@ -55,13 +82,11 @@
if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
// add data1
if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
- return fmt.Errorf("can't put {%q: %v}: %v",
- data1.key, data1.createVal, err)
+ return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
}
// add data2
if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
- return fmt.Errorf("can't put {%q: %v}: %v",
- data2.key, data2.createVal, err)
+ return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
}
return nil
}); err != nil {
@@ -71,14 +96,14 @@
// read and verify LogEntries written as part of above transaction
// We expect 2 entries in the log for the two puts.
// Timestamp from mockclock for the commit should be t1
- verifyCommitLog(t, stImpl, seqForCreate, 2, t1)
+ verifyCommitLog(t, ist, seqForCreate, 2, t1)
// Update data already present in store with a new watchable store
- wst2, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
- setMockSystemClock(wst2, mockClock)
+ wst2, err := Wrap(ist, &Options{ManagedPrefixes: nil})
if err != nil {
- t.Errorf("Failed to wrap store for update")
+ t.Fatalf("Wrap failed: %v", err)
}
+ setMockSystemClock(wst2, mockClock)
seqForUpdate := getSeq(wst2)
// We expect the sequence number to have moved by +2 for the two puts.
if seqForUpdate != (seqForCreate + 2) {
@@ -101,39 +126,93 @@
// We expect 4 entries in the log for the two gets and two puts.
// Timestamp from mockclock for the commit should be t1 + 1 sec
t2 := t1.Add(inc)
- verifyCommitLog(t, stImpl, seqForUpdate, 4, t2)
+ verifyCommitLog(t, ist, seqForUpdate, 4, t2)
}
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
- // check and update data1
- keyBytes := []byte(data.key)
- val, err := st.Get(keyBytes, nil)
+func eq(t *testing.T, got, want interface{}) {
+ if !reflect.DeepEqual(got, want) {
+ debug.PrintStack()
+ t.Fatalf("got %v, want %v", got, want)
+ }
+}
+
+func TestOpLogConsistency(t *testing.T) {
+ ist, destroy := createStore()
+ defer destroy()
+ wst, err := Wrap(ist, &Options{ManagedPrefixes: nil})
if err != nil {
- return fmt.Errorf("can't get key %q: %v", data.key, err)
+ t.Fatalf("Wrap failed: %v", err)
}
- if !bytes.Equal(val, []byte(data.createVal)) {
- return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
- }
- if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
- return fmt.Errorf("can't put {%q: %v}: %v",
- data.key, data.updateVal, err)
- }
- return nil
-}
-func verifyCommitLog(t *testing.T, st store.Store, seq uint64, expectedEntries int, expectedTimestamp time.Time) {
- var ler *LogEntryReader = NewLogEntryReader(st, seq)
- var entryCount int = 0
+ if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+ putKey, putVal := []byte("foo"), []byte("bar")
+ if err := st.Put(putKey, putVal); err != nil {
+ return err
+ }
+ getKey := []byte("foo")
+ if getVal, err := st.Get(getKey, nil); err != nil {
+ return err
+ } else {
+ eq(t, getVal, putVal)
+ }
+ start, limit := []byte("aaa"), []byte("bbb")
+ st.Scan(start, limit)
+ delKey := []byte("foo")
+ if err := st.Delete(delKey); err != nil {
+ return err
+ }
+ sgPrefixes := []string{"sga", "sgb"}
+ if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+ return err
+ }
+ snKey, snVersion := []byte("aa"), []byte("123")
+ if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+ return err
+ }
+ pvKey, pvVersion := []byte("pv"), []byte("456")
+ if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+ return err
+ }
+ for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
+ buf[0] = '#'
+ }
+ sgPrefixes[0] = "zebra"
+ return nil
+ }); err != nil {
+ t.Fatalf("failed to commit txn: %v", err)
+ }
+
+ // Read first (and only) batch.
+ ler := newLogEntryReader(ist, 0)
+ numEntries, wantNumEntries := 0, 7
+ sawPut := false
for ler.Advance() {
_, entry := ler.GetEntry()
- entryCount++
- if entry.CommitTimestamp != expectedTimestamp.UnixNano() {
- errStr := "Unexpected timestamp found for entry." +
- " Expected: %d, found: %d"
- t.Errorf(errStr, expectedTimestamp.UnixNano(), entry.CommitTimestamp)
+ numEntries++
+ switch op := entry.Op.(type) {
+ case OpGet:
+ eq(t, string(op.Value.Key), "foo")
+ case OpScan:
+ eq(t, string(op.Value.Start), "aaa")
+ eq(t, string(op.Value.Limit), "bbb")
+ case OpPut:
+ if !sawPut {
+ eq(t, string(op.Value.Key), "foo")
+ sawPut = true
+ } else {
+ eq(t, string(op.Value.Key), "pv")
+ eq(t, string(op.Value.Version), "456")
+ }
+ case OpDelete:
+ eq(t, string(op.Value.Key), "foo")
+ case OpSyncGroup:
+ eq(t, op.Value.Prefixes, []string{"sga", "sgb"})
+ case OpSyncSnapshot:
+ eq(t, string(op.Value.Key), "aa")
+ eq(t, string(op.Value.Version), "123")
+ default:
+ t.Fatalf("Unexpected op type in entry: %v", entry)
}
}
- if entryCount != expectedEntries {
- t.Errorf("Unexpected number of log entries found. Expected: %d, found: %d", expectedEntries, entryCount)
- }
+ eq(t, numEntries, wantNumEntries)
}