syncbase: watchable store with versioning support

Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 2151b1a..463c498 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -89,7 +89,8 @@
 }
 
 func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	// In the future this API should be replaced by one that streams the database names.
+	// In the future this API will likely be replaced by one that streams the
+	// database names.
 	a.mu.Lock()
 	defer a.mu.Unlock()
 	dbNames := make([]string, 0, len(a.dbs))
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index c69af62..5a44625 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -8,9 +8,9 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
-	"v.io/syncbase/x/ref/services/syncbase/store/watchable"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
@@ -39,7 +39,9 @@
 		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
 	// TODO(sadovsky): Make storage engine pluggable.
-	st, err := watchable.Wrap(memstore.New())
+	st, err := watchable.Wrap(memstore.New(), &watchable.Options{
+		ManagedPrefixes: []string{},
+	})
 	if err != nil {
 		return nil, err
 	}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index ce2c63e..c2a283d 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -115,7 +115,8 @@
 }
 
 func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	// In the future this API should be replaced by one that streams the app names.
+	// In the future this API will likely be replaced by one that streams the app
+	// names.
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	appNames := make([]string, 0, len(s.apps))
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 815ef82..88e1115 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -5,17 +5,22 @@
 package util
 
 // TODO(sadovsky): Consider using shorter strings.
+
+// Constants related to storage engine keys.
 const (
-	ServicePrefix  = "$service"
 	AppPrefix      = "$app"
-	DbInfoPrefix   = "$dbInfo"
 	DatabasePrefix = "$database"
-	TablePrefix    = "$table"
+	DbInfoPrefix   = "$dbInfo"
+	LogPrefix      = "$log"
 	RowPrefix      = "$row"
+	ServicePrefix  = "$service"
 	SyncPrefix     = "$sync"
+	TablePrefix    = "$table"
+	VersionPrefix  = "$version"
 )
 
+// Constants related to object names.
 const (
-	// Service object name suffix for Syncbase internal communication.
+	// Service object name suffix for Syncbase-to-Syncbase RPCs.
 	SyncbaseSuffix = "$internal"
 )
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 0feb974..87a204e 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -54,9 +54,10 @@
 }
 
 // Takes ownership of sn.
-// TODO(sadovsky): It sucks that Glob must be implemented differently from other
-// streaming RPC handlers. I don't have much confidence that I've implemented
-// both types of streaming correctly.
+// TODO(sadovsky): Why do we make developers implement Glob differently from
+// other streaming RPCs? It's confusing that Glob must return immediately and
+// write its results to a channel, while other streaming RPC handlers must block
+// and write their results to the output stream. See nlacasse's TODO below, too.
 func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
new file mode 100644
index 0000000..ac3694f
--- /dev/null
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type snapshot struct {
+	isn store.Snapshot
+	st  *wstore
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+func newSnapshot(st *wstore) *snapshot {
+	return &snapshot{
+		isn: st.ist.NewSnapshot(),
+		st:  st,
+	}
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+	return s.isn.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
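+	// Reads of non-managed keys go straight to the underlying snapshot; managed
+	// keys are resolved through their "$version:<key>" records.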
+	if !s.st.managesKey(key) {
+		return s.isn.Get(key, valbuf)
+	}
+	return getVersioned(s.isn, key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
+	if !s.st.managesRange(start, limit) {
+		return s.isn.Scan(start, limit)
+	}
+	return newStreamVersioned(s.isn, start, limit)
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
new file mode 100644
index 0000000..ed8e15f
--- /dev/null
+++ b/services/syncbase/server/watchable/store.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a Syncbase-specific store.Store wrapper that
+// provides versioned storage for specified prefixes and maintains a watchable
+// log of operations performed on versioned records. This log forms the basis
+// for the implementation of client-facing watch as well as the sync module's
+// internal watching of store updates.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq). All numbers are hex-encoded and zero-padded
+// so that the lexicographic order matches the numeric order. Thus, clients
+// implementing ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%016x", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+//
+// Version number records are stored with keys of the form "$version:<key>",
+// where <key> is the client-specified key.
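+//
+// For example, a single-op transaction that Puts key "foo" at commit sequence
+// number 5 would leave records along these lines (the version value here is
+// illustrative; actual versions are random):
+//   $version:foo               -> 1a2b3c4d   (current version of "foo")
+//   foo:1a2b3c4d               -> <value>    (value of "foo" at that version)
+//   $log:fffffffffffffffa:ffff -> LogEntry{Op: OpPut{...}, Continued: false}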
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+
+	pubutil "v.io/syncbase/v23/syncbase/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+const (
+	MaxUint16 uint64 = 1<<16 - 1 // 0xffff
+	MaxUint64 uint64 = 1<<64 - 1 // 0xffffffffffffffff
+)
+
+// Store is a store.Store that provides versioned storage and a watchable oplog.
+// TODO(sadovsky): Extend interface.
+type Store interface {
+	store.Store
+}
+
+// Options configures a watchable.Store.
+type Options struct {
+	// Key prefixes to version and log. If nil, all keys are managed; if empty
+	// (but non-nil), no keys are managed.
+	ManagedPrefixes []string
+}
+
+// Wrap returns a watchable.Store that wraps the given store.Store.
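+//
+// Example (the choice of managed prefixes here is illustrative):
+//   wst, err := watchable.Wrap(memstore.New(), &watchable.Options{
+//     ManagedPrefixes: []string{util.RowPrefix},
+//   })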
+func Wrap(st store.Store, opts *Options) (Store, error) {
+	// Determine initial value for seq.
+	var seq uint64 = 0
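+	// Log keys are reverse chronological (see the package doc), so the first key
+	// in the log range corresponds to the most recent commit; a single iteration
+	// suffices, after which the scan is canceled.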
+	it := st.Scan([]byte(util.LogPrefix), []byte(pubutil.PrefixRangeLimit(util.LogPrefix)))
+	for it.Advance() {
+		key := string(it.Key(nil))
+		parts := split(key)
+		if len(parts) != 3 {
+			panic("wrong number of parts: " + key)
+		}
+		invSeq, err := strconv.ParseUint(parts[1], 16, 64)
+		if err != nil {
+			panic("failed to parse invSeq: " + key)
+		}
+		seq = MaxUint64 - invSeq
+		it.Cancel()
+	}
+	if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+		return nil, err
+	}
+	return &wstore{ist: st, opts: opts, seq: seq}, nil
+}
+
+type wstore struct {
+	ist  store.Store
+	opts *Options
+	mu   sync.Mutex // held during transaction commits; protects seq
+	seq  uint64     // sequence number, for commits
+}
+
+var _ Store = (*wstore)(nil)
+
+// TODO(sadovsky): Decide whether to copy []byte's vs. requiring clients not to
+// modify passed-in []byte's. (In fact, this should be spelled out in the
+// store.Store API contract.)
+
+// Close implements the store.Store interface.
+func (st *wstore) Close() error {
+	return st.ist.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (st *wstore) Get(key, valbuf []byte) ([]byte, error) {
+	if !st.managesKey(key) {
+		return st.ist.Get(key, valbuf)
+	}
+	sn := newSnapshot(st)
+	defer sn.Close()
+	return sn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (st *wstore) Scan(start, limit []byte) store.Stream {
+	if !st.managesRange(start, limit) {
+		return st.ist.Scan(start, limit)
+	}
+	// TODO(sadovsky): Close snapshot once stream is finished or canceled.
+	return newSnapshot(st).Scan(start, limit)
+}
+
+// Put implements the store.StoreWriter interface.
+func (st *wstore) Put(key, value []byte) error {
+	// Use watchable.Store transaction so this op gets logged.
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Put(key, value)
+	})
+}
+
+// Delete implements the store.StoreWriter interface.
+func (st *wstore) Delete(key []byte) error {
+	// Use watchable.Store transaction so this op gets logged.
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Delete(key)
+	})
+}
+
+// NewTransaction implements the store.Store interface.
+func (st *wstore) NewTransaction() store.Transaction {
+	return newTransaction(st)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (st *wstore) NewSnapshot() store.Snapshot {
+	return newSnapshot(st)
+}
+
+////////////////////////////////////////
+// Internal helpers
+
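+// managesKey returns true iff key falls under one of the managed prefixes (or
+// under any prefix, if ManagedPrefixes is nil).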
+func (st *wstore) managesKey(key []byte) bool {
+	if st.opts.ManagedPrefixes == nil {
+		return true
+	}
+	ikey := string(key)
+	// TODO(sadovsky): Optimize, e.g. use binary search (here and below).
+	for _, p := range st.opts.ManagedPrefixes {
+		if strings.HasPrefix(ikey, p) {
+			return true
+		}
+	}
+	return false
+}
+
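+// managesRange returns true iff the entire range [start, limit) is covered by
+// a single managed prefix (or by any prefix, if ManagedPrefixes is nil). A
+// partial overlap with a managed prefix indicates a caller bug and triggers a
+// panic.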
+func (st *wstore) managesRange(start, limit []byte) bool {
+	if st.opts.ManagedPrefixes == nil {
+		return true
+	}
+	istart, ilimit := string(start), string(limit)
+	for _, p := range st.opts.ManagedPrefixes {
+		pstart, plimit := pubutil.PrefixRangeStart(p), pubutil.PrefixRangeLimit(p)
+		if pstart <= istart && ilimit <= plimit {
+			return true
+		}
+		if !(plimit <= istart || ilimit <= pstart) {
+			// If this happens, there's a bug in the Syncbase server implementation.
+			panic(fmt.Sprintf("partial overlap: %q %q %q", p, start, limit))
+		}
+	}
+	return false
+}
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
new file mode 100644
index 0000000..18b8e4d
--- /dev/null
+++ b/services/syncbase/server/watchable/store_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"io/ioutil"
+	"runtime"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+	"v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+	runtime.GOMAXPROCS(10)
+}
+
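+// Each test below runs twice: once with an empty (non-nil) ManagedPrefixes
+// list, so that no keys are versioned or logged, and once with a nil list, so
+// that all keys are versioned and logged.
+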
+func TestStream(t *testing.T) {
+	runTest(t, []string{}, test.RunStreamTest)
+	runTest(t, nil, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, []string{}, test.RunSnapshotTest)
+	runTest(t, nil, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+	runTest(t, []string{}, test.RunStoreStateTest)
+	runTest(t, nil, test.RunStoreStateTest)
+}
+
+func TestClose(t *testing.T) {
+	runTest(t, []string{}, test.RunCloseTest)
+	runTest(t, nil, test.RunCloseTest)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+	runTest(t, []string{}, test.RunReadWriteBasicTest)
+	runTest(t, nil, test.RunReadWriteBasicTest)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+	runTest(t, []string{}, test.RunReadWriteRandomTest)
+	runTest(t, nil, test.RunReadWriteRandomTest)
+}
+
+func TestConcurrentTransactions(t *testing.T) {
+	runTest(t, []string{}, test.RunConcurrentTransactionsTest)
+	runTest(t, nil, test.RunConcurrentTransactionsTest)
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, []string{}, test.RunTransactionStateTest)
+	runTest(t, nil, test.RunTransactionStateTest)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+	runTest(t, []string{}, test.RunTransactionsWithGetTest)
+	runTest(t, nil, test.RunTransactionsWithGetTest)
+}
+
+// With memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+// relatively expensive since the entire data map is copied. LevelDB snapshots
+// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+const useMemstore = false
+
+func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
+	var st store.Store
+	if useMemstore {
+		st = memstore.New()
+	} else {
+		var dbPath string
+		st, dbPath = newLevelDB()
+		defer destroyLevelDB(st, dbPath)
+	}
+	st, err := Wrap(st, &Options{ManagedPrefixes: mp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer st.Close()
+	f(t, st)
+}
+
+func newLevelDB() (store.Store, string) {
+	path, err := ioutil.TempDir("", "syncbase_leveldb")
+	if err != nil {
+		panic(fmt.Sprintf("can't create temp dir: %v", err))
+	}
+	st, err := leveldb.Open(path)
+	if err != nil {
+		panic(fmt.Sprintf("can't open db at %v: %v", path, err))
+	}
+	return st, path
+}
+
+func destroyLevelDB(st store.Store, path string) {
+	st.Close()
+	if err := leveldb.Destroy(path); err != nil {
+		panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+	}
+}
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
new file mode 100644
index 0000000..cd41835
--- /dev/null
+++ b/services/syncbase/server/watchable/stream.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// stream streams keys and values for versioned records.
+type stream struct {
+	iit      store.Stream
+	st       store.StoreReader
+	mu       sync.Mutex
+	err      error
+	hasValue bool
+	key      []byte
+	value    []byte
+}
+
+var _ store.Stream = (*stream)(nil)
+
+// newStreamVersioned creates a new stream. It assumes all records in range
+// [start, limit) are managed, i.e. versioned.
+func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
+	checkTransactionOrSnapshot(st)
+	return &stream{
+		iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
+		st:  st,
+	}
+}
+
+// Advance implements the store.Stream interface.
+func (s *stream) Advance() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.hasValue = false
+	if s.err != nil {
+		return false
+	}
+	if advanced := s.iit.Advance(); !advanced {
+		return false
+	}
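+	// The underlying scan walks ("$version:<key>", <version>) records; recover
+	// <key> by stripping the prefix, then read the value stored at
+	// "<key>:<version>".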
+	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
+	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
+	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	if s.err != nil {
+		return false
+	}
+	s.hasValue = true
+	return true
+}
+
+// Key implements the store.Stream interface.
+func (s *stream) Key(keybuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.hasValue {
+		panic("nothing staged")
+	}
+	return store.CopyBytes(keybuf, s.key)
+}
+
+// Value implements the store.Stream interface.
+func (s *stream) Value(valbuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.hasValue {
+		panic("nothing staged")
+	}
+	return store.CopyBytes(valbuf, s.value)
+}
+
+// Err implements the store.Stream interface.
+func (s *stream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil {
+		return store.WrapError(s.err)
+	}
+	return s.iit.Err()
+}
+
+// Cancel implements the store.Stream interface.
+func (s *stream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil {
+		return
+	}
+	s.iit.Cancel()
+}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+type transaction struct {
+	itx store.Transaction
+	st  *wstore
+	mu  sync.Mutex // protects the fields below
+	err error
+	ops []Op
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *wstore) *transaction {
+	return &transaction{
+		itx: st.ist.NewTransaction(),
+		st:  st,
+	}
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return valbuf, store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		valbuf, err = tx.itx.Get(key, valbuf)
+	} else {
+		valbuf, err = getVersioned(tx.itx, key, valbuf)
+		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+	}
+	return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return &store.InvalidStream{tx.err}
+	}
+	var it store.Stream
+	if !tx.st.managesRange(start, limit) {
+		it = tx.itx.Scan(start, limit)
+	} else {
+		it = newStreamVersioned(tx.itx, start, limit)
+		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+	}
+	return it
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		err = tx.itx.Put(key, value)
+	} else {
+		err = putVersioned(tx.itx, key, value)
+		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+	}
+	return err
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		err = tx.itx.Delete(key)
+	} else {
+		err = deleteVersioned(tx.itx, key)
+		tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+	}
+	return err
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
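+	// From this point on, any further op on this transaction fails with
+	// ErrMsgCommittedTxn, whether or not this Commit succeeds.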
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	// Check sequence numbers.
+	if uint64(len(tx.ops)) > MaxUint16 {
+		return verror.New(verror.ErrInternal, nil, "too many ops")
+	}
+	if tx.st.seq == MaxUint64 {
+		return verror.New(verror.ErrInternal, nil, "seq maxed out")
+	}
+	// Write LogEntry records.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+		value := &LogEntry{
+			Op: op,
+			// TODO(sadovsky): This information is also captured in LogEntry keys.
+			// Optimize to avoid redundancy.
+			Continued: txSeq < len(tx.ops)-1,
+		}
+		if err := util.PutObject(tx.itx, key, value); err != nil {
+			return err
+		}
+	}
+	if err := tx.itx.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+	return tx.itx.Abort()
+}
diff --git a/services/syncbase/store/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
similarity index 100%
rename from services/syncbase/store/watchable/types.vdl
rename to services/syncbase/server/watchable/types.vdl
diff --git a/services/syncbase/store/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
similarity index 86%
rename from services/syncbase/store/watchable/types.vdl.go
rename to services/syncbase/server/watchable/types.vdl.go
index 3b6da63..12b8e0b 100644
--- a/services/syncbase/store/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -18,7 +18,7 @@
 }
 
 func (GetOp) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.GetOp"`
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.GetOp"`
 }) {
 }
 
@@ -29,7 +29,7 @@
 }
 
 func (ScanOp) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.ScanOp"`
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.ScanOp"`
 }) {
 }
 
@@ -40,7 +40,7 @@
 }
 
 func (PutOp) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.PutOp"`
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.PutOp"`
 }) {
 }
 
@@ -50,7 +50,7 @@
 }
 
 func (DeleteOp) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.DeleteOp"`
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.DeleteOp"`
 }) {
 }
 
@@ -78,7 +78,7 @@
 	OpDelete struct{ Value DeleteOp }
 	// __OpReflect describes the Op union type.
 	__OpReflect struct {
-		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.Op"`
+		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
 		Type  Op
 		Union struct {
 			Get    OpGet
@@ -122,7 +122,7 @@
 }
 
 func (LogEntry) __VDLReflect(struct {
-	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.LogEntry"`
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.LogEntry"`
 }) {
 }
 
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
new file mode 100644
index 0000000..cd37d3e
--- /dev/null
+++ b/services/syncbase/server/watchable/util.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
+// We should probably convert incoming strings to []byte's as early as possible,
+// and deal exclusively in []byte's internally.
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+func makeVersionKey(key []byte) []byte {
+	return []byte(join(util.VersionPrefix, string(key)))
+}
+
+func makeAtVersionKey(key, version []byte) []byte {
+	return []byte(join(string(key), string(version)))
+}
+
+func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
+	return st.Get(makeVersionKey(key), nil)
+}
+
+func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	return st.Get(makeAtVersionKey(key, version), valbuf)
+}
+
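+// getVersioned returns the value of key at its current version: it reads the
+// "$version:<key>" record to learn the version, then reads "<key>:<version>".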
+func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
+	checkTransactionOrSnapshot(st)
+	version, err := getVersion(st, key)
+	if err != nil {
+		return valbuf, err
+	}
+	return getAtVersion(st, key, valbuf, version)
+}
+
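+// putVersioned assigns key a new random version and writes both the version
+// record ("$version:<key>" -> <version>) and the versioned value record
+// ("<key>:<version>" -> value).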
+func putVersioned(tx store.Transaction, key, value []byte) error {
+	version := []byte(fmt.Sprintf("%x", rng.Int63()))
+	if err := tx.Put(makeVersionKey(key), version); err != nil {
+		return err
+	}
+	return tx.Put(makeAtVersionKey(key, version), value)
+}
+
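+// deleteVersioned removes the version record for key, making the key appear
+// absent; previously written "<key>:<version>" value records are left in place.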
+func deleteVersioned(tx store.Transaction, key []byte) error {
+	return tx.Delete(makeVersionKey(key))
+}
+
+func checkTransactionOrSnapshot(st store.StoreReader) {
+	_, isTransaction := st.(store.Transaction)
+	_, isSnapshot := st.(store.Snapshot)
+	if !isTransaction && !isSnapshot {
+		panic("neither a Transaction nor a Snapshot")
+	}
+}
+
+func join(parts ...string) string {
+	return util.JoinKeyParts(parts...)
+}
+
+func split(key string) []string {
+	return util.SplitKeyParts(key)
+}
diff --git a/services/syncbase/store/constants.go b/services/syncbase/store/constants.go
new file mode 100644
index 0000000..f97f2fd
--- /dev/null
+++ b/services/syncbase/store/constants.go
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// TODO(sadovsky): Maybe define verrors for these.
+const (
+	ErrMsgClosedStore    = "closed store"
+	ErrMsgClosedSnapshot = "closed snapshot"
+	ErrMsgCanceledStream = "canceled stream"
+	ErrMsgCommittedTxn   = "already called commit"
+	ErrMsgAbortedTxn     = "already called abort"
+	ErrMsgExpiredTxn     = "expired transaction"
+)
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index e163833..62250c6 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -90,7 +90,7 @@
 	d.readOptions = nil
 	C.leveldb_writeoptions_destroy(d.writeOptions)
 	d.writeOptions = nil
-	d.err = verror.New(verror.ErrCanceled, nil, "closed store")
+	d.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
 	return nil
 }
 
@@ -161,7 +161,7 @@
 	return newSnapshot(d, d.node)
 }
 
-// write writes a batch and adds all written keys to  txTable.
+// write writes a batch and adds all written keys to txTable.
 // TODO(rogulenko): remove this method.
 func (d *db) write(batch []writeOp, cOpts *C.leveldb_writeoptions_t) error {
 	d.txmu.Lock()
@@ -203,7 +203,7 @@
 	return nil
 }
 
-// trackBatch writes the batch to txTable, adds a commit event to txEvents.
+// trackBatch writes the batch to txTable and adds a commit event to txEvents.
 func (d *db) trackBatch(batch []writeOp) {
 	// TODO(rogulenko): do GC.
 	d.txSequenceNumber++
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 9a4ad6e..4bf59b6 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -56,7 +56,7 @@
 	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
 	s.cSnapshot = nil
-	s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
 	return nil
 }
 
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index abf230e..2840e5d 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -124,14 +124,14 @@
 	if s.err != nil {
 		return
 	}
-	// s.hasValue might be false if Advance was never called.
+	// s.hasValue will be false if Advance has never been called.
 	if s.hasValue {
 		// We copy the key and the value from the C heap to the Go heap before
 		// deallocating the C iterator.
 		s.key = store.CopyBytes(nil, s.cKey())
 		s.value = store.CopyBytes(nil, s.cVal())
 	}
-	s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
 	s.destroyLeveldbIter()
 }
 
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index e7a1b8a..308b4da 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -181,9 +181,10 @@
 	if tx.err != nil {
 		return store.WrapError(tx.err)
 	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
 	// Explicitly remove this transaction from the event queue. If this was the
 	// only active transaction, the event queue becomes empty and writeLocked will
-	// not add the write set of this transaction to the txTable.
+	// not add this transaction's write set to txTable.
 	tx.removeEvent()
 	defer tx.close()
 	tx.d.txmu.Lock()
@@ -191,14 +192,7 @@
 	if !tx.validateReadSet() {
 		return store.NewErrConcurrentTransaction(nil)
 	}
-	if err := tx.d.writeLocked(tx.writes, tx.cOpts); err != nil {
-		// Once Commit() has failed, subsequent ops on the transaction will fail
-		// with the following error.
-		tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction")
-		return err
-	}
-	tx.err = verror.New(verror.ErrBadState, nil, "committed transaction")
-	return nil
+	return tx.d.writeLocked(tx.writes, tx.cOpts)
 }
 
 // Abort implements the store.Transaction interface.
@@ -208,7 +202,7 @@
 	if tx.err != nil {
 		return store.WrapError(tx.err)
 	}
-	tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
 	tx.close()
 	return nil
 }
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index cd39f77..53bd6ef 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -22,7 +22,7 @@
 
 // Assumes st lock is held.
 func newSnapshot(st *memstore, parent *store.ResourceNode) *snapshot {
-	dataCopy := map[string][]byte{}
+	dataCopy := make(map[string][]byte, len(st.data))
 	for k, v := range st.data {
 		dataCopy[k] = v
 	}
@@ -44,7 +44,7 @@
 		return store.WrapError(s.err)
 	}
 	s.node.Close()
-	s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
 	return nil
 }
 
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index a3e98b1..d24d5d1 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -42,7 +42,7 @@
 		return store.WrapError(st.err)
 	}
 	st.node.Close()
-	st.err = verror.New(verror.ErrCanceled, nil, "closed store")
+	st.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
 	return nil
 }
 
@@ -67,6 +67,7 @@
 	if st.err != nil {
 		return &store.InvalidStream{st.err}
 	}
+	// TODO(sadovsky): Close snapshot once stream is closed or canceled.
 	return newSnapshot(st, st.node).Scan(start, limit)
 }
 
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index af4c693..f7e5945 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -96,5 +96,5 @@
 		return
 	}
 	s.node.Close()
-	s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
 }
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 81b51d0..f7e99c4 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -35,11 +35,10 @@
 
 func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
 	node := store.NewResourceNode()
-	sn := newSnapshot(st, node)
 	tx := &transaction{
 		node:        node,
 		st:          st,
-		sn:          sn,
+		sn:          newSnapshot(st, node),
 		seq:         seq,
 		createdTime: time.Now(),
 		puts:        map[string][]byte{},
@@ -60,7 +59,7 @@
 		return store.WrapError(tx.err)
 	}
 	if tx.expired() {
-		return verror.New(verror.ErrBadState, nil, "expired transaction")
+		return verror.New(verror.ErrBadState, nil, store.ErrMsgExpiredTxn)
 	}
 	return nil
 }
@@ -116,16 +115,13 @@
 	if err := tx.error(); err != nil {
 		return err
 	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
 	tx.node.Close()
 	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+	defer tx.st.mu.Unlock()
 	if tx.seq <= tx.st.lastCommitSeq {
-		// Once Commit() has failed with store.ErrConcurrentTransaction, subsequent
-		// ops on the transaction will fail with the following error.
-		tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction")
 		return store.NewErrConcurrentTransaction(nil)
 	}
-	tx.err = verror.New(verror.ErrBadState, nil, "committed transaction")
 	for k, v := range tx.puts {
 		tx.st.data[k] = v
 	}
@@ -144,6 +140,6 @@
 		return err
 	}
 	tx.node.Close()
-	tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
 	return nil
 }
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
index 0c65de7..4dbee5f 100644
--- a/services/syncbase/store/test/snapshot.go
+++ b/services/syncbase/store/test/snapshot.go
@@ -30,13 +30,13 @@
 	if err := snapshot.Close(); err != nil {
 		t.Fatalf("can't close the snapshot: %v", err)
 	}
-	expectedErr := "closed snapshot"
-	verifyError(t, snapshot.Close(), expectedErr, verror.ErrCanceled.ID)
+	expectedErrMsg := store.ErrMsgClosedSnapshot
+	verifyError(t, snapshot.Close(), verror.ErrCanceled.ID, expectedErrMsg)
 
 	_, err := snapshot.Get(key1, nil)
-	verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
 
 	s = snapshot.Scan([]byte("a"), []byte("z"))
 	verifyAdvance(t, s, nil, nil)
-	verifyError(t, s.Err(), expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, s.Err(), verror.ErrCanceled.ID, expectedErrMsg)
 }
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
index 784c545..2d6d9a1 100644
--- a/services/syncbase/store/test/store.go
+++ b/services/syncbase/store/test/store.go
@@ -114,14 +114,13 @@
 		if step.key < 0 || step.key >= s.size {
 			t.Fatalf("invalid test step %v", step)
 		}
+		key := fmt.Sprintf("%05d", step.key)
 		switch step.op {
 		case Put:
-			key := fmt.Sprintf("%05d", step.key)
 			value := randomBytes(s.rnd, 100)
 			s.memtable[key] = value
 			st.Put([]byte(key), value)
 		case Delete:
-			key := fmt.Sprintf("%05d", step.key)
 			if _, ok := s.memtable[key]; ok {
 				delete(s.memtable, key)
 				st.Delete([]byte(key))
@@ -158,12 +157,12 @@
 // writes and snapshots.
 func RunReadWriteRandomTest(t *testing.T, st store.Store) {
 	rnd := rand.New(rand.NewSource(239017))
-	var testcase []testStep
+	var steps []testStep
 	size := 50
 	for i := 0; i < 10000; i++ {
-		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
+		steps = append(steps, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
 	}
-	runReadWriteTest(t, st, size, testcase)
+	runReadWriteTest(t, st, size, steps)
 }
 
 // RunStoreStateTest verifies operations that modify the state of a store.Store.
@@ -183,25 +182,25 @@
 	if err := st.Close(); err != nil {
 		t.Fatalf("can't close the store: %v", err)
 	}
-	expectedErr := "closed store"
-	verifyError(t, st.Close(), expectedErr, verror.ErrCanceled.ID)
+	expectedErrMsg := store.ErrMsgClosedStore
+	verifyError(t, st.Close(), verror.ErrCanceled.ID, expectedErrMsg)
 
 	s = st.Scan([]byte("a"), []byte("z"))
 	verifyAdvance(t, s, nil, nil)
-	verifyError(t, s.Err(), expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, s.Err(), verror.ErrCanceled.ID, expectedErrMsg)
 
 	snapshot := st.NewSnapshot()
 	_, err := snapshot.Get(key1, nil)
-	verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
 
 	tx := st.NewTransaction()
 	_, err = tx.Get(key1, nil)
-	verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
 
 	_, err = st.Get(key1, nil)
-	verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
-	verifyError(t, st.Put(key1, value1), expectedErr, verror.ErrCanceled.ID)
-	verifyError(t, st.Delete(key1), expectedErr, verror.ErrCanceled.ID)
+	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
+	verifyError(t, st.Put(key1, value1), verror.ErrCanceled.ID, expectedErrMsg)
+	verifyError(t, st.Delete(key1), verror.ErrCanceled.ID, expectedErrMsg)
 }
 
 // RunCloseTest verifies that child objects are closed when the parent object is
@@ -227,14 +226,14 @@
 	st.Close()
 
 	for _, stream := range streams {
-		verifyError(t, stream.Err(), "canceled stream", verror.ErrCanceled.ID)
+		verifyError(t, stream.Err(), verror.ErrCanceled.ID, store.ErrMsgCanceledStream)
 	}
 	for _, snapshot := range snapshots {
 		_, err := snapshot.Get(key1, nil)
-		verifyError(t, err, "closed snapshot", verror.ErrCanceled.ID)
+		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgClosedSnapshot)
 	}
 	for _, tx := range transactions {
 		_, err := tx.Get(key1, nil)
-		verifyError(t, err, "aborted transaction", verror.ErrCanceled.ID)
+		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgAbortedTxn)
 	}
 }
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 28e6e32..98becff 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -37,5 +37,5 @@
 		}
 	}
 	verifyAdvance(t, s, nil, nil)
-	verifyError(t, s.Err(), "canceled stream", verror.ErrCanceled.ID)
+	verifyError(t, s.Err(), verror.ErrCanceled.ID, store.ErrMsgCanceledStream)
 }
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
index 6aa5861..80581a1 100644
--- a/services/syncbase/store/test/transaction.go
+++ b/services/syncbase/store/test/transaction.go
@@ -16,24 +16,24 @@
 	"v.io/v23/verror"
 )
 
-// RunTransactionTest verifies operations that modify the state of a
+// RunTransactionStateTest verifies operations that modify the state of a
 // store.Transaction.
 func RunTransactionStateTest(t *testing.T, st store.Store) {
-	abortFunctions := []func(t *testing.T, tx store.Transaction) (string, verror.ID){
-		func(t *testing.T, tx store.Transaction) (string, verror.ID) {
+	finalizeFns := []func(t *testing.T, tx store.Transaction) (verror.ID, string){
+		func(t *testing.T, tx store.Transaction) (verror.ID, string) {
 			if err := tx.Abort(); err != nil {
 				Fatalf(t, "can't abort the transaction: %v", err)
 			}
-			return "aborted transaction", verror.ErrCanceled.ID
+			return verror.ErrCanceled.ID, store.ErrMsgAbortedTxn
 		},
-		func(t *testing.T, tx store.Transaction) (string, verror.ID) {
+		func(t *testing.T, tx store.Transaction) (verror.ID, string) {
 			if err := tx.Commit(); err != nil {
 				Fatalf(t, "can't commit the transaction: %v", err)
 			}
-			return "committed transaction", verror.ErrBadState.ID
+			return verror.ErrBadState.ID, store.ErrMsgCommittedTxn
 		},
 	}
-	for _, fn := range abortFunctions {
+	for _, fn := range finalizeFns {
 		key1, value1 := []byte("key1"), []byte("value1")
 		st.Put(key1, value1)
 		key2 := []byte("key2")
@@ -46,19 +46,19 @@
 		verifyAdvance(t, s, key1, value1)
 		verifyAdvance(t, s, nil, nil)
 
-		// Test functions after Abort.
-		expectedErr, expectedID := fn(t, tx)
-		verifyError(t, tx.Abort(), expectedErr, expectedID)
-		verifyError(t, tx.Commit(), expectedErr, expectedID)
+		// Test functions after finalize.
+		expectedID, expectedErrMsg := fn(t, tx)
+		verifyError(t, tx.Abort(), expectedID, expectedErrMsg)
+		verifyError(t, tx.Commit(), expectedID, expectedErrMsg)
 
 		s = tx.Scan([]byte("a"), []byte("z"))
 		verifyAdvance(t, s, nil, nil)
-		verifyError(t, s.Err(), expectedErr, expectedID)
+		verifyError(t, s.Err(), expectedID, expectedErrMsg)
 
 		_, err := tx.Get(key1, nil)
-		verifyError(t, err, expectedErr, expectedID)
-		verifyError(t, tx.Put(key1, value1), expectedErr, expectedID)
-		verifyError(t, tx.Delete(key1), expectedErr, expectedID)
+		verifyError(t, err, expectedID, expectedErrMsg)
+		verifyError(t, tx.Put(key1, value1), expectedID, expectedErrMsg)
+		verifyError(t, tx.Delete(key1), expectedID, expectedErrMsg)
 	}
 }
 
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
index 296a678..55b886a 100644
--- a/services/syncbase/store/test/util.go
+++ b/services/syncbase/store/test/util.go
@@ -28,7 +28,7 @@
 		}
 	} else {
 		valbuf, err = st.Get(key, valbuf)
-		verifyError(t, err, string(key), store.ErrUnknownKey.ID)
+		verifyError(t, err, store.ErrUnknownKey.ID, string(key))
 		valcopy := []byte("tmp")
 		// Verify that valbuf is not modified if the key is not found.
 		if !bytes.Equal(valbuf, valcopy) {
@@ -61,12 +61,15 @@
 	}
 }
 
-func verifyError(t *testing.T, err error, substr string, errorID verror.ID) {
+// verifyError verifies that the given error has the given errorID and that the
+// error string contains the given substr. Pass an empty substr to skip the
+// substr check.
+func verifyError(t *testing.T, err error, errorID verror.ID, substr string) {
 	if got := verror.ErrorID(err); got != errorID {
 		Fatalf(t, "unexpected error ID: got %v, want %v", got, errorID)
 	}
 	if !strings.Contains(err.Error(), substr) {
-		Fatalf(t, "unexpected error: got %v, want %v", err, substr)
+		Fatalf(t, "unexpected error: %q not found in %q", substr, err)
 	}
 }
 
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index 3d0867a..eee882a 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -46,10 +46,10 @@
 	return dst
 }
 
-// WrapError wraps the given err with a verror. It is a no-op if the err is nil.
-// The returned error contains the current stack trace, but retains the input
-// error's IDAction pair. If the given error is not a verror, the IDAction pair
-// of the returned error will be (ErrUnknown.ID, NoRetry).
+// WrapError wraps the given error with a verror. It is a no-op if the given
+// error is nil. The returned error contains the current stack trace, but
+// retains the input error's IDAction pair. If the given error is not a verror,
+// the IDAction pair of the returned error will be (ErrUnknown.ID, NoRetry).
 func WrapError(err error) error {
 	if err == nil {
 		return nil
diff --git a/services/syncbase/store/watchable/store.go b/services/syncbase/store/watchable/store.go
deleted file mode 100644
index e4fd0ee..0000000
--- a/services/syncbase/store/watchable/store.go
+++ /dev/null
@@ -1,197 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package watchable provides a store.Store that maintains a commit log. In
-// Syncbase, this log forms the basis for the implementation of client-facing
-// watch as well as the sync module's watching of store commits.
-//
-// Log entries are keyed in reverse chronological order. More specifically, the
-// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
-// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
-// the lexicographic order matches the numeric order. Thus, clients implementing
-// ResumeMarkers (i.e. implementing the watch API) should use
-// fmt.Sprintf("%020d", MaxUint64-marker) to convert external markers to
-// internal LogEntry key prefixes.
-package watchable
-
-// TODO(sadovsky): Write unit tests. (As of 2015-05-26 we're still iterating on
-// the design for how to expose a "watch" API from the storage engine, and we
-// don't want to write lots of tests prematurely.)
-// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
-// TODO(sadovsky): Allow clients to subscribe via Go channel.
-
-import (
-	"fmt"
-	"strconv"
-	"strings"
-	"sync"
-
-	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
-	"v.io/v23/vom"
-)
-
-const (
-	LogPrefix        = "$log"
-	MaxUint16 uint64 = 1<<16 - 1 // 5 digits
-	MaxUint64 uint64 = 1<<64 - 1 // 20 digits
-)
-
-// Store is a store.Store that maintains a commit log.
-type Store store.Store
-
-// Wrap returns a watchable.Store that wraps the given store.Store.
-func Wrap(st store.Store) (Store, error) {
-	it := st.Scan([]byte(LogPrefix), []byte(""))
-	var seq uint64 = 0
-	for it.Advance() {
-		key := string(it.Key(nil))
-		parts := split(key)
-		if len(parts) != 3 {
-			panic("wrong number of parts: " + key)
-		}
-		invSeq, err := strconv.ParseUint(parts[1], 10, 64)
-		if err != nil {
-			panic("failed to parse invSeq: " + key)
-		}
-		seq = MaxUint64 - invSeq
-		it.Cancel()
-	}
-	if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
-		return nil, err
-	}
-	return &wstore{Store: st, seq: seq}, nil
-}
-
-type wstore struct {
-	store.Store
-	mu  sync.Mutex // held during transaction commits; protects seq
-	seq uint64     // sequence number, for commits
-}
-
-type transaction struct {
-	store.Transaction
-	st  *wstore
-	mu  sync.Mutex // protects the fields below
-	ops []Op
-}
-
-var (
-	_ Store             = (*wstore)(nil)
-	_ store.Transaction = (*transaction)(nil)
-)
-
-// TODO(sadovsky): Decide whether to copy []bytes vs. requiring clients not to
-// modify passed-in []bytes.
-
-func (st *wstore) Put(key, value []byte) error {
-	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
-	})
-}
-
-func (st *wstore) Delete(key []byte) error {
-	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
-	})
-}
-
-func (st *wstore) NewTransaction() store.Transaction {
-	return &transaction{Transaction: st.Store.NewTransaction(), st: st}
-}
-
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	valbuf, err := tx.Transaction.Get(key, valbuf)
-	if err == nil || verror.ErrorID(err) == store.ErrUnknownKey.ID {
-		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
-	}
-	return valbuf, err
-}
-
-func (tx *transaction) Scan(start, limit []byte) store.Stream {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	it := tx.Transaction.Scan(start, limit)
-	if it.Err() == nil {
-		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
-	}
-	return it
-}
-
-func (tx *transaction) Put(key, value []byte) error {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	err := tx.Transaction.Put(key, value)
-	if err == nil {
-		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
-	}
-	return err
-}
-
-func (tx *transaction) Delete(key []byte) error {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	err := tx.Transaction.Delete(key)
-	if err == nil {
-		tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
-	}
-	return err
-}
-
-func (tx *transaction) Commit() error {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	// Check sequence numbers.
-	if uint64(len(tx.ops)) > MaxUint16 {
-		return verror.New(verror.ErrInternal, nil, "too many ops")
-	}
-	if tx.st.seq == MaxUint64 {
-		return verror.New(verror.ErrInternal, nil, "seq maxed out")
-	}
-	// Write LogEntry records.
-	// TODO(sadovsky): Use a more efficient lexicographic number encoding.
-	keyPrefix := join(LogPrefix, fmt.Sprintf("%020d", MaxUint64-tx.st.seq))
-	for txSeq, op := range tx.ops {
-		key := join(keyPrefix, fmt.Sprintf("%05d", MaxUint16-uint64(txSeq)))
-		value := &LogEntry{
-			Op: op,
-			// TODO(sadovsky): This information is also captured in LogEntry keys.
-			// Optimize to avoid redundancy.
-			Continued: txSeq < len(tx.ops)-1,
-		}
-		if err := put(tx.Transaction, key, value); err != nil {
-			return err
-		}
-	}
-	if err := tx.Transaction.Commit(); err != nil {
-		return err
-	}
-	tx.st.seq++
-	return nil
-}
-
-////////////////////////////////////////
-// Internal helpers
-
-func join(parts ...string) string {
-	return strings.Join(parts, ":")
-}
-
-func split(key string) []string {
-	return strings.Split(key, ":")
-}
-
-func put(st store.StoreWriter, k string, v interface{}) error {
-	bytes, err := vom.Encode(v)
-	if err != nil {
-		return err
-	}
-	return st.Put([]byte(k), bytes)
-}
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index 35f935a..44865a3 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -22,7 +22,6 @@
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
-// TODO(sadovsky): Perhaps this should be one of the standard Veyron flags.
 var (
 	name = flag.String("name", "", "Name to mount at.")
 )