syncbase: watchable store with versioning support

Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 2151b1a..463c498 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -89,7 +89,8 @@
 }
 
 func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	// In the future this API should be replaced by one that streams the database names.
+	// In the future this API will likely be replaced by one that streams the
+	// database names.
 	a.mu.Lock()
 	defer a.mu.Unlock()
 	dbNames := make([]string, 0, len(a.dbs))
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index c69af62..5a44625 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -8,9 +8,9 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
-	"v.io/syncbase/x/ref/services/syncbase/store/watchable"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
@@ -39,7 +39,9 @@
 		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
 	// TODO(sadovsky): Make storage engine pluggable.
-	st, err := watchable.Wrap(memstore.New())
+	st, err := watchable.Wrap(memstore.New(), &watchable.Options{
+		ManagedPrefixes: []string{},
+	})
 	if err != nil {
 		return nil, err
 	}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index ce2c63e..c2a283d 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -115,7 +115,8 @@
 }
 
 func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	// In the future this API should be replaced by one that streams the app names.
+	// In the future this API will likely be replaced by one that streams the app
+	// names.
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	appNames := make([]string, 0, len(s.apps))
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 815ef82..88e1115 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -5,17 +5,22 @@
 package util
 
 // TODO(sadovsky): Consider using shorter strings.
+
+// Key prefixes used to build storage engine keys.
 const (
-	ServicePrefix  = "$service"
 	AppPrefix      = "$app"
-	DbInfoPrefix   = "$dbInfo"
 	DatabasePrefix = "$database"
-	TablePrefix    = "$table"
+	DbInfoPrefix   = "$dbInfo"
+	LogPrefix      = "$log"
 	RowPrefix      = "$row"
+	ServicePrefix  = "$service"
 	SyncPrefix     = "$sync"
+	TablePrefix    = "$table"
+	VersionPrefix  = "$version"
 )
 
+// Suffixes used in object names.
 const (
-	// Service object name suffix for Syncbase internal communication.
+	// Service object name suffix for Syncbase-to-Syncbase RPCs.
 	SyncbaseSuffix = "$internal"
 )
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 0feb974..87a204e 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -54,9 +54,10 @@
 }
 
 // Takes ownership of sn.
-// TODO(sadovsky): It sucks that Glob must be implemented differently from other
-// streaming RPC handlers. I don't have much confidence that I've implemented
-// both types of streaming correctly.
+// TODO(sadovsky): Why do we make developers implement Glob differently from
+// other streaming RPCs? It's confusing that Glob must return immediately and
+// write its results to a channel, while other streaming RPC handlers must block
+// and write their results to the output stream. See nlacasse's TODO below, too.
 func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
new file mode 100644
index 0000000..ac3694f
--- /dev/null
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// snapshot is a store.Snapshot for a watchable store. Reads of managed keys
+// resolve through version records; other keys are read from isn directly.
+type snapshot struct {
+	isn store.Snapshot // snapshot of the wrapped (inner) store
+	st  *wstore        // store that decides which keys are managed
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+// newSnapshot takes a snapshot of st's inner store and binds it to st.
+func newSnapshot(st *wstore) *snapshot {
+	return &snapshot{isn: st.ist.NewSnapshot(), st: st}
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+	return s.isn.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+	if s.st.managesKey(key) {
+		return getVersioned(s.isn, key, valbuf)
+	}
+	return s.isn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
+	if s.st.managesRange(start, limit) {
+		return newStreamVersioned(s.isn, start, limit)
+	}
+	return s.isn.Scan(start, limit)
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
new file mode 100644
index 0000000..ed8e15f
--- /dev/null
+++ b/services/syncbase/server/watchable/store.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a Syncbase-specific store.Store wrapper that
+// provides versioned storage for specified prefixes and maintains a watchable
+// log of operations performed on versioned records. This log forms the basis
+// for the implementation of client-facing watch as well as the sync module's
+// internal watching of store updates.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq), both as zero-padded hex (%016x, %04x) so
+// that lexicographic order matches numeric order. Thus, clients implementing
+// ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%016x", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+//
+// Each managed <key> has a version record "$version:<key>" whose value is the
+// current <version>; the value itself is stored under "<key>:<version>".
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+
+	pubutil "v.io/syncbase/v23/syncbase/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+const (
+	MaxUint16 uint64 = 1<<16 - 1 // 0xffff; LogEntry keys encode MaxUint16-txSeq
+	MaxUint64 uint64 = 1<<64 - 1 // 0xffffffffffffffff; LogEntry keys encode MaxUint64-seq
+)
+
+// Store is a store.Store that provides versioned storage and a watchable oplog.
+// TODO(sadovsky): Extend interface.
+type Store interface {
+	store.Store
+}
+
+// Options configures a watchable.Store.
+type Options struct {
+	// Key prefixes to version and log. Nil means all keys; empty means none.
+	ManagedPrefixes []string
+}
+
+// Wrap returns a watchable.Store that wraps the given store.Store. New commits
+// continue the sequence after the newest LogEntry already persisted in st.
+func Wrap(st store.Store, opts *Options) (Store, error) {
+	var seq uint64 // stays 0 if st holds no LogEntry records
+	it := st.Scan([]byte(pubutil.PrefixRangeStart(util.LogPrefix)), []byte(pubutil.PrefixRangeLimit(util.LogPrefix)))
+	if it.Advance() { // LogEntry keys sort newest-first, so one key suffices
+		key := string(it.Key(nil))
+		parts := split(key)
+		if len(parts) != 3 {
+			panic("wrong number of parts: " + key)
+		}
+		invSeq, err := strconv.ParseUint(parts[1], 16, 64) // Commit writes %016x
+		if err != nil {
+			panic("failed to parse invSeq: " + key)
+		}
+		seq = MaxUint64 - invSeq + 1 // one past the newest committed seq
+		it.Cancel()
+	}
+	if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+		return nil, err
+	}
+	return &wstore{ist: st, opts: opts, seq: seq}, nil
+}
+
+type wstore struct {
+	ist  store.Store // the wrapped store
+	opts *Options    // as passed to Wrap (may be nil)
+	mu   sync.Mutex  // held during transaction commits; protects seq
+	seq  uint64      // next sequence number to assign to a commit
+}
+
+var _ Store = (*wstore)(nil)
+
+// TODO(sadovsky): Decide whether to copy []byte's vs. requiring clients not to
+// modify passed-in []byte's. (In fact, this should be spelled out in the
+// store.Store API contract.)
+
+// Close implements the store.Store interface by closing the wrapped store.
+func (st *wstore) Close() error {
+	return st.ist.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (st *wstore) Get(key, valbuf []byte) ([]byte, error) {
+	if st.managesKey(key) {
+		sn := newSnapshot(st) // version and value must be read consistently
+		defer sn.Close()
+		return sn.Get(key, valbuf)
+	}
+	return st.ist.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (st *wstore) Scan(start, limit []byte) store.Stream {
+	if st.managesRange(start, limit) {
+		// TODO(sadovsky): Close snapshot once stream is finished or canceled.
+		return newSnapshot(st).Scan(start, limit)
+	}
+	return st.ist.Scan(start, limit)
+}
+
+// Put implements the store.StoreWriter interface. It runs in a watchable.Store
+// transaction so that writes to managed keys are logged.
+func (st *wstore) Put(key, value []byte) error {
+	return store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+		return tx.Put(key, value)
+	})
+}
+
+// Delete implements the store.StoreWriter interface. It runs in a
+// watchable.Store transaction so that deletes of managed keys are logged.
+func (st *wstore) Delete(key []byte) error {
+	return store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+		return tx.Delete(key)
+	})
+}
+
+// NewTransaction implements the store.Store interface.
+func (st *wstore) NewTransaction() store.Transaction {
+	return newTransaction(st)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (st *wstore) NewSnapshot() store.Snapshot {
+	return newSnapshot(st)
+}
+
+////////////////////////////////////////
+// Internal helpers
+
+// managesKey reports whether key is managed; nil opts or prefixes mean all keys.
+func (st *wstore) managesKey(key []byte) bool {
+	if st.opts == nil || st.opts.ManagedPrefixes == nil {
+		return true
+	}
+	ikey := string(key)
+	for _, p := range st.opts.ManagedPrefixes { // TODO(sadovsky): Optimize, e.g. binary search.
+		if strings.HasPrefix(ikey, p) {
+			return true
+		}
+	}
+	return false
+}
+
+// managesRange reports whether [start, limit) is managed; an empty limit is unbounded.
+func (st *wstore) managesRange(start, limit []byte) bool {
+	if st.opts == nil || st.opts.ManagedPrefixes == nil {
+		return true
+	}
+	istart, ilimit := string(start), string(limit)
+	for _, p := range st.opts.ManagedPrefixes { // TODO(sadovsky): Optimize (see above).
+		pstart, plimit := pubutil.PrefixRangeStart(p), pubutil.PrefixRangeLimit(p)
+		if pstart <= istart && (plimit == "" || (ilimit != "" && ilimit <= plimit)) {
+			return true
+		}
+		if !((plimit != "" && plimit <= istart) || (ilimit != "" && ilimit <= pstart)) {
+			panic(fmt.Sprintf("partial overlap: %q %q %q", p, start, limit)) // Syncbase server bug
+		}
+	}
+	return false
+}
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
new file mode 100644
index 0000000..18b8e4d
--- /dev/null
+++ b/services/syncbase/server/watchable/store_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"io/ioutil"
+	"runtime"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+	"v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+	runtime.GOMAXPROCS(10) // presumably so the concurrency tests run in parallel
+}
+
+func TestStream(t *testing.T) {
+	runTest(t, []string{}, test.RunStreamTest) // no keys managed
+	runTest(t, nil, test.RunStreamTest)        // all keys managed
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, []string{}, test.RunSnapshotTest) // no keys managed
+	runTest(t, nil, test.RunSnapshotTest)        // all keys managed
+}
+
+func TestStoreState(t *testing.T) {
+	runTest(t, []string{}, test.RunStoreStateTest) // no keys managed
+	runTest(t, nil, test.RunStoreStateTest)        // all keys managed
+}
+
+func TestClose(t *testing.T) {
+	runTest(t, []string{}, test.RunCloseTest) // no keys managed
+	runTest(t, nil, test.RunCloseTest)        // all keys managed
+}
+
+func TestReadWriteBasic(t *testing.T) {
+	runTest(t, []string{}, test.RunReadWriteBasicTest) // no keys managed
+	runTest(t, nil, test.RunReadWriteBasicTest)        // all keys managed
+}
+
+func TestReadWriteRandom(t *testing.T) {
+	runTest(t, []string{}, test.RunReadWriteRandomTest) // no keys managed
+	runTest(t, nil, test.RunReadWriteRandomTest)        // all keys managed
+}
+
+func TestConcurrentTransactions(t *testing.T) {
+	runTest(t, []string{}, test.RunConcurrentTransactionsTest) // no keys managed
+	runTest(t, nil, test.RunConcurrentTransactionsTest)        // all keys managed
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, []string{}, test.RunTransactionStateTest) // no keys managed
+	runTest(t, nil, test.RunTransactionStateTest)        // all keys managed
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+	runTest(t, []string{}, test.RunTransactionsWithGetTest) // no keys managed
+	runTest(t, nil, test.RunTransactionsWithGetTest)        // all keys managed
+}
+
+// With memstore, TestReadWriteRandom is slow when ManagedPrefixes=nil: every
+// watchable.Store.Get() takes a snapshot, and memstore snapshots copy the whole
+// data map. LevelDB snapshots are cheap, so LevelDB handles that case quickly.
+const useMemstore = false
+
+// runTest wraps a fresh base store with ManagedPrefixes=mp and runs f on it.
+func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
+	var base store.Store
+	if !useMemstore {
+		var path string
+		base, path = newLevelDB()
+		defer destroyLevelDB(base, path)
+	} else {
+		base = memstore.New()
+	}
+	wrapped, err := Wrap(base, &Options{ManagedPrefixes: mp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer wrapped.Close()
+	f(t, wrapped)
+}
+
+func newLevelDB() (store.Store, string) {
+	dir, err := ioutil.TempDir("", "syncbase_leveldb")
+	if err != nil {
+		panic(fmt.Sprintf("can't create temp dir: %v", err))
+	}
+	db, err := leveldb.Open(dir)
+	if err != nil {
+		panic(fmt.Sprintf("can't open db at %v: %v", dir, err))
+	}
+	return db, dir // the open store and the temp dir holding its files
+}
+
+func destroyLevelDB(st store.Store, path string) {
+	st.Close() // error ignored: runTest's wrapper Close has already closed st
+	if err := leveldb.Destroy(path); err != nil {
+		panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+	}
+}
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
new file mode 100644
index 0000000..cd41835
--- /dev/null
+++ b/services/syncbase/server/watchable/stream.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// stream yields the client keys and current values of versioned records.
+type stream struct {
+	iit      store.Stream      // stream over "$version" keys in st
+	st       store.StoreReader // transaction or snapshot used for value lookups
+	mu       sync.Mutex        // held by every method
+	err      error             // sticky error from a failed value lookup
+	hasValue bool              // whether key and value hold a staged record
+	key      []byte            // staged client key ("$version" part dropped)
+	value    []byte            // staged value
+}
+
+var _ store.Stream = (*stream)(nil)
+
+// newStreamVersioned streams [start, limit), assuming every key in it is managed.
+// NOTE(review): an empty limit becomes makeVersionKey(""), not "no limit".
+func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
+	checkTransactionOrSnapshot(st)
+	return &stream{
+		iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
+		st:  st,
+	}
+}
+
+// Advance implements the store.Stream interface. A failed value lookup is
+// sticky: it ends the stream and cancels the underlying version-key stream.
+func (s *stream) Advance() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.hasValue = false
+	if s.err != nil || !s.iit.Advance() {
+		return false
+	}
+	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
+	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
+	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	if s.err != nil {
+		// Cancel() is a no-op once err is set, so release the inner stream now.
+		s.iit.Cancel()
+		return false
+	}
+	s.hasValue = true
+	return true
+}
+
+// Key implements the store.Stream interface; it panics if nothing is staged.
+func (s *stream) Key(keybuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.hasValue {
+		return store.CopyBytes(keybuf, s.key)
+	}
+	panic("nothing staged")
+}
+
+// Value implements the store.Stream interface; it panics if nothing is staged.
+func (s *stream) Value(valbuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.hasValue {
+		return store.CopyBytes(valbuf, s.value)
+	}
+	panic("nothing staged")
+}
+
+// Err implements the store.Stream interface; a lookup error takes precedence.
+func (s *stream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err == nil {
+		return s.iit.Err()
+	}
+	return store.WrapError(s.err)
+}
+
+// Cancel implements the store.Stream interface. After a failed value lookup
+// it does nothing, since the stream has already ended.
+func (s *stream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err == nil {
+		s.iit.Cancel()
+	}
+}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+type transaction struct {
+	itx store.Transaction // transaction on the wrapped store
+	st  *wstore           // owning store: managed prefixes, seq, commit lock
+	mu  sync.Mutex        // protects the fields below
+	err error             // non-nil once Commit or Abort has been called
+	ops []Op              // managed-key operations, written to the log by Commit
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *wstore) *transaction {
+	return &transaction{
+		itx: st.ist.NewTransaction(),
+		st:  st,
+	}
+}
+
+// Get implements the store.StoreReader interface. Managed-key reads are
+// recorded in tx.ops as copies, since callers may reuse key buffers.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return valbuf, store.WrapError(tx.err)
+	}
+	if !tx.st.managesKey(key) {
+		return tx.itx.Get(key, valbuf)
+	}
+	valbuf, err := getVersioned(tx.itx, key, valbuf)
+	tx.ops = append(tx.ops, &OpGet{GetOp{Key: append([]byte(nil), key...)}})
+	return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface. Managed-range scans are
+// recorded in tx.ops with copies of start and limit (callers may reuse them).
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return &store.InvalidStream{tx.err}
+	}
+	if !tx.st.managesRange(start, limit) {
+		return tx.itx.Scan(start, limit)
+	}
+	it := newStreamVersioned(tx.itx, start, limit)
+	tx.ops = append(tx.ops, &OpScan{ScanOp{Start: append([]byte(nil), start...), Limit: append([]byte(nil), limit...)}})
+	return it
+}
+
+// Put implements the store.StoreWriter interface. Successful puts of managed
+// keys are recorded in tx.ops, with private copies of key and value.
+func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	if !tx.st.managesKey(key) {
+		return tx.itx.Put(key, value)
+	}
+	if err := putVersioned(tx.itx, key, value); err != nil {
+		return err
+	}
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: append([]byte(nil), key...), Value: append([]byte(nil), value...)}})
+	return nil
+}
+
+// Delete implements the store.StoreWriter interface. Successful deletes of
+// managed keys are recorded in tx.ops, with a private copy of key.
+func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	if !tx.st.managesKey(key) {
+		return tx.itx.Delete(key)
+	}
+	if err := deleteVersioned(tx.itx, key); err != nil {
+		return err
+	}
+	tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: append([]byte(nil), key...)}})
+	return nil
+}
+
+// Commit implements the store.Transaction interface. It writes one LogEntry per
+// recorded op, all keyed under the current seq, then commits itx and bumps seq.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	if uint64(len(tx.ops)) > MaxUint16 {
+		tx.itx.Abort() // best effort; tx.err now blocks the caller's own Abort
+		return verror.New(verror.ErrInternal, nil, "too many ops")
+	}
+	if tx.st.seq == MaxUint64 {
+		tx.itx.Abort() // best effort, as above
+		return verror.New(verror.ErrInternal, nil, "seq maxed out")
+	}
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+		value := &LogEntry{
+			Op:        op,
+			Continued: txSeq < len(tx.ops)-1, // TODO(sadovsky): Also in the key; dedupe.
+		}
+		if err := util.PutObject(tx.itx, key, value); err != nil {
+			tx.itx.Abort() // best effort, as above
+			return err
+		}
+	}
+	if err := tx.itx.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+// Abort implements the store.Transaction interface; later calls on tx fail.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if err := tx.err; err != nil {
+		return store.WrapError(err)
+	}
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+	return tx.itx.Abort()
+}
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
new file mode 100644
index 0000000..7a8f629
--- /dev/null
+++ b/services/syncbase/server/watchable/types.vdl
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// GetOp represents a store get operation.
+type GetOp struct {
+	Key []byte
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+	Start []byte
+	Limit []byte
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+	Key   []byte
+	Value []byte
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+	Key []byte
+}
+
+// Op represents a store operation.
+type Op union {
+	Get    GetOp
+	Scan   ScanOp
+	Put    PutOp
+	Delete DeleteOp
+}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Transactional
+// reads of managed keys are logged as Get/Scan ops; other reads are not.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+	// The store operation that was performed.
+	Op Op
+
+	// If true, this entry is followed by more entries that belong to the same
+	// commit as this entry.
+	Continued bool
+}
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
new file mode 100644
index 0000000..12b8e0b
--- /dev/null
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -0,0 +1,136 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package watchable
+
+import (
+	// VDL system imports
+	"v.io/v23/vdl"
+)
+
+// GetOp represents a store get operation.
+type GetOp struct {
+	Key []byte
+}
+
+func (GetOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.GetOp"`
+}) {
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+	Start []byte
+	Limit []byte
+}
+
+func (ScanOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.ScanOp"`
+}) {
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+	Key   []byte
+	Value []byte
+}
+
+func (PutOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.PutOp"`
+}) {
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+	Key []byte
+}
+
+func (DeleteOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.DeleteOp"`
+}) {
+}
+
+type (
+	// Op represents any single field of the Op union type.
+	//
+	// Op represents a store operation.
+	Op interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the Op union type.
+		__VDLReflect(__OpReflect)
+	}
+	// OpGet represents field Get of the Op union type.
+	OpGet struct{ Value GetOp }
+	// OpScan represents field Scan of the Op union type.
+	OpScan struct{ Value ScanOp }
+	// OpPut represents field Put of the Op union type.
+	OpPut struct{ Value PutOp }
+	// OpDelete represents field Delete of the Op union type.
+	OpDelete struct{ Value DeleteOp }
+	// __OpReflect describes the Op union type.
+	__OpReflect struct {
+		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
+		Type  Op
+		Union struct {
+			Get    OpGet
+			Scan   OpScan
+			Put    OpPut
+			Delete OpDelete
+		}
+	}
+)
+
+func (x OpGet) Index() int               { return 0 }
+func (x OpGet) Interface() interface{}   { return x.Value }
+func (x OpGet) Name() string             { return "Get" }
+func (x OpGet) __VDLReflect(__OpReflect) {}
+
+func (x OpScan) Index() int               { return 1 }
+func (x OpScan) Interface() interface{}   { return x.Value }
+func (x OpScan) Name() string             { return "Scan" }
+func (x OpScan) __VDLReflect(__OpReflect) {}
+
+func (x OpPut) Index() int               { return 2 }
+func (x OpPut) Interface() interface{}   { return x.Value }
+func (x OpPut) Name() string             { return "Put" }
+func (x OpPut) __VDLReflect(__OpReflect) {}
+
+func (x OpDelete) Index() int               { return 3 }
+func (x OpDelete) Interface() interface{}   { return x.Value }
+func (x OpDelete) Name() string             { return "Delete" }
+func (x OpDelete) __VDLReflect(__OpReflect) {}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+	// The store operation that was performed.
+	Op Op
+	// If true, this entry is followed by more entries that belong to the same
+	// commit as this entry.
+	Continued bool
+}
+
+func (LogEntry) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.LogEntry"`
+}) {
+}
+
+func init() {
+	vdl.Register((*GetOp)(nil))
+	vdl.Register((*ScanOp)(nil))
+	vdl.Register((*PutOp)(nil))
+	vdl.Register((*DeleteOp)(nil))
+	vdl.Register((*Op)(nil))
+	vdl.Register((*LogEntry)(nil))
+}
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
new file mode 100644
index 0000000..cd37d3e
--- /dev/null
+++ b/services/syncbase/server/watchable/util.go
@@ -0,0 +1,100 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
+// We should probably convert incoming strings to []byte's as early as possible,
+// and deal exclusively in []byte's internally.
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// rng generates version strings for putVersioned. A *rand.Rand is not safe
+// for concurrent use, and putVersioned may run in concurrent transactions, so
+// rng may only be used while holding rngMu.
+var (
+	rngMu sync.Mutex
+	rng   *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+)
+
+// makeVersionKey returns the key of the record that holds key's version.
+func makeVersionKey(key []byte) []byte {
+	return []byte(join(util.VersionPrefix, string(key)))
+}
+
+// makeAtVersionKey returns the key under which key's value at the given
+// version is stored.
+func makeAtVersionKey(key, version []byte) []byte {
+	return []byte(join(string(key), string(version)))
+}
+
+// getVersion returns key's current version, as written by putVersioned.
+func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
+	return st.Get(makeVersionKey(key), nil)
+}
+
+// getAtVersion reads key's value at the given version, using valbuf.
+func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	return st.Get(makeAtVersionKey(key, version), valbuf)
+}
+
+// getVersioned returns the current value of a managed key. It reads the
+// version and then the value, so st must be a Transaction or Snapshot for the
+// two reads to be consistent; checkTransactionOrSnapshot panics otherwise.
+func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
+	checkTransactionOrSnapshot(st)
+	version, err := getVersion(st, key)
+	if err != nil {
+		return valbuf, err
+	}
+	return getAtVersion(st, key, valbuf, version)
+}
+
+// putVersioned stores value under a fresh random version of key and points
+// key's version record at that version.
+func putVersioned(tx store.Transaction, key, value []byte) error {
+	rngMu.Lock()
+	version := []byte(fmt.Sprintf("%x", rng.Int63()))
+	rngMu.Unlock()
+	if err := tx.Put(makeVersionKey(key), version); err != nil {
+		return err
+	}
+	return tx.Put(makeAtVersionKey(key, version), value)
+}
+
+// deleteVersioned removes key's version record, so getVersioned no longer
+// finds a version for key. The value stored at the old version is left as is.
+func deleteVersioned(tx store.Transaction, key []byte) error {
+	return tx.Delete(makeVersionKey(key))
+}
+
+// checkTransactionOrSnapshot panics unless st is a store.Transaction or a
+// store.Snapshot.
+// NOTE(review): this is a structural check; if store.Snapshot only requires
+// the StoreReader methods plus Close, a plain store.Store passes too. Confirm.
+func checkTransactionOrSnapshot(st store.StoreReader) {
+	_, isTransaction := st.(store.Transaction)
+	_, isSnapshot := st.(store.Snapshot)
+	if !isTransaction && !isSnapshot {
+		panic("neither a Transaction nor a Snapshot")
+	}
+}
+
+// join combines key parts using util.JoinKeyParts.
+func join(parts ...string) string {
+	return util.JoinKeyParts(parts...)
+}
+
+// split breaks key into its parts using util.SplitKeyParts.
+func split(key string) []string {
+	return util.SplitKeyParts(key)
+}