syncbase: watchable store with versioning support
Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 2151b1a..463c498 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -89,7 +89,8 @@
}
func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- // In the future this API should be replaced by one that streams the database names.
+ // In the future this API will likely be replaced by one that streams the
+ // database names.
a.mu.Lock()
defer a.mu.Unlock()
dbNames := make([]string, 0, len(a.dbs))
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index c69af62..5a44625 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -8,9 +8,9 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
- "v.io/syncbase/x/ref/services/syncbase/store/watchable"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
@@ -39,7 +39,9 @@
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
// TODO(sadovsky): Make storage engine pluggable.
- st, err := watchable.Wrap(memstore.New())
+ st, err := watchable.Wrap(memstore.New(), &watchable.Options{
+ ManagedPrefixes: []string{},
+ })
if err != nil {
return nil, err
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index ce2c63e..c2a283d 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -115,7 +115,8 @@
}
func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- // In the future this API should be replaced by one that streams the app names.
+ // In the future this API will likely be replaced by one that streams the app
+ // names.
s.mu.Lock()
defer s.mu.Unlock()
appNames := make([]string, 0, len(s.apps))
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 815ef82..88e1115 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -5,17 +5,22 @@
package util
// TODO(sadovsky): Consider using shorter strings.
+
+// Constants related to storage engine keys.
const (
- ServicePrefix = "$service"
AppPrefix = "$app"
- DbInfoPrefix = "$dbInfo"
DatabasePrefix = "$database"
- TablePrefix = "$table"
+ DbInfoPrefix = "$dbInfo"
+ LogPrefix = "$log"
RowPrefix = "$row"
+ ServicePrefix = "$service"
SyncPrefix = "$sync"
+ TablePrefix = "$table"
+ VersionPrefix = "$version"
)
+// Constants related to object names.
const (
- // Service object name suffix for Syncbase internal communication.
+ // Service object name suffix for Syncbase-to-Syncbase RPCs.
SyncbaseSuffix = "$internal"
)
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 0feb974..87a204e 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -54,9 +54,10 @@
}
// Takes ownership of sn.
-// TODO(sadovsky): It sucks that Glob must be implemented differently from other
-// streaming RPC handlers. I don't have much confidence that I've implemented
-// both types of streaming correctly.
+// TODO(sadovsky): Why do we make developers implement Glob differently from
+// other streaming RPCs? It's confusing that Glob must return immediately and
+// write its results to a channel, while other streaming RPC handlers must block
+// and write their results to the output stream. See nlacasse's TODO below, too.
func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
new file mode 100644
index 0000000..ac3694f
--- /dev/null
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// snapshot is the watchable store's store.Snapshot implementation. It wraps a
+// snapshot of the inner store and keeps a reference to the wstore that created
+// it, which decides per key or range whether versioned reads apply.
+type snapshot struct {
+	isn store.Snapshot // snapshot of the wrapped (inner) store
+	st  *wstore        // store this snapshot was taken from
+}
+
+// Compile-time check that *snapshot satisfies store.Snapshot.
+var _ store.Snapshot = (*snapshot)(nil)
+
+// newSnapshot takes a snapshot of the store wrapped by st and returns it
+// together with a back-reference to st.
+func newSnapshot(st *wstore) *snapshot {
+	sn := new(snapshot)
+	sn.isn = st.ist.NewSnapshot()
+	sn.st = st
+	return sn
+}
+
+// Close implements the store.Snapshot interface by closing the inner snapshot
+// and returning its result.
+func (s *snapshot) Close() error {
+	return s.isn.Close()
+}
+
+// Get implements the store.StoreReader interface. Managed keys are resolved
+// through their version record; all other keys are read straight from the
+// inner snapshot.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+	if s.st.managesKey(key) {
+		return getVersioned(s.isn, key, valbuf)
+	}
+	return s.isn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface. A managed range is served
+// by a versioned stream over the inner snapshot; any other range is scanned
+// directly on the inner snapshot.
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
+	if s.st.managesRange(start, limit) {
+		return newStreamVersioned(s.isn, start, limit)
+	}
+	return s.isn.Scan(start, limit)
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
new file mode 100644
index 0000000..ed8e15f
--- /dev/null
+++ b/services/syncbase/server/watchable/store.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a Syncbase-specific store.Store wrapper that
+// provides versioned storage for specified prefixes and maintains a watchable
+// log of operations performed on versioned records. This log forms the basis
+// for the implementation of client-facing watch as well as the sync module's
+// internal watching of store updates.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
+// the lexicographic order matches the numeric order. Thus, clients implementing
+// ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%016x", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+//
+// Version number records are stored with keys of the form "$version:<key>",
+// where <key> is the client-specified key.
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+
+ pubutil "v.io/syncbase/v23/syncbase/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+// Upper bounds used to invert sequence numbers when building LogEntry keys, so
+// that newer entries sort before older ones.
+const (
+	MaxUint16 uint64 = 1<<16 - 1 // 0xffff; bound for the per-commit op index (txSeq)
+	MaxUint64 uint64 = 1<<64 - 1 // 0xffffffffffffffff; bound for the commit sequence number
+)
+
+// Store is a store.Store that provides versioned storage and a watchable oplog.
+// For now its method set is exactly that of the embedded store.Store.
+// TODO(sadovsky): Extend interface.
+type Store interface {
+	store.Store
+}
+
+// Options configures a watchable.Store.
+type Options struct {
+	// Key prefixes to version and log. If nil, all keys are managed. A non-nil
+	// empty slice matches no key, so nothing is versioned or logged.
+	ManagedPrefixes []string
+}
+
+// Wrap returns a watchable.Store that wraps the given store.Store.
+//
+// The commit sequence number is recovered from the first record returned by a
+// scan starting at util.LogPrefix. LogEntry keys embed (MaxUint64-seq), so the
+// first log entry in key order belongs to the most recent commit; the next
+// commit must use the following sequence number. If no log entry is found, seq
+// starts at 0.
+//
+// Wrap panics if a log key does not have the "$log:<seq>:<txSeq>" shape, and
+// returns any non-cancellation error reported by the scan.
+func Wrap(st store.Store, opts *Options) (Store, error) {
+	var seq uint64
+	it := st.Scan([]byte(util.LogPrefix), []byte(""))
+	if it.Advance() {
+		key := string(it.Key(nil))
+		// Only the first record matters; release the stream right away.
+		it.Cancel()
+		parts := split(key)
+		// NOTE(review): the scan has no explicit upper bound, so when the store
+		// holds no log entries the first record may belong to another prefix.
+		// Such a record is ignored instead of being parsed as a log key.
+		if len(parts) > 0 && parts[0] == util.LogPrefix {
+			if len(parts) != 3 {
+				panic("wrong number of parts: " + key)
+			}
+			// Commit formats this component with "%016x", so it has to be
+			// parsed as hexadecimal, not decimal.
+			invSeq, err := strconv.ParseUint(parts[1], 16, 64)
+			if err != nil {
+				panic("failed to parse invSeq: " + key)
+			}
+			// MaxUint64-invSeq is the seq already used by the latest commit.
+			// Commit refuses to run at seq == MaxUint64, so +1 cannot wrap.
+			seq = MaxUint64 - invSeq + 1
+		}
+	}
+	if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+		return nil, err
+	}
+	return &wstore{ist: st, opts: opts, seq: seq}, nil
+}
+
+// wstore is the Store implementation returned by Wrap. It layers versioning
+// and operation logging on top of an inner store.Store.
+type wstore struct {
+	ist  store.Store // wrapped (inner) store
+	opts *Options    // configuration passed to Wrap; selects the managed prefixes
+	mu   sync.Mutex  // held during transaction commits; protects seq
+	seq  uint64      // sequence number, for commits
+}
+
+// Compile-time check that *wstore satisfies Store.
+var _ Store = (*wstore)(nil)
+
+// TODO(sadovsky): Decide whether to copy []byte's vs. requiring clients not to
+// modify passed-in []byte's. (In fact, this should be spelled out in the
+// store.Store API contract.)
+
+// Close implements the store.Store interface by closing the inner store and
+// returning its result.
+func (st *wstore) Close() error {
+	return st.ist.Close()
+}
+
+// Get implements the store.StoreReader interface. A managed key is read
+// through a short-lived snapshot, which is closed before Get returns; any
+// other key is read directly from the inner store.
+func (st *wstore) Get(key, valbuf []byte) ([]byte, error) {
+	if st.managesKey(key) {
+		sn := newSnapshot(st)
+		defer sn.Close()
+		return sn.Get(key, valbuf)
+	}
+	return st.ist.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface. A managed range is scanned
+// through a fresh snapshot; any other range is scanned directly on the inner
+// store.
+func (st *wstore) Scan(start, limit []byte) store.Stream {
+	if st.managesRange(start, limit) {
+		// TODO(sadovsky): Close snapshot once stream is finished or canceled.
+		sn := newSnapshot(st)
+		return sn.Scan(start, limit)
+	}
+	return st.ist.Scan(start, limit)
+}
+
+// Put implements the store.StoreWriter interface. The write is performed
+// inside a watchable.Store transaction so that the op gets logged.
+func (st *wstore) Put(key, value []byte) error {
+	put := func(tx store.StoreReadWriter) error {
+		return tx.Put(key, value)
+	}
+	return store.RunInTransaction(st, put)
+}
+
+// Delete implements the store.StoreWriter interface. The delete is performed
+// inside a watchable.Store transaction so that the op gets logged.
+func (st *wstore) Delete(key []byte) error {
+	del := func(tx store.StoreReadWriter) error {
+		return tx.Delete(key)
+	}
+	return store.RunInTransaction(st, del)
+}
+
+// NewTransaction implements the store.Store interface. The returned
+// transaction wraps a new transaction on the inner store.
+func (st *wstore) NewTransaction() store.Transaction {
+	return newTransaction(st)
+}
+
+// NewSnapshot implements the store.Store interface. The returned snapshot
+// wraps a new snapshot of the inner store.
+func (st *wstore) NewSnapshot() store.Snapshot {
+	return newSnapshot(st)
+}
+
+////////////////////////////////////////
+// Internal helpers
+
+// managesKey reports whether key is versioned and logged by this store: always
+// when ManagedPrefixes is nil, otherwise only when key starts with one of the
+// configured prefixes.
+func (st *wstore) managesKey(key []byte) bool {
+	prefixes := st.opts.ManagedPrefixes
+	if prefixes == nil {
+		return true
+	}
+	k := string(key)
+	// TODO(sadovsky): Optimize, e.g. use binary search (here and below).
+	for i := range prefixes {
+		if strings.HasPrefix(k, prefixes[i]) {
+			return true
+		}
+	}
+	return false
+}
+
+// managesRange reports whether the key range [start, limit) is versioned and
+// logged by this store. With a nil ManagedPrefixes every range is managed.
+// Otherwise the range is managed when it lies entirely within the range of one
+// managed prefix. Prefixes are checked in slice order; a range that overlaps a
+// prefix's range without being contained in it causes a panic.
+// NOTE(review): an empty limit compares <= every plimit. If callers pass "" to
+// mean "no upper bound", confirm that treating it as contained is intended.
+func (st *wstore) managesRange(start, limit []byte) bool {
+	if st.opts.ManagedPrefixes == nil {
+		return true
+	}
+	istart, ilimit := string(start), string(limit)
+	for _, p := range st.opts.ManagedPrefixes {
+		pstart, plimit := pubutil.PrefixRangeStart(p), pubutil.PrefixRangeLimit(p)
+		// Fully contained in this prefix's range.
+		if pstart <= istart && ilimit <= plimit {
+			return true
+		}
+		// Not contained, yet not disjoint either: partial overlap.
+		if !(plimit <= istart || ilimit <= pstart) {
+			// If this happens, there's a bug in the Syncbase server implementation.
+			panic(fmt.Sprintf("partial overlap: %q %q %q", p, start, limit))
+		}
+	}
+	return false
+}
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
new file mode 100644
index 0000000..18b8e4d
--- /dev/null
+++ b/services/syncbase/server/watchable/store_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "io/ioutil"
+ "runtime"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+ "v.io/syncbase/x/ref/services/syncbase/store/memstore"
+ "v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+// init raises GOMAXPROCS to 10 for every test in this package.
+// NOTE(review): presumably so the concurrent-transaction tests actually run in
+// parallel; confirm the intent.
+func init() {
+	runtime.GOMAXPROCS(10)
+}
+
+// TestStream runs the shared stream test against a wrapped store, first with
+// an empty ManagedPrefixes slice and then with nil.
+func TestStream(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunStreamTest)
+	}
+}
+
+// TestSnapshot runs the shared snapshot test against a wrapped store, first
+// with an empty ManagedPrefixes slice and then with nil.
+func TestSnapshot(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunSnapshotTest)
+	}
+}
+
+// TestStoreState runs the shared store-state test against a wrapped store,
+// first with an empty ManagedPrefixes slice and then with nil.
+func TestStoreState(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunStoreStateTest)
+	}
+}
+
+// TestClose runs the shared close test against a wrapped store, first with an
+// empty ManagedPrefixes slice and then with nil.
+func TestClose(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunCloseTest)
+	}
+}
+
+// TestReadWriteBasic runs the shared basic read/write test against a wrapped
+// store, first with an empty ManagedPrefixes slice and then with nil.
+func TestReadWriteBasic(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunReadWriteBasicTest)
+	}
+}
+
+// TestReadWriteRandom runs the shared random read/write test against a wrapped
+// store, first with an empty ManagedPrefixes slice and then with nil.
+func TestReadWriteRandom(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunReadWriteRandomTest)
+	}
+}
+
+// TestConcurrentTransactions runs the shared concurrent-transactions test
+// against a wrapped store, first with an empty ManagedPrefixes slice and then
+// with nil.
+func TestConcurrentTransactions(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunConcurrentTransactionsTest)
+	}
+}
+
+// TestTransactionState runs the shared transaction-state test against a
+// wrapped store, first with an empty ManagedPrefixes slice and then with nil.
+func TestTransactionState(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunTransactionStateTest)
+	}
+}
+
+// TestTransactionsWithGet runs the shared transactions-with-get test against a
+// wrapped store, first with an empty ManagedPrefixes slice and then with nil.
+func TestTransactionsWithGet(t *testing.T) {
+	for _, mp := range [][]string{{}, nil} {
+		runTest(t, mp, test.RunTransactionsWithGetTest)
+	}
+}
+
+// useMemstore selects the store that runTest wraps: memstore when true,
+// LevelDB when false.
+//
+// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+// relatively expensive since the entire data map is copied. LevelDB snapshots
+// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+const useMemstore = false
+
+// runTest builds the inner store selected by useMemstore, wraps it with the
+// managed prefixes mp, and hands the wrapped store to f. The wrapped store is
+// closed when f returns; a LevelDB inner store is destroyed after that.
+func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
+	var inner store.Store
+	if useMemstore {
+		inner = memstore.New()
+	} else {
+		var dbPath string
+		inner, dbPath = newLevelDB()
+		defer destroyLevelDB(inner, dbPath)
+	}
+	wrapped, err := Wrap(inner, &Options{ManagedPrefixes: mp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer wrapped.Close()
+	f(t, wrapped)
+}
+
+// newLevelDB opens a LevelDB store in a fresh temporary directory and returns
+// the store together with that directory's path. It panics on any failure.
+func newLevelDB() (store.Store, string) {
+	dir, err := ioutil.TempDir("", "syncbase_leveldb")
+	if err != nil {
+		panic(fmt.Sprintf("can't create temp dir: %v", err))
+	}
+	db, openErr := leveldb.Open(dir)
+	if openErr != nil {
+		panic(fmt.Sprintf("can't open db at %v: %v", dir, openErr))
+	}
+	return db, dir
+}
+
+// destroyLevelDB closes st and then destroys the LevelDB database at path. It
+// panics if the database cannot be destroyed; the result of Close is ignored.
+func destroyLevelDB(st store.Store, path string) {
+	st.Close()
+	err := leveldb.Destroy(path)
+	if err == nil {
+		return
+	}
+	panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+}
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
new file mode 100644
index 0000000..cd41835
--- /dev/null
+++ b/services/syncbase/server/watchable/stream.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// stream streams keys and values for versioned records. It walks the version
+// records of a range and, for each one, looks up the value stored under the
+// recorded version.
+type stream struct {
+	iit      store.Stream      // underlying stream over the version records
+	st       store.StoreReader // reader used to fetch each versioned value
+	mu       sync.Mutex        // held by every method of stream
+	err      error             // error from a value lookup; once set, Advance returns false
+	hasValue bool              // true while key and value hold a staged element
+	key      []byte            // client key of the staged element
+	value    []byte            // value of the staged element
+}
+
+// Compile-time check that *stream satisfies store.Stream.
+var _ store.Stream = (*stream)(nil)
+
+// newStreamVersioned creates a new stream over the version records for keys in
+// [start, limit). It assumes all records in that range are managed, i.e.
+// versioned, and panics if st is neither a transaction nor a snapshot.
+func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
+	checkTransactionOrSnapshot(st)
+	s := &stream{st: st}
+	s.iit = st.Scan(makeVersionKey(start), makeVersionKey(limit))
+	return s
+}
+
+// Advance implements the store.Stream interface. It steps the underlying
+// stream of version records, derives the client key by dropping the "$version"
+// prefix, and fetches the value stored under that key at the recorded version.
+// It returns false once the underlying stream stops advancing or after a value
+// lookup has failed; in the latter case the underlying stream is canceled.
+func (s *stream) Advance() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.hasValue = false
+	if s.err != nil {
+		return false
+	}
+	if !s.iit.Advance() {
+		return false
+	}
+	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
+	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
+	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	if s.err != nil {
+		// Cancel skips the underlying stream once s.err is set, so release it
+		// here; otherwise it would stay open after a failed value lookup.
+		s.iit.Cancel()
+		return false
+	}
+	s.hasValue = true
+	return true
+}
+
+// Key implements the store.Stream interface. It copies the staged key into
+// keybuf via store.CopyBytes and panics if no element is staged.
+func (s *stream) Key(keybuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.hasValue {
+		return store.CopyBytes(keybuf, s.key)
+	}
+	panic("nothing staged")
+}
+
+// Value implements the store.Stream interface. It copies the staged value into
+// valbuf via store.CopyBytes and panics if no element is staged.
+func (s *stream) Value(valbuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.hasValue {
+		return store.CopyBytes(valbuf, s.value)
+	}
+	panic("nothing staged")
+}
+
+// Err implements the store.Stream interface. An error recorded by a failed
+// value lookup takes precedence; otherwise the underlying stream's error is
+// returned.
+func (s *stream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err == nil {
+		return s.iit.Err()
+	}
+	return store.WrapError(s.err)
+}
+
+// Cancel implements the store.Stream interface. It cancels the underlying
+// stream unless this stream has already recorded an error.
+func (s *stream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err == nil {
+		s.iit.Cancel()
+	}
+}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+// transaction is the watchable store's store.Transaction implementation. It
+// forwards operations to a transaction on the inner store and records every
+// operation on a managed key or range so that Commit can write it to the log.
+type transaction struct {
+	itx store.Transaction // transaction on the wrapped (inner) store
+	st  *wstore           // store this transaction belongs to
+	mu  sync.Mutex        // protects the fields below
+	err error             // non-nil once the transaction has been committed or aborted
+	ops []Op              // operations on managed keys/ranges, in call order
+}
+
+// Compile-time check that *transaction satisfies store.Transaction.
+var _ store.Transaction = (*transaction)(nil)
+
+// newTransaction starts a transaction on the store wrapped by st and returns
+// it together with a back-reference to st.
+func newTransaction(st *wstore) *transaction {
+	tx := new(transaction)
+	tx.itx = st.ist.NewTransaction()
+	tx.st = st
+	return tx
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return valbuf, store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ valbuf, err = tx.itx.Get(key, valbuf)
+ } else {
+ valbuf, err = getVersioned(tx.itx, key, valbuf)
+ tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+ }
+ return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return &store.InvalidStream{tx.err}
+ }
+ var it store.Stream
+ if !tx.st.managesRange(start, limit) {
+ it = tx.itx.Scan(start, limit)
+ } else {
+ it = newStreamVersioned(tx.itx, start, limit)
+ tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+ }
+ return it
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Put(key, value)
+ } else {
+ err = putVersioned(tx.itx, key, value)
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ }
+ return err
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Delete(key)
+ } else {
+ err = deleteVersioned(tx.itx, key)
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ }
+ return err
+}
+
+// Commit implements the store.Transaction interface. While holding the store's
+// commit lock it stages one LogEntry per recorded op in the inner transaction,
+// commits the inner transaction and, on success, advances the store's sequence
+// number. Whatever the outcome, the transaction is marked as committed, so
+// every later call on it fails.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	if err := tx.writeLogEntries(); err != nil {
+		// tx.err is already set, so Abort on this wrapper can no longer reach
+		// the inner transaction. Abort it here so that it is not left open.
+		// This is best effort: the staging error is the one worth reporting.
+		tx.itx.Abort()
+		return err
+	}
+	if err := tx.itx.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+// writeLogEntries checks the sequence-number bounds and stages one LogEntry per
+// recorded op in the inner transaction. The caller must hold tx.mu and
+// tx.st.mu.
+func (tx *transaction) writeLogEntries() error {
+	// Check sequence numbers.
+	if uint64(len(tx.ops)) > MaxUint16 {
+		return verror.New(verror.ErrInternal, nil, "too many ops")
+	}
+	if tx.st.seq == MaxUint64 {
+		return verror.New(verror.ErrInternal, nil, "seq maxed out")
+	}
+	// Keys hold (MaxUint64-seq) and (MaxUint16-txSeq) as fixed-width hex, so
+	// entries with larger sequence numbers sort first.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+	last := len(tx.ops) - 1
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+		value := &LogEntry{
+			Op: op,
+			// TODO(sadovsky): This information is also captured in LogEntry keys.
+			// Optimize to avoid redundancy.
+			Continued: txSeq < last,
+		}
+		if err := util.PutObject(tx.itx, key, value); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Abort implements the store.Transaction interface. It marks the transaction
+// as aborted, so every later call on it fails, and aborts the inner
+// transaction. On a transaction that was already committed or aborted it
+// returns that earlier state's error and leaves the inner transaction alone.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+	return tx.itx.Abort()
+}
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
new file mode 100644
index 0000000..7a8f629
--- /dev/null
+++ b/services/syncbase/server/watchable/types.vdl
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// GetOp represents a store get operation.
+type GetOp struct {
+ Key []byte
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+ Start []byte
+ Limit []byte
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+ Key []byte
+ Value []byte
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+ Key []byte
+}
+
+// Op represents a store operation.
+type Op union {
+ Get GetOp
+ Scan ScanOp
+ Put PutOp
+ Delete DeleteOp
+}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+ // The store operation that was performed.
+ Op Op
+
+ // If true, this entry is followed by more entries that belong to the same
+ // commit as this entry.
+ Continued bool
+}
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
new file mode 100644
index 0000000..12b8e0b
--- /dev/null
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -0,0 +1,136 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package watchable
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// GetOp represents a store get operation.
+type GetOp struct {
+ Key []byte
+}
+
+func (GetOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.GetOp"`
+}) {
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+ Start []byte
+ Limit []byte
+}
+
+func (ScanOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.ScanOp"`
+}) {
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+ Key []byte
+ Value []byte
+}
+
+func (PutOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.PutOp"`
+}) {
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+ Key []byte
+}
+
+func (DeleteOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.DeleteOp"`
+}) {
+}
+
+type (
+ // Op represents any single field of the Op union type.
+ //
+ // Op represents a store operation.
+ Op interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the Op union type.
+ __VDLReflect(__OpReflect)
+ }
+ // OpGet represents field Get of the Op union type.
+ OpGet struct{ Value GetOp }
+ // OpScan represents field Scan of the Op union type.
+ OpScan struct{ Value ScanOp }
+ // OpPut represents field Put of the Op union type.
+ OpPut struct{ Value PutOp }
+ // OpDelete represents field Delete of the Op union type.
+ OpDelete struct{ Value DeleteOp }
+ // __OpReflect describes the Op union type.
+ __OpReflect struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
+ Type Op
+ Union struct {
+ Get OpGet
+ Scan OpScan
+ Put OpPut
+ Delete OpDelete
+ }
+ }
+)
+
+func (x OpGet) Index() int { return 0 }
+func (x OpGet) Interface() interface{} { return x.Value }
+func (x OpGet) Name() string { return "Get" }
+func (x OpGet) __VDLReflect(__OpReflect) {}
+
+func (x OpScan) Index() int { return 1 }
+func (x OpScan) Interface() interface{} { return x.Value }
+func (x OpScan) Name() string { return "Scan" }
+func (x OpScan) __VDLReflect(__OpReflect) {}
+
+func (x OpPut) Index() int { return 2 }
+func (x OpPut) Interface() interface{} { return x.Value }
+func (x OpPut) Name() string { return "Put" }
+func (x OpPut) __VDLReflect(__OpReflect) {}
+
+func (x OpDelete) Index() int { return 3 }
+func (x OpDelete) Interface() interface{} { return x.Value }
+func (x OpDelete) Name() string { return "Delete" }
+func (x OpDelete) __VDLReflect(__OpReflect) {}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+ // The store operation that was performed.
+ Op Op
+ // If true, this entry is followed by more entries that belong to the same
+ // commit as this entry.
+ Continued bool
+}
+
+func (LogEntry) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.LogEntry"`
+}) {
+}
+
+func init() {
+ vdl.Register((*GetOp)(nil))
+ vdl.Register((*ScanOp)(nil))
+ vdl.Register((*PutOp)(nil))
+ vdl.Register((*DeleteOp)(nil))
+ vdl.Register((*Op)(nil))
+ vdl.Register((*LogEntry)(nil))
+}
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
new file mode 100644
index 0000000..cd37d3e
--- /dev/null
+++ b/services/syncbase/server/watchable/util.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
+// We should probably convert incoming strings to []byte's as early as possible,
+// and deal exclusively in []byte's internally.
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// rng produces the random version strings written by putVersioned. It is
+// seeded from the wall clock at package initialization. A *rand.Rand obtained
+// from rand.New is not safe for concurrent use by multiple goroutines.
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+// makeVersionKey returns the key of the version record for key: the
+// util.VersionPrefix part joined with key.
+func makeVersionKey(key []byte) []byte {
+	return []byte(join(util.VersionPrefix, string(key)))
+}
+
+// makeAtVersionKey returns the key under which the value of key at the given
+// version is stored: key joined with version.
+func makeAtVersionKey(key, version []byte) []byte {
+	return []byte(join(string(key), string(version)))
+}
+
+// getVersion reads the version record for key from st and returns its content,
+// i.e. the current version of key.
+func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
+	return st.Get(makeVersionKey(key), nil)
+}
+
+// getAtVersion reads from st the value stored for key at the given version,
+// passing valbuf through to st.Get.
+func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	return st.Get(makeAtVersionKey(key, version), valbuf)
+}
+
+// getVersioned returns the current value of a versioned key: it looks up the
+// key's version record and then reads the value stored at that version. If the
+// version lookup fails, valbuf and the error are returned. It panics if st is
+// neither a transaction nor a snapshot.
+func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
+	checkTransactionOrSnapshot(st)
+	version, err := st.Get(makeVersionKey(key), nil)
+	if err != nil {
+		return valbuf, err
+	}
+	return st.Get(makeAtVersionKey(key, version), valbuf)
+}
+
+func putVersioned(tx store.Transaction, key, value []byte) error {
+ version := []byte(fmt.Sprintf("%x", rng.Int63()))
+ if err := tx.Put(makeVersionKey(key), version); err != nil {
+ return err
+ }
+ return tx.Put(makeAtVersionKey(key, version), value)
+}
+
+// deleteVersioned deletes the version record for key within tx. Only the
+// version record is removed; values stored under earlier versions of key are
+// not touched here.
+func deleteVersioned(tx store.Transaction, key []byte) error {
+	return tx.Delete(makeVersionKey(key))
+}
+
+func checkTransactionOrSnapshot(st store.StoreReader) {
+ _, isTransaction := st.(store.Transaction)
+ _, isSnapshot := st.(store.Snapshot)
+ if !isTransaction && !isSnapshot {
+ panic("neither a Transaction nor a Snapshot")
+ }
+}
+
+// join is shorthand for util.JoinKeyParts.
+func join(parts ...string) string {
+	return util.JoinKeyParts(parts...)
+}
+
+// split is shorthand for util.SplitKeyParts.
+func split(key string) []string {
+	return util.SplitKeyParts(key)
+}