TBR syncbase: fix mem ownership bug in watchable txn

(Submitting TBR because Ivan +1'ed but didn't +2 before his vacation.)

watchable.Transaction should've made defensive copies of mutable inputs since it
holds (and dereferences) references to them after the various methods (get, put,
etc.) return.
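
To make the hazard concrete, here is a minimal standalone sketch; GetOp stands
in for the generated watchable op types, and the buggy/fixed lines mirror what
this change does with the new cp() helper (names here are illustrative only):

    package main

    import "fmt"

    type GetOp struct{ Key []byte }

    func main() {
        var ops []GetOp
        key := []byte("foo")

        // Buggy: the recorded op aliases the caller's buffer.
        ops = append(ops, GetOp{Key: key})
        key[0] = '#'                   // caller reuses its buffer after the call returns
        fmt.Printf("%s\n", ops[0].Key) // "#oo": the recorded key is corrupted

        // Fixed: retain a defensive copy instead.
        key = []byte("foo")
        keyCopy := make([]byte, len(key))
        copy(keyCopy, key)
        ops = append(ops, GetOp{Key: keyCopy})
        key[0] = '#'
        fmt.Printf("%s\n", ops[1].Key) // "foo": unaffected by the caller's write
    }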

Change-Id: Icd025a2ae4a4121b919ba8b1ae14c431b9683ef7
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 465ca69..0f56ba3 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -33,6 +33,10 @@
 	// do not exist.
 	Exists(schemaVersion int32) (bool | error) {access.Read}
 
+	// Exec executes a syncQL query and returns all results as specified by the
+	// query's select clause. Concurrency semantics are documented in model.go.
+	Exec(schemaVersion int32, query string) stream<_, []any> error {access.Read}
+
 	// BeginBatch creates a new batch. It returns an App-relative name for a
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
@@ -45,10 +49,6 @@
 	// ErrNotBoundToBatch.
 	Commit(schemaVersion int32) error {access.Read}
 
-	// Exec executes a syncQL query and returns all results as specified by in the
-	// query's select clause. Concurrency semantics are documented in model.go.
-	Exec(schemaVersion int32, query string) stream<_, []any> error {access.Read}
-
 	// Abort notifies the server that any pending changes can be discarded.
 	// It is not strictly required, but it may allow the server to release locks
 	// or other resources sooner than if it was not called.
@@ -95,6 +95,7 @@
 
 	// Delete deletes all rows in the given half-open range [start, limit). If
 	// limit is "", all rows with keys >= start are included.
+	// TODO(sadovsky): Delete prefix perms fully covered by the row range?
 	DeleteRowRange(schemaVersion int32, start, limit []byte) error {access.Write}
 
 	// Scan returns all rows in the given half-open range [start, limit). If limit
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index da6bf6d..624036c 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1528,6 +1528,9 @@
 	// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy
 	// do not exist.
 	Exists(ctx *context.T, schemaVersion int32, opts ...rpc.CallOpt) (bool, error)
+	// Exec executes a syncQL query and returns all results as specified by the
+	// query's select clause. Concurrency semantics are documented in model.go.
+	Exec(ctx *context.T, schemaVersion int32, query string, opts ...rpc.CallOpt) (DatabaseExecClientCall, error)
 	// BeginBatch creates a new batch. It returns an App-relative name for a
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
@@ -1538,9 +1541,6 @@
 	// If this Database is not bound to a batch, Commit() will fail with
 	// ErrNotBoundToBatch.
 	Commit(ctx *context.T, schemaVersion int32, opts ...rpc.CallOpt) error
-	// Exec executes a syncQL query and returns all results as specified by in the
-	// query's select clause. Concurrency semantics are documented in model.go.
-	Exec(ctx *context.T, schemaVersion int32, query string, opts ...rpc.CallOpt) (DatabaseExecClientCall, error)
 	// Abort notifies the server that any pending changes can be discarded.
 	// It is not strictly required, but it may allow the server to release locks
 	// or other resources sooner than if it was not called.
@@ -1585,6 +1585,15 @@
 	return
 }
 
+func (c implDatabaseClientStub) Exec(ctx *context.T, i0 int32, i1 string, opts ...rpc.CallOpt) (ocall DatabaseExecClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Exec", []interface{}{i0, i1}, opts...); err != nil {
+		return
+	}
+	ocall = &implDatabaseExecClientCall{ClientCall: call}
+	return
+}
+
 func (c implDatabaseClientStub) BeginBatch(ctx *context.T, i0 int32, i1 BatchOptions, opts ...rpc.CallOpt) (o0 string, err error) {
 	err = v23.GetClient(ctx).Call(ctx, c.name, "BeginBatch", []interface{}{i0, i1}, []interface{}{&o0}, opts...)
 	return
@@ -1595,15 +1604,6 @@
 	return
 }
 
-func (c implDatabaseClientStub) Exec(ctx *context.T, i0 int32, i1 string, opts ...rpc.CallOpt) (ocall DatabaseExecClientCall, err error) {
-	var call rpc.ClientCall
-	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Exec", []interface{}{i0, i1}, opts...); err != nil {
-		return
-	}
-	ocall = &implDatabaseExecClientCall{ClientCall: call}
-	return
-}
-
 func (c implDatabaseClientStub) Abort(ctx *context.T, i0 int32, opts ...rpc.CallOpt) (err error) {
 	err = v23.GetClient(ctx).Call(ctx, c.name, "Abort", []interface{}{i0}, nil, opts...)
 	return
@@ -1773,6 +1773,9 @@
 	// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy
 	// do not exist.
 	Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error)
+	// Exec executes a syncQL query and returns all results as specified by the
+	// query's select clause. Concurrency semantics are documented in model.go.
+	Exec(ctx *context.T, call DatabaseExecServerCall, schemaVersion int32, query string) error
 	// BeginBatch creates a new batch. It returns an App-relative name for a
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
@@ -1783,9 +1786,6 @@
 	// If this Database is not bound to a batch, Commit() will fail with
 	// ErrNotBoundToBatch.
 	Commit(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error
-	// Exec executes a syncQL query and returns all results as specified by in the
-	// query's select clause. Concurrency semantics are documented in model.go.
-	Exec(ctx *context.T, call DatabaseExecServerCall, schemaVersion int32, query string) error
 	// Abort notifies the server that any pending changes can be discarded.
 	// It is not strictly required, but it may allow the server to release locks
 	// or other resources sooner than if it was not called.
@@ -1884,6 +1884,9 @@
 	// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy
 	// do not exist.
 	Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error)
+	// Exec executes a syncQL query and returns all results as specified by the
+	// query's select clause. Concurrency semantics are documented in model.go.
+	Exec(ctx *context.T, call *DatabaseExecServerCallStub, schemaVersion int32, query string) error
 	// BeginBatch creates a new batch. It returns an App-relative name for a
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
@@ -1894,9 +1897,6 @@
 	// If this Database is not bound to a batch, Commit() will fail with
 	// ErrNotBoundToBatch.
 	Commit(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error
-	// Exec executes a syncQL query and returns all results as specified by in the
-	// query's select clause. Concurrency semantics are documented in model.go.
-	Exec(ctx *context.T, call *DatabaseExecServerCallStub, schemaVersion int32, query string) error
 	// Abort notifies the server that any pending changes can be discarded.
 	// It is not strictly required, but it may allow the server to release locks
 	// or other resources sooner than if it was not called.
@@ -1956,6 +1956,10 @@
 	return s.impl.Exists(ctx, call, i0)
 }
 
+func (s implDatabaseServerStub) Exec(ctx *context.T, call *DatabaseExecServerCallStub, i0 int32, i1 string) error {
+	return s.impl.Exec(ctx, call, i0, i1)
+}
+
 func (s implDatabaseServerStub) BeginBatch(ctx *context.T, call rpc.ServerCall, i0 int32, i1 BatchOptions) (string, error) {
 	return s.impl.BeginBatch(ctx, call, i0, i1)
 }
@@ -1964,10 +1968,6 @@
 	return s.impl.Commit(ctx, call, i0)
 }
 
-func (s implDatabaseServerStub) Exec(ctx *context.T, call *DatabaseExecServerCallStub, i0 int32, i1 string) error {
-	return s.impl.Exec(ctx, call, i0, i1)
-}
-
 func (s implDatabaseServerStub) Abort(ctx *context.T, call rpc.ServerCall, i0 int32) error {
 	return s.impl.Abort(ctx, call, i0)
 }
@@ -2025,6 +2025,15 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
+			Name: "Exec",
+			Doc:  "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
+			InArgs: []rpc.ArgDesc{
+				{"schemaVersion", ``}, // int32
+				{"query", ``},         // string
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+		},
+		{
 			Name: "BeginBatch",
 			Doc:  "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
 			InArgs: []rpc.ArgDesc{
@@ -2045,15 +2054,6 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
-			Name: "Exec",
-			Doc:  "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
-			InArgs: []rpc.ArgDesc{
-				{"schemaVersion", ``}, // int32
-				{"query", ``},         // string
-			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
-		},
-		{
 			Name: "Abort",
 			Doc:  "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
 			InArgs: []rpc.ArgDesc{
@@ -2127,6 +2127,7 @@
 	Exists(ctx *context.T, schemaVersion int32, opts ...rpc.CallOpt) (bool, error)
 	// Delete deletes all rows in the given half-open range [start, limit). If
 	// limit is "", all rows with keys >= start are included.
+	// TODO(sadovsky): Delete prefix perms fully covered by the row range?
 	DeleteRowRange(ctx *context.T, schemaVersion int32, start []byte, limit []byte, opts ...rpc.CallOpt) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. Concurrency semantics are
@@ -2300,6 +2301,7 @@
 	Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error)
 	// Delete deletes all rows in the given half-open range [start, limit). If
 	// limit is "", all rows with keys >= start are included.
+	// TODO(sadovsky): Delete prefix perms fully covered by the row range?
 	DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start []byte, limit []byte) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. Concurrency semantics are
@@ -2342,6 +2344,7 @@
 	Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error)
 	// Delete deletes all rows in the given half-open range [start, limit). If
 	// limit is "", all rows with keys >= start are included.
+	// TODO(sadovsky): Delete prefix perms fully covered by the row range?
 	DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start []byte, limit []byte) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. Concurrency semantics are
@@ -2475,7 +2478,7 @@
 		},
 		{
 			Name: "DeleteRowRange",
-			Doc:  "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
+			Doc:  "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.\n// TODO(sadovsky): Delete prefix perms fully covered by the row range?",
 			InArgs: []rpc.ArgDesc{
 				{"schemaVersion", ``}, // int32
 				{"start", ``},         // []byte
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 0d1dacf..560704a 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -177,6 +177,7 @@
 
 	// Delete deletes all rows in the given half-open range [start, limit). If
 	// limit is "", all rows with keys >= start are included.
+	// TODO(sadovsky): Delete prefix perms fully covered by the row range?
 	// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
 	Delete(ctx *context.T, r RowRange) error
 
diff --git a/x/ref/services/syncbase/server/watchable/store_test.go b/x/ref/services/syncbase/server/watchable/store_test.go
index 5ae2562..07b73c4 100644
--- a/x/ref/services/syncbase/server/watchable/store_test.go
+++ b/x/ref/services/syncbase/server/watchable/store_test.go
@@ -61,14 +61,8 @@
 	runTest(t, nil, test.RunTransactionsWithGetTest)
 }
 
-// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
-// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
-// relatively expensive since the entire data map is copied. LevelDB snapshots
-// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
-const useMemstore = false
-
 func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
-	st, destroy := createStore(useMemstore)
+	st, destroy := createStore()
 	defer destroy()
 	st, err := Wrap(st, &Options{ManagedPrefixes: mp})
 	if err != nil {
diff --git a/x/ref/services/syncbase/server/watchable/test_util.go b/x/ref/services/syncbase/server/watchable/test_util.go
index 713b4e1..7db1755 100644
--- a/x/ref/services/syncbase/server/watchable/test_util.go
+++ b/x/ref/services/syncbase/server/watchable/test_util.go
@@ -19,13 +19,18 @@
 
 // This file provides utility methods for tests related to watchable store.
 
-///////  Functions related to creation/cleanup of store instances  ///////
+////////////////////////////////////////////////////////////
+// Functions for store creation/cleanup
 
 // createStore returns a store along with a function to destroy the store
 // once it is no longer needed.
-func createStore(useMemstore bool) (store.Store, func()) {
+func createStore() (store.Store, func()) {
 	var st store.Store
-	if useMemstore {
+	// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+	// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+	// relatively expensive since the entire data map is copied. LevelDB snapshots
+	// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+	if false {
 		st = memstore.New()
 		return st, func() {
 			st.Close()
@@ -61,7 +66,8 @@
 	}
 }
 
-///////  Functions related to watchable store  ///////
+////////////////////////////////////////////////////////////
+// Functions related to watchable store
 
 func getSeq(st Store) uint64 {
 	wst := st.(*wstore)
@@ -73,23 +79,23 @@
 	wst.clock.SetSystemClock(mockClock)
 }
 
-// LogEntryReader provides a stream-like interface to scan over the log entries
+// logEntryReader provides a stream-like interface to scan over the log entries
 // of a single batch, starting for a given sequence number.  It opens a stream
 // that scans the log from the sequence number given.  It stops after reading
 // the last entry in that batch (indicated by a false Continued flag).
-type LogEntryReader struct {
+type logEntryReader struct {
 	stream store.Stream // scan stream on the store Database
 	done   bool         // true after reading the last batch entry
 	key    string       // key of most recent log entry read
 	entry  LogEntry     // most recent log entry read
 }
 
-func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
+func newLogEntryReader(st store.Store, seq uint64) *logEntryReader {
 	stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
-	return &LogEntryReader{stream: stream}
+	return &logEntryReader{stream: stream}
 }
 
-func (ler *LogEntryReader) Advance() bool {
+func (ler *logEntryReader) Advance() bool {
 	if ler.done {
 		return false
 	}
@@ -110,29 +116,29 @@
 	return false
 }
 
-func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
+func (ler *logEntryReader) GetEntry() (string, LogEntry) {
 	return ler.key, ler.entry
 }
 
-///////  Clock related utility code  ///////
+////////////////////////////////////////////////////////////
+// Clock related utility code
 
-// Mock Implementation for SystemClock
-type MockSystemClock struct {
+type mockSystemClock struct {
 	time      time.Time     // current time returned by call to Now()
 	increment time.Duration // how much to increment the clock by for subsequent calls to Now()
 }
 
-func NewMockSystemClock(firstTimestamp time.Time, increment time.Duration) *MockSystemClock {
-	return &MockSystemClock{
+func newMockSystemClock(firstTimestamp time.Time, increment time.Duration) *mockSystemClock {
+	return &mockSystemClock{
 		time:      firstTimestamp,
 		increment: increment,
 	}
 }
 
-func (sc *MockSystemClock) Now() time.Time {
+func (sc *mockSystemClock) Now() time.Time {
 	now := sc.time
 	sc.time = sc.time.Add(sc.increment)
 	return now
 }
 
-var _ clock.SystemClock = (*MockSystemClock)(nil)
+var _ clock.SystemClock = (*mockSystemClock)(nil)
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 963121e..12f5e1f 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -30,6 +30,22 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
+// cp returns a copy of the given byte slice. Ops must not alias
+// caller-owned buffers, since callers may reuse them after a method returns.
+func cp(src []byte) []byte {
+	dst := make([]byte, len(src))
+	copy(dst, src)
+	return dst
+}
+
+// cpStrings returns a copy of the given string slice; see cp for the
+// rationale.
+func cpStrings(src []string) []string {
+	dst := make([]string, len(src))
+	copy(dst, src)
+	return dst
+}
+
 func newTransaction(st *wstore) *transaction {
 	return &transaction{
 		itx: st.ist.NewTransaction(),
@@ -49,7 +65,7 @@
 		valbuf, err = tx.itx.Get(key, valbuf)
 	} else {
 		valbuf, err = getVersioned(tx.itx, key, valbuf)
-		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+		tx.ops = append(tx.ops, &OpGet{GetOp{Key: cp(key)}})
 	}
 	return valbuf, err
 }
@@ -66,7 +82,7 @@
 		it = tx.itx.Scan(start, limit)
 	} else {
 		it = newStreamVersioned(tx.itx, start, limit)
-		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: cp(start), Limit: cp(limit)}})
 	}
 	return it
 }
@@ -85,7 +101,7 @@
 	if err != nil {
 		return err
 	}
-	tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: cp(key), Version: version}})
 	return nil
 }
 
@@ -104,7 +120,7 @@
 	if err != nil {
 		return err
 	}
-	tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+	tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: cp(key)}})
 	return nil
 }
 
@@ -168,7 +184,8 @@
 	if wtx.err != nil {
 		return convertError(wtx.err)
 	}
-	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	// Make a defensive copy of the prefixes slice.
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: cpStrings(prefixes), Remove: remove}})
 	return nil
 }
 
@@ -186,10 +203,9 @@
 		return convertError(wtx.err)
 	}
 	if !wtx.st.managesKey(key) {
-		return verror.New(verror.ErrInternal, ctx,
-			fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+		return verror.New(verror.ErrInternal, ctx, fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
 	}
-	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: cp(key), Version: cp(version)}})
 	return nil
 }
 
@@ -282,7 +298,7 @@
 	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
 		return err
 	}
-	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 3625896..43b7b94 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -7,18 +7,14 @@
 import (
 	"bytes"
 	"fmt"
+	"reflect"
+	"runtime/debug"
 	"testing"
 	"time"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
-// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
-// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
-// relatively expensive since the entire data map is copied. LevelDB snapshots
-// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
-const useMemstoreForTest = false
-
 type testData struct {
 	key       string
 	createVal string
@@ -37,16 +33,47 @@
 	updateVal: "val-b2",
 }
 
+func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+	// check and update the value for data.key
+	keyBytes := []byte(data.key)
+	val, err := st.Get(keyBytes, nil)
+	if err != nil {
+		return fmt.Errorf("can't get key %q: %v", data.key, err)
+	}
+	if !bytes.Equal(val, []byte(data.createVal)) {
+		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
+	}
+	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+		return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
+	}
+	return nil
+}
+
+func verifyCommitLog(t *testing.T, st store.Store, seq uint64, wantNumEntries int, wantTimestamp time.Time) {
+	ler := newLogEntryReader(st, seq)
+	numEntries := 0
+	for ler.Advance() {
+		_, entry := ler.GetEntry()
+		numEntries++
+		if entry.CommitTimestamp != wantTimestamp.UnixNano() {
+			t.Errorf("Unexpected timestamp found for entry: got %v, want %v", entry.CommitTimestamp, wantTimestamp.UnixNano())
+		}
+	}
+	if numEntries != wantNumEntries {
+		t.Errorf("Unexpected number of log entries: got %v, want %v", numEntries, wantNumEntries)
+	}
+}
+
 func TestLogEntryTimestamps(t *testing.T) {
-	stImpl, destroy := createStore(useMemstoreForTest)
+	ist, destroy := createStore()
 	defer destroy()
 	t1 := time.Now()
 	inc := time.Duration(1) * time.Second
-	var mockClock *MockSystemClock = NewMockSystemClock(t1, inc)
+	mockClock := newMockSystemClock(t1, inc)
 
-	wst1, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+	wst1, err := Wrap(ist, &Options{ManagedPrefixes: nil})
 	if err != nil {
-		t.Errorf("Failed to wrap store for create")
+		t.Fatalf("Wrap failed: %v", err)
 	}
 	seqForCreate := getSeq(wst1)
 	setMockSystemClock(wst1, mockClock)
@@ -55,13 +82,11 @@
 	if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
 		// add data1
 		if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
-			return fmt.Errorf("can't put {%q: %v}: %v",
-				data1.key, data1.createVal, err)
+			return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
 		}
 		// add data2
 		if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
-			return fmt.Errorf("can't put {%q: %v}: %v",
-				data2.key, data2.createVal, err)
+			return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
 		}
 		return nil
 	}); err != nil {
@@ -71,14 +96,14 @@
 	// read and verify LogEntries written as part of above transaction
 	// We expect 2 entries in the log for the two puts.
 	// Timestamp from mockclock for the commit should be t1
-	verifyCommitLog(t, stImpl, seqForCreate, 2, t1)
+	verifyCommitLog(t, ist, seqForCreate, 2, t1)
 
 	// Update data already present in store with a new watchable store
-	wst2, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
-	setMockSystemClock(wst2, mockClock)
+	wst2, err := Wrap(ist, &Options{ManagedPrefixes: nil})
 	if err != nil {
-		t.Errorf("Failed to wrap store for update")
+		t.Fatalf("Wrap failed: %v", err)
 	}
+	setMockSystemClock(wst2, mockClock)
 	seqForUpdate := getSeq(wst2)
 	// We expect the sequence number to have moved by +2 for the two puts.
 	if seqForUpdate != (seqForCreate + 2) {
@@ -101,39 +126,93 @@
 	// We expect 4 entries in the log for the two gets and two puts.
 	// Timestamp from mockclock for the commit should be t1 + 1 sec
 	t2 := t1.Add(inc)
-	verifyCommitLog(t, stImpl, seqForUpdate, 4, t2)
+	verifyCommitLog(t, ist, seqForUpdate, 4, t2)
 }
 
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
-	// check and update data1
-	keyBytes := []byte(data.key)
-	val, err := st.Get(keyBytes, nil)
+func eq(t *testing.T, got, want interface{}) {
+	if !reflect.DeepEqual(got, want) {
+		debug.PrintStack()
+		t.Fatalf("got %v, want %v", got, want)
+	}
+}
+
+func TestOpLogConsistency(t *testing.T) {
+	ist, destroy := createStore()
+	defer destroy()
+	wst, err := Wrap(ist, &Options{ManagedPrefixes: nil})
 	if err != nil {
-		return fmt.Errorf("can't get key %q: %v", data.key, err)
+		t.Fatalf("Wrap failed: %v", err)
 	}
-	if !bytes.Equal(val, []byte(data.createVal)) {
-		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
-	}
-	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
-		return fmt.Errorf("can't put {%q: %v}: %v",
-			data.key, data.updateVal, err)
-	}
-	return nil
-}
 
-func verifyCommitLog(t *testing.T, st store.Store, seq uint64, expectedEntries int, expectedTimestamp time.Time) {
-	var ler *LogEntryReader = NewLogEntryReader(st, seq)
-	var entryCount int = 0
+	if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+		putKey, putVal := []byte("foo"), []byte("bar")
+		if err := st.Put(putKey, putVal); err != nil {
+			return err
+		}
+		getKey := []byte("foo")
+		if getVal, err := st.Get(getKey, nil); err != nil {
+			return err
+		} else {
+			eq(t, getVal, putVal)
+		}
+		start, limit := []byte("aaa"), []byte("bbb")
+		st.Scan(start, limit)
+		delKey := []byte("foo")
+		if err := st.Delete(delKey); err != nil {
+			return err
+		}
+		sgPrefixes := []string{"sga", "sgb"}
+		if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+			return err
+		}
+		snKey, snVersion := []byte("aa"), []byte("123")
+		if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+			return err
+		}
+		pvKey, pvVersion := []byte("pv"), []byte("456")
+		if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+			return err
+		}
+		for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
+			buf[0] = '#'
+		}
+		sgPrefixes[0] = "zebra"
+		return nil
+	}); err != nil {
+		t.Fatalf("failed to commit txn: %v", err)
+	}
+
+	// Read first (and only) batch.
+	ler := newLogEntryReader(ist, 0)
+	numEntries, wantNumEntries := 0, 7
+	sawPut := false
 	for ler.Advance() {
 		_, entry := ler.GetEntry()
-		entryCount++
-		if entry.CommitTimestamp != expectedTimestamp.UnixNano() {
-			errStr := "Unexpected timestamp found for entry." +
-				" Expected: %d, found: %d"
-			t.Errorf(errStr, expectedTimestamp.UnixNano(), entry.CommitTimestamp)
+		numEntries++
+		switch op := entry.Op.(type) {
+		case OpGet:
+			eq(t, string(op.Value.Key), "foo")
+		case OpScan:
+			eq(t, string(op.Value.Start), "aaa")
+			eq(t, string(op.Value.Limit), "bbb")
+		case OpPut:
+			if !sawPut {
+				eq(t, string(op.Value.Key), "foo")
+				sawPut = true
+			} else {
+				eq(t, string(op.Value.Key), "pv")
+				eq(t, string(op.Value.Version), "456")
+			}
+		case OpDelete:
+			eq(t, string(op.Value.Key), "foo")
+		case OpSyncGroup:
+			eq(t, op.Value.Prefixes, []string{"sga", "sgb"})
+		case OpSyncSnapshot:
+			eq(t, string(op.Value.Key), "aa")
+			eq(t, string(op.Value.Version), "123")
+		default:
+			t.Fatalf("Unexpected op type in entry: %v", entry)
 		}
 	}
-	if entryCount != expectedEntries {
-		t.Errorf("Unexpected number of log entries found. Expected: %d, found: %d", expectedEntries, entryCount)
-	}
+	eq(t, numEntries, wantNumEntries)
 }