syncbase: watchable store with versioning support
Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 2151b1a..463c498 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -89,7 +89,8 @@
}
func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- // In the future this API should be replaced by one that streams the database names.
+ // In the future this API will likely be replaced by one that streams the
+ // database names.
a.mu.Lock()
defer a.mu.Unlock()
dbNames := make([]string, 0, len(a.dbs))
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index c69af62..5a44625 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -8,9 +8,9 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
- "v.io/syncbase/x/ref/services/syncbase/store/watchable"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
@@ -39,7 +39,9 @@
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
// TODO(sadovsky): Make storage engine pluggable.
- st, err := watchable.Wrap(memstore.New())
+ st, err := watchable.Wrap(memstore.New(), &watchable.Options{
+ ManagedPrefixes: []string{},
+ })
if err != nil {
return nil, err
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index ce2c63e..c2a283d 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -115,7 +115,8 @@
}
func (s *service) AppNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- // In the future this API should be replaced by one that streams the app names.
+ // In the future this API will likely be replaced by one that streams the app
+ // names.
s.mu.Lock()
defer s.mu.Unlock()
appNames := make([]string, 0, len(s.apps))
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 815ef82..88e1115 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -5,17 +5,22 @@
package util
// TODO(sadovsky): Consider using shorter strings.
+
+// Constants related to storage engine keys.
const (
- ServicePrefix = "$service"
AppPrefix = "$app"
- DbInfoPrefix = "$dbInfo"
DatabasePrefix = "$database"
- TablePrefix = "$table"
+ DbInfoPrefix = "$dbInfo"
+ LogPrefix = "$log"
RowPrefix = "$row"
+ ServicePrefix = "$service"
SyncPrefix = "$sync"
+ TablePrefix = "$table"
+ VersionPrefix = "$version"
)
+// Constants related to object names.
const (
- // Service object name suffix for Syncbase internal communication.
+ // Service object name suffix for Syncbase-to-Syncbase RPCs.
SyncbaseSuffix = "$internal"
)
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 0feb974..87a204e 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -54,9 +54,10 @@
}
// Takes ownership of sn.
-// TODO(sadovsky): It sucks that Glob must be implemented differently from other
-// streaming RPC handlers. I don't have much confidence that I've implemented
-// both types of streaming correctly.
+// TODO(sadovsky): Why do we make developers implement Glob differently from
+// other streaming RPCs? It's confusing that Glob must return immediately and
+// write its results to a channel, while other streaming RPC handlers must block
+// and write their results to the output stream. See nlacasse's TODO below, too.
func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
new file mode 100644
index 0000000..ac3694f
--- /dev/null
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type snapshot struct {
+ isn store.Snapshot
+ st *wstore
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+func newSnapshot(st *wstore) *snapshot {
+ return &snapshot{
+ isn: st.ist.NewSnapshot(),
+ st: st,
+ }
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+ return s.isn.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+ if !s.st.managesKey(key) {
+ return s.isn.Get(key, valbuf)
+ }
+ return getVersioned(s.isn, key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
+ if !s.st.managesRange(start, limit) {
+ return s.isn.Scan(start, limit)
+ }
+ return newStreamVersioned(s.isn, start, limit)
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
new file mode 100644
index 0000000..ed8e15f
--- /dev/null
+++ b/services/syncbase/server/watchable/store.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a Syncbase-specific store.Store wrapper that
+// provides versioned storage for specified prefixes and maintains a watchable
+// log of operations performed on versioned records. This log forms the basis
+// for the implementation of client-facing watch as well as the sync module's
+// internal watching of store updates.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
+// the lexicographic order matches the numeric order. Thus, clients implementing
+// ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%016x", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+//
+// Version number records are stored with keys of the form "$version:<key>",
+// where <key> is the client-specified key.
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+
+ pubutil "v.io/syncbase/v23/syncbase/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+const (
+ MaxUint16 uint64 = 1<<16 - 1 // 0xffff
+ MaxUint64 uint64 = 1<<64 - 1 // 0xffffffffffffffff
+)
+
+// Store is a store.Store that provides versioned storage and a watchable oplog.
+// TODO(sadovsky): Extend interface.
+type Store interface {
+ store.Store
+}
+
+// Options configures a watchable.Store.
+type Options struct {
+ // Key prefixes to version and log. If nil, all keys are managed.
+ ManagedPrefixes []string
+}
+
+// Wrap returns a watchable.Store that wraps the given store.Store.
+func Wrap(st store.Store, opts *Options) (Store, error) {
+ // Determine initial value for seq.
+ var seq uint64 = 0
+ it := st.Scan([]byte(util.LogPrefix), []byte(""))
+ for it.Advance() {
+ key := string(it.Key(nil))
+ parts := split(key)
+ if len(parts) != 3 {
+ panic("wrong number of parts: " + key)
+ }
+ invSeq, err := strconv.ParseUint(parts[1], 10, 64)
+ if err != nil {
+ panic("failed to parse invSeq: " + key)
+ }
+ seq = MaxUint64 - invSeq
+ it.Cancel()
+ }
+ if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+ return nil, err
+ }
+ return &wstore{ist: st, opts: opts, seq: seq}, nil
+}
+
+type wstore struct {
+ ist store.Store
+ opts *Options
+ mu sync.Mutex // held during transaction commits; protects seq
+ seq uint64 // sequence number, for commits
+}
+
+var _ Store = (*wstore)(nil)
+
+// TODO(sadovsky): Decide whether to copy []byte's vs. requiring clients not to
+// modify passed-in []byte's. (In fact, this should be spelled out in the
+// store.Store API contract.)
+
+// Close implements the store.Store interface.
+func (st *wstore) Close() error {
+ return st.ist.Close()
+}
+
+// Get implements the store.StoreReader interface.
+func (st *wstore) Get(key, valbuf []byte) ([]byte, error) {
+ if !st.managesKey(key) {
+ return st.ist.Get(key, valbuf)
+ }
+ sn := newSnapshot(st)
+ defer sn.Close()
+ return sn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (st *wstore) Scan(start, limit []byte) store.Stream {
+ if !st.managesRange(start, limit) {
+ return st.ist.Scan(start, limit)
+ }
+ // TODO(sadovsky): Close snapshot once stream is finished or canceled.
+ return newSnapshot(st).Scan(start, limit)
+}
+
+// Put implements the store.StoreWriter interface.
+func (st *wstore) Put(key, value []byte) error {
+ // Use watchable.Store transaction so this op gets logged.
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Put(key, value)
+ })
+}
+
+// Delete implements the store.StoreWriter interface.
+func (st *wstore) Delete(key []byte) error {
+ // Use watchable.Store transaction so this op gets logged.
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Delete(key)
+ })
+}
+
+// NewTransaction implements the store.Store interface.
+func (st *wstore) NewTransaction() store.Transaction {
+ return newTransaction(st)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (st *wstore) NewSnapshot() store.Snapshot {
+ return newSnapshot(st)
+}
+
+////////////////////////////////////////
+// Internal helpers
+
+func (st *wstore) managesKey(key []byte) bool {
+ if st.opts.ManagedPrefixes == nil {
+ return true
+ }
+ ikey := string(key)
+ // TODO(sadovsky): Optimize, e.g. use binary search (here and below).
+ for _, p := range st.opts.ManagedPrefixes {
+ if strings.HasPrefix(ikey, p) {
+ return true
+ }
+ }
+ return false
+}
+
+func (st *wstore) managesRange(start, limit []byte) bool {
+ if st.opts.ManagedPrefixes == nil {
+ return true
+ }
+ istart, ilimit := string(start), string(limit)
+ for _, p := range st.opts.ManagedPrefixes {
+ pstart, plimit := pubutil.PrefixRangeStart(p), pubutil.PrefixRangeLimit(p)
+ if pstart <= istart && ilimit <= plimit {
+ return true
+ }
+ if !(plimit <= istart || ilimit <= pstart) {
+ // If this happens, there's a bug in the Syncbase server implementation.
+ panic(fmt.Sprintf("partial overlap: %q %q %q", p, start, limit))
+ }
+ }
+ return false
+}
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
new file mode 100644
index 0000000..18b8e4d
--- /dev/null
+++ b/services/syncbase/server/watchable/store_test.go
@@ -0,0 +1,108 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "io/ioutil"
+ "runtime"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+ "v.io/syncbase/x/ref/services/syncbase/store/memstore"
+ "v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+ runtime.GOMAXPROCS(10)
+}
+
+func TestStream(t *testing.T) {
+ runTest(t, []string{}, test.RunStreamTest)
+ runTest(t, nil, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+ runTest(t, []string{}, test.RunSnapshotTest)
+ runTest(t, nil, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+ runTest(t, []string{}, test.RunStoreStateTest)
+ runTest(t, nil, test.RunStoreStateTest)
+}
+
+func TestClose(t *testing.T) {
+ runTest(t, []string{}, test.RunCloseTest)
+ runTest(t, nil, test.RunCloseTest)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+ runTest(t, []string{}, test.RunReadWriteBasicTest)
+ runTest(t, nil, test.RunReadWriteBasicTest)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+ runTest(t, []string{}, test.RunReadWriteRandomTest)
+ runTest(t, nil, test.RunReadWriteRandomTest)
+}
+
+func TestConcurrentTransactions(t *testing.T) {
+ runTest(t, []string{}, test.RunConcurrentTransactionsTest)
+ runTest(t, nil, test.RunConcurrentTransactionsTest)
+}
+
+func TestTransactionState(t *testing.T) {
+ runTest(t, []string{}, test.RunTransactionStateTest)
+ runTest(t, nil, test.RunTransactionStateTest)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+ runTest(t, []string{}, test.RunTransactionsWithGetTest)
+ runTest(t, nil, test.RunTransactionsWithGetTest)
+}
+
+// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+// relatively expensive since the entire data map is copied. LevelDB snapshots
+// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+const useMemstore = false
+
+func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
+ var st store.Store
+ if useMemstore {
+ st = memstore.New()
+ } else {
+ var dbPath string
+ st, dbPath = newLevelDB()
+ defer destroyLevelDB(st, dbPath)
+ }
+ st, err := Wrap(st, &Options{ManagedPrefixes: mp})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer st.Close()
+ f(t, st)
+}
+
+func newLevelDB() (store.Store, string) {
+ path, err := ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ panic(fmt.Sprintf("can't create temp dir: %v", err))
+ }
+ st, err := leveldb.Open(path)
+ if err != nil {
+ panic(fmt.Sprintf("can't open db at %v: %v", path, err))
+ }
+ return st, path
+}
+
+func destroyLevelDB(st store.Store, path string) {
+ st.Close()
+ if err := leveldb.Destroy(path); err != nil {
+ panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+ }
+}
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
new file mode 100644
index 0000000..cd41835
--- /dev/null
+++ b/services/syncbase/server/watchable/stream.go
@@ -0,0 +1,95 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// stream streams keys and values for versioned records.
+type stream struct {
+ iit store.Stream
+ st store.StoreReader
+ mu sync.Mutex
+ err error
+ hasValue bool
+ key []byte
+ value []byte
+}
+
+var _ store.Stream = (*stream)(nil)
+
+// newStreamVersioned creates a new stream. It assumes all records in range
+// [start, limit) are managed, i.e. versioned.
+func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
+ checkTransactionOrSnapshot(st)
+ return &stream{
+ iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
+ st: st,
+ }
+}
+
+// Advance implements the store.Stream interface.
+func (s *stream) Advance() bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.hasValue = false
+ if s.err != nil {
+ return false
+ }
+ if advanced := s.iit.Advance(); !advanced {
+ return false
+ }
+ versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
+ s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
+ s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+ if s.err != nil {
+ return false
+ }
+ s.hasValue = true
+ return true
+}
+
+// Key implements the store.Stream interface.
+func (s *stream) Key(keybuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.hasValue {
+ panic("nothing staged")
+ }
+ return store.CopyBytes(keybuf, s.key)
+}
+
+// Value implements the store.Stream interface.
+func (s *stream) Value(valbuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.hasValue {
+ panic("nothing staged")
+ }
+ return store.CopyBytes(valbuf, s.value)
+}
+
+// Err implements the store.Stream interface.
+func (s *stream) Err() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.err != nil {
+ return store.WrapError(s.err)
+ }
+ return s.iit.Err()
+}
+
+// Cancel implements the store.Stream interface.
+func (s *stream) Cancel() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.err != nil {
+ return
+ }
+ s.iit.Cancel()
+}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+type transaction struct {
+ itx store.Transaction
+ st *wstore
+ mu sync.Mutex // protects the fields below
+ err error
+ ops []Op
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *wstore) *transaction {
+ return &transaction{
+ itx: st.ist.NewTransaction(),
+ st: st,
+ }
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return valbuf, store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ valbuf, err = tx.itx.Get(key, valbuf)
+ } else {
+ valbuf, err = getVersioned(tx.itx, key, valbuf)
+ tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+ }
+ return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return &store.InvalidStream{tx.err}
+ }
+ var it store.Stream
+ if !tx.st.managesRange(start, limit) {
+ it = tx.itx.Scan(start, limit)
+ } else {
+ it = newStreamVersioned(tx.itx, start, limit)
+ tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+ }
+ return it
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Put(key, value)
+ } else {
+ err = putVersioned(tx.itx, key, value)
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ }
+ return err
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Delete(key)
+ } else {
+ err = deleteVersioned(tx.itx, key)
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ }
+ return err
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock()
+ // Check sequence numbers.
+ if uint64(len(tx.ops)) > MaxUint16 {
+ return verror.New(verror.ErrInternal, nil, "too many ops")
+ }
+ if tx.st.seq == MaxUint64 {
+ return verror.New(verror.ErrInternal, nil, "seq maxed out")
+ }
+ // Write LogEntry records.
+ // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+ keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+ for txSeq, op := range tx.ops {
+ key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+ value := &LogEntry{
+ Op: op,
+ // TODO(sadovsky): This information is also captured in LogEntry keys.
+ // Optimize to avoid redundancy.
+ Continued: txSeq < len(tx.ops)-1,
+ }
+ if err := util.PutObject(tx.itx, key, value); err != nil {
+ return err
+ }
+ }
+ if err := tx.itx.Commit(); err != nil {
+ return err
+ }
+ tx.st.seq++
+ return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+ return tx.itx.Abort()
+}
diff --git a/services/syncbase/store/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
similarity index 100%
rename from services/syncbase/store/watchable/types.vdl
rename to services/syncbase/server/watchable/types.vdl
diff --git a/services/syncbase/store/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
similarity index 86%
rename from services/syncbase/store/watchable/types.vdl.go
rename to services/syncbase/server/watchable/types.vdl.go
index 3b6da63..12b8e0b 100644
--- a/services/syncbase/store/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -18,7 +18,7 @@
}
func (GetOp) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.GetOp"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.GetOp"`
}) {
}
@@ -29,7 +29,7 @@
}
func (ScanOp) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.ScanOp"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.ScanOp"`
}) {
}
@@ -40,7 +40,7 @@
}
func (PutOp) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.PutOp"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.PutOp"`
}) {
}
@@ -50,7 +50,7 @@
}
func (DeleteOp) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.DeleteOp"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.DeleteOp"`
}) {
}
@@ -78,7 +78,7 @@
OpDelete struct{ Value DeleteOp }
// __OpReflect describes the Op union type.
__OpReflect struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.Op"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
Type Op
Union struct {
Get OpGet
@@ -122,7 +122,7 @@
}
func (LogEntry) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.LogEntry"`
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.LogEntry"`
}) {
}
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
new file mode 100644
index 0000000..cd37d3e
--- /dev/null
+++ b/services/syncbase/server/watchable/util.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
+// We should probably convert incoming strings to []byte's as early as possible,
+// and deal exclusively in []byte's internally.
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+func makeVersionKey(key []byte) []byte {
+ return []byte(join(util.VersionPrefix, string(key)))
+}
+
+func makeAtVersionKey(key, version []byte) []byte {
+ return []byte(join(string(key), string(version)))
+}
+
+func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
+ return st.Get(makeVersionKey(key), nil)
+}
+
+func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+ return st.Get(makeAtVersionKey(key, version), valbuf)
+}
+
+func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
+ checkTransactionOrSnapshot(st)
+ version, err := getVersion(st, key)
+ if err != nil {
+ return valbuf, err
+ }
+ return getAtVersion(st, key, valbuf, version)
+}
+
+func putVersioned(tx store.Transaction, key, value []byte) error {
+ version := []byte(fmt.Sprintf("%x", rng.Int63()))
+ if err := tx.Put(makeVersionKey(key), version); err != nil {
+ return err
+ }
+ return tx.Put(makeAtVersionKey(key, version), value)
+}
+
+func deleteVersioned(tx store.Transaction, key []byte) error {
+ return tx.Delete(makeVersionKey(key))
+}
+
+func checkTransactionOrSnapshot(st store.StoreReader) {
+ _, isTransaction := st.(store.Transaction)
+ _, isSnapshot := st.(store.Snapshot)
+ if !isTransaction && !isSnapshot {
+ panic("neither a Transaction nor a Snapshot")
+ }
+}
+
+func join(parts ...string) string {
+ return util.JoinKeyParts(parts...)
+}
+
+func split(key string) []string {
+ return util.SplitKeyParts(key)
+}
diff --git a/services/syncbase/store/constants.go b/services/syncbase/store/constants.go
new file mode 100644
index 0000000..f97f2fd
--- /dev/null
+++ b/services/syncbase/store/constants.go
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// TODO(sadovsky): Maybe define verrors for these.
+const (
+ ErrMsgClosedStore = "closed store"
+ ErrMsgClosedSnapshot = "closed snapshot"
+ ErrMsgCanceledStream = "canceled stream"
+ ErrMsgCommittedTxn = "already called commit"
+ ErrMsgAbortedTxn = "already called abort"
+ ErrMsgExpiredTxn = "expired transaction"
+)
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index e163833..62250c6 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -90,7 +90,7 @@
d.readOptions = nil
C.leveldb_writeoptions_destroy(d.writeOptions)
d.writeOptions = nil
- d.err = verror.New(verror.ErrCanceled, nil, "closed store")
+ d.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
return nil
}
@@ -161,7 +161,7 @@
return newSnapshot(d, d.node)
}
-// write writes a batch and adds all written keys to txTable.
+// write writes a batch and adds all written keys to txTable.
// TODO(rogulenko): remove this method.
func (d *db) write(batch []writeOp, cOpts *C.leveldb_writeoptions_t) error {
d.txmu.Lock()
@@ -203,7 +203,7 @@
return nil
}
-// trackBatch writes the batch to txTable, adds a commit event to txEvents.
+// trackBatch writes the batch to txTable and adds a commit event to txEvents.
func (d *db) trackBatch(batch []writeOp) {
// TODO(rogulenko): do GC.
d.txSequenceNumber++
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 9a4ad6e..4bf59b6 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -56,7 +56,7 @@
s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
s.cSnapshot = nil
- s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
return nil
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index abf230e..2840e5d 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -124,14 +124,14 @@
if s.err != nil {
return
}
- // s.hasValue might be false if Advance was never called.
+ // s.hasValue will be false if Advance has never been called.
if s.hasValue {
// We copy the key and the value from the C heap to the Go heap before
// deallocating the C iterator.
s.key = store.CopyBytes(nil, s.cKey())
s.value = store.CopyBytes(nil, s.cVal())
}
- s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
s.destroyLeveldbIter()
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index e7a1b8a..308b4da 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -181,9 +181,10 @@
if tx.err != nil {
return store.WrapError(tx.err)
}
+ tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
// Explicitly remove this transaction from the event queue. If this was the
// only active transaction, the event queue becomes empty and writeLocked will
- // not add the write set of this transaction to the txTable.
+ // not add this transaction's write set to txTable.
tx.removeEvent()
defer tx.close()
tx.d.txmu.Lock()
@@ -191,14 +192,7 @@
if !tx.validateReadSet() {
return store.NewErrConcurrentTransaction(nil)
}
- if err := tx.d.writeLocked(tx.writes, tx.cOpts); err != nil {
- // Once Commit() has failed, subsequent ops on the transaction will fail
- // with the following error.
- tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction")
- return err
- }
- tx.err = verror.New(verror.ErrBadState, nil, "committed transaction")
- return nil
+ return tx.d.writeLocked(tx.writes, tx.cOpts)
}
// Abort implements the store.Transaction interface.
@@ -208,7 +202,7 @@
if tx.err != nil {
return store.WrapError(tx.err)
}
- tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
+ tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
tx.close()
return nil
}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index cd39f77..53bd6ef 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -22,7 +22,7 @@
// Assumes st lock is held.
func newSnapshot(st *memstore, parent *store.ResourceNode) *snapshot {
- dataCopy := map[string][]byte{}
+ dataCopy := make(map[string][]byte, len(st.data))
for k, v := range st.data {
dataCopy[k] = v
}
@@ -44,7 +44,7 @@
return store.WrapError(s.err)
}
s.node.Close()
- s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
return nil
}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index a3e98b1..d24d5d1 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -42,7 +42,7 @@
return store.WrapError(st.err)
}
st.node.Close()
- st.err = verror.New(verror.ErrCanceled, nil, "closed store")
+ st.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
return nil
}
@@ -67,6 +67,7 @@
if st.err != nil {
return &store.InvalidStream{st.err}
}
+ // TODO(sadovsky): Close snapshot once stream is closed or canceled.
return newSnapshot(st, st.node).Scan(start, limit)
}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index af4c693..f7e5945 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -96,5 +96,5 @@
return
}
s.node.Close()
- s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 81b51d0..f7e99c4 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -35,11 +35,10 @@
func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
node := store.NewResourceNode()
- sn := newSnapshot(st, node)
tx := &transaction{
node: node,
st: st,
- sn: sn,
+ sn: newSnapshot(st, node),
seq: seq,
createdTime: time.Now(),
puts: map[string][]byte{},
@@ -60,7 +59,7 @@
return store.WrapError(tx.err)
}
if tx.expired() {
- return verror.New(verror.ErrBadState, nil, "expired transaction")
+ return verror.New(verror.ErrBadState, nil, store.ErrMsgExpiredTxn)
}
return nil
}
@@ -116,16 +115,13 @@
if err := tx.error(); err != nil {
return err
}
+ tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
tx.node.Close()
tx.st.mu.Lock()
- defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+ defer tx.st.mu.Unlock()
if tx.seq <= tx.st.lastCommitSeq {
- // Once Commit() has failed with store.ErrConcurrentTransaction, subsequent
- // ops on the transaction will fail with the following error.
- tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction")
return store.NewErrConcurrentTransaction(nil)
}
- tx.err = verror.New(verror.ErrBadState, nil, "committed transaction")
for k, v := range tx.puts {
tx.st.data[k] = v
}
@@ -144,6 +140,6 @@
return err
}
tx.node.Close()
- tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
+ tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
return nil
}
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
index 0c65de7..4dbee5f 100644
--- a/services/syncbase/store/test/snapshot.go
+++ b/services/syncbase/store/test/snapshot.go
@@ -30,13 +30,13 @@
if err := snapshot.Close(); err != nil {
t.Fatalf("can't close the snapshot: %v", err)
}
- expectedErr := "closed snapshot"
- verifyError(t, snapshot.Close(), expectedErr, verror.ErrCanceled.ID)
+ expectedErrMsg := store.ErrMsgClosedSnapshot
+ verifyError(t, snapshot.Close(), verror.ErrCanceled.ID, expectedErrMsg)
_, err := snapshot.Get(key1, nil)
- verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
s = snapshot.Scan([]byte("a"), []byte("z"))
verifyAdvance(t, s, nil, nil)
- verifyError(t, s.Err(), expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, s.Err(), verror.ErrCanceled.ID, expectedErrMsg)
}
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
index 784c545..2d6d9a1 100644
--- a/services/syncbase/store/test/store.go
+++ b/services/syncbase/store/test/store.go
@@ -114,14 +114,13 @@
if step.key < 0 || step.key >= s.size {
t.Fatalf("invalid test step %v", step)
}
+ key := fmt.Sprintf("%05d", step.key)
switch step.op {
case Put:
- key := fmt.Sprintf("%05d", step.key)
value := randomBytes(s.rnd, 100)
s.memtable[key] = value
st.Put([]byte(key), value)
case Delete:
- key := fmt.Sprintf("%05d", step.key)
if _, ok := s.memtable[key]; ok {
delete(s.memtable, key)
st.Delete([]byte(key))
@@ -158,12 +157,12 @@
// writes and snapshots.
func RunReadWriteRandomTest(t *testing.T, st store.Store) {
rnd := rand.New(rand.NewSource(239017))
- var testcase []testStep
+ var steps []testStep
size := 50
for i := 0; i < 10000; i++ {
- testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
+ steps = append(steps, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
}
- runReadWriteTest(t, st, size, testcase)
+ runReadWriteTest(t, st, size, steps)
}
// RunStoreStateTest verifies operations that modify the state of a store.Store.
@@ -183,25 +182,25 @@
if err := st.Close(); err != nil {
t.Fatalf("can't close the store: %v", err)
}
- expectedErr := "closed store"
- verifyError(t, st.Close(), expectedErr, verror.ErrCanceled.ID)
+ expectedErrMsg := store.ErrMsgClosedStore
+ verifyError(t, st.Close(), verror.ErrCanceled.ID, expectedErrMsg)
s = st.Scan([]byte("a"), []byte("z"))
verifyAdvance(t, s, nil, nil)
- verifyError(t, s.Err(), expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, s.Err(), verror.ErrCanceled.ID, expectedErrMsg)
snapshot := st.NewSnapshot()
_, err := snapshot.Get(key1, nil)
- verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
tx := st.NewTransaction()
_, err = tx.Get(key1, nil)
- verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
_, err = st.Get(key1, nil)
- verifyError(t, err, expectedErr, verror.ErrCanceled.ID)
- verifyError(t, st.Put(key1, value1), expectedErr, verror.ErrCanceled.ID)
- verifyError(t, st.Delete(key1), expectedErr, verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
+ verifyError(t, st.Put(key1, value1), verror.ErrCanceled.ID, expectedErrMsg)
+ verifyError(t, st.Delete(key1), verror.ErrCanceled.ID, expectedErrMsg)
}
// RunCloseTest verifies that child objects are closed when the parent object is
@@ -227,14 +226,14 @@
st.Close()
for _, stream := range streams {
- verifyError(t, stream.Err(), "canceled stream", verror.ErrCanceled.ID)
+ verifyError(t, stream.Err(), verror.ErrCanceled.ID, store.ErrMsgCanceledStream)
}
for _, snapshot := range snapshots {
_, err := snapshot.Get(key1, nil)
- verifyError(t, err, "closed snapshot", verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgClosedSnapshot)
}
for _, tx := range transactions {
_, err := tx.Get(key1, nil)
- verifyError(t, err, "aborted transaction", verror.ErrCanceled.ID)
+ verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgAbortedTxn)
}
}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 28e6e32..98becff 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -37,5 +37,5 @@
}
}
verifyAdvance(t, s, nil, nil)
- verifyError(t, s.Err(), "canceled stream", verror.ErrCanceled.ID)
+ verifyError(t, s.Err(), verror.ErrCanceled.ID, store.ErrMsgCanceledStream)
}
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
index 6aa5861..80581a1 100644
--- a/services/syncbase/store/test/transaction.go
+++ b/services/syncbase/store/test/transaction.go
@@ -16,24 +16,24 @@
"v.io/v23/verror"
)
-// RunTransactionTest verifies operations that modify the state of a
+// RunTransactionStateTest verifies operations that modify the state of a
// store.Transaction.
func RunTransactionStateTest(t *testing.T, st store.Store) {
- abortFunctions := []func(t *testing.T, tx store.Transaction) (string, verror.ID){
- func(t *testing.T, tx store.Transaction) (string, verror.ID) {
+ finalizeFns := []func(t *testing.T, tx store.Transaction) (verror.ID, string){
+ func(t *testing.T, tx store.Transaction) (verror.ID, string) {
if err := tx.Abort(); err != nil {
Fatalf(t, "can't abort the transaction: %v", err)
}
- return "aborted transaction", verror.ErrCanceled.ID
+ return verror.ErrCanceled.ID, store.ErrMsgAbortedTxn
},
- func(t *testing.T, tx store.Transaction) (string, verror.ID) {
+ func(t *testing.T, tx store.Transaction) (verror.ID, string) {
if err := tx.Commit(); err != nil {
Fatalf(t, "can't commit the transaction: %v", err)
}
- return "committed transaction", verror.ErrBadState.ID
+ return verror.ErrBadState.ID, store.ErrMsgCommittedTxn
},
}
- for _, fn := range abortFunctions {
+ for _, fn := range finalizeFns {
key1, value1 := []byte("key1"), []byte("value1")
st.Put(key1, value1)
key2 := []byte("key2")
@@ -46,19 +46,19 @@
verifyAdvance(t, s, key1, value1)
verifyAdvance(t, s, nil, nil)
- // Test functions after Abort.
- expectedErr, expectedID := fn(t, tx)
- verifyError(t, tx.Abort(), expectedErr, expectedID)
- verifyError(t, tx.Commit(), expectedErr, expectedID)
+ // Test functions after finalize.
+ expectedID, expectedErrMsg := fn(t, tx)
+ verifyError(t, tx.Abort(), expectedID, expectedErrMsg)
+ verifyError(t, tx.Commit(), expectedID, expectedErrMsg)
s = tx.Scan([]byte("a"), []byte("z"))
verifyAdvance(t, s, nil, nil)
- verifyError(t, s.Err(), expectedErr, expectedID)
+ verifyError(t, s.Err(), expectedID, expectedErrMsg)
_, err := tx.Get(key1, nil)
- verifyError(t, err, expectedErr, expectedID)
- verifyError(t, tx.Put(key1, value1), expectedErr, expectedID)
- verifyError(t, tx.Delete(key1), expectedErr, expectedID)
+ verifyError(t, err, expectedID, expectedErrMsg)
+ verifyError(t, tx.Put(key1, value1), expectedID, expectedErrMsg)
+ verifyError(t, tx.Delete(key1), expectedID, expectedErrMsg)
}
}
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
index 296a678..55b886a 100644
--- a/services/syncbase/store/test/util.go
+++ b/services/syncbase/store/test/util.go
@@ -28,7 +28,7 @@
}
} else {
valbuf, err = st.Get(key, valbuf)
- verifyError(t, err, string(key), store.ErrUnknownKey.ID)
+ verifyError(t, err, store.ErrUnknownKey.ID, string(key))
valcopy := []byte("tmp")
// Verify that valbuf is not modified if the key is not found.
if !bytes.Equal(valbuf, valcopy) {
@@ -61,12 +61,15 @@
}
}
-func verifyError(t *testing.T, err error, substr string, errorID verror.ID) {
+// verifyError verifies that the given error has the given errorID and that the
+// error string contains the given substr. Pass an empty substr to skip the
+// substr check.
+func verifyError(t *testing.T, err error, errorID verror.ID, substr string) {
if got := verror.ErrorID(err); got != errorID {
Fatalf(t, "unexpected error ID: got %v, want %v", got, errorID)
}
if !strings.Contains(err.Error(), substr) {
- Fatalf(t, "unexpected error: got %v, want %v", err, substr)
+ Fatalf(t, "unexpected error: %q not found in %q", substr, err)
}
}
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index 3d0867a..eee882a 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -46,10 +46,10 @@
return dst
}
-// WrapError wraps the given err with a verror. It is a no-op if the err is nil.
-// The returned error contains the current stack trace, but retains the input
-// error's IDAction pair. If the given error is not a verror, the IDAction pair
-// of the returned error will be (ErrUnknown.ID, NoRetry).
+// WrapError wraps the given error with a verror. It is a no-op if the given
+// error is nil. The returned error contains the current stack trace, but
+// retains the input error's IDAction pair. If the given error is not a verror,
+// the IDAction pair of the returned error will be (ErrUnknown.ID, NoRetry).
func WrapError(err error) error {
if err == nil {
return nil
diff --git a/services/syncbase/store/watchable/store.go b/services/syncbase/store/watchable/store.go
deleted file mode 100644
index e4fd0ee..0000000
--- a/services/syncbase/store/watchable/store.go
+++ /dev/null
@@ -1,197 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package watchable provides a store.Store that maintains a commit log. In
-// Syncbase, this log forms the basis for the implementation of client-facing
-// watch as well as the sync module's watching of store commits.
-//
-// Log entries are keyed in reverse chronological order. More specifically, the
-// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
-// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
-// the lexicographic order matches the numeric order. Thus, clients implementing
-// ResumeMarkers (i.e. implementing the watch API) should use
-// fmt.Sprintf("%020d", MaxUint64-marker) to convert external markers to
-// internal LogEntry key prefixes.
-package watchable
-
-// TODO(sadovsky): Write unit tests. (As of 2015-05-26 we're still iterating on
-// the design for how to expose a "watch" API from the storage engine, and we
-// don't want to write lots of tests prematurely.)
-// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
-// TODO(sadovsky): Allow clients to subscribe via Go channel.
-
-import (
- "fmt"
- "strconv"
- "strings"
- "sync"
-
- "v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
- "v.io/v23/vom"
-)
-
-const (
- LogPrefix = "$log"
- MaxUint16 uint64 = 1<<16 - 1 // 5 digits
- MaxUint64 uint64 = 1<<64 - 1 // 20 digits
-)
-
-// Store is a store.Store that maintains a commit log.
-type Store store.Store
-
-// Wrap returns a watchable.Store that wraps the given store.Store.
-func Wrap(st store.Store) (Store, error) {
- it := st.Scan([]byte(LogPrefix), []byte(""))
- var seq uint64 = 0
- for it.Advance() {
- key := string(it.Key(nil))
- parts := split(key)
- if len(parts) != 3 {
- panic("wrong number of parts: " + key)
- }
- invSeq, err := strconv.ParseUint(parts[1], 10, 64)
- if err != nil {
- panic("failed to parse invSeq: " + key)
- }
- seq = MaxUint64 - invSeq
- it.Cancel()
- }
- if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
- return nil, err
- }
- return &wstore{Store: st, seq: seq}, nil
-}
-
-type wstore struct {
- store.Store
- mu sync.Mutex // held during transaction commits; protects seq
- seq uint64 // sequence number, for commits
-}
-
-type transaction struct {
- store.Transaction
- st *wstore
- mu sync.Mutex // protects the fields below
- ops []Op
-}
-
-var (
- _ Store = (*wstore)(nil)
- _ store.Transaction = (*transaction)(nil)
-)
-
-// TODO(sadovsky): Decide whether to copy []bytes vs. requiring clients not to
-// modify passed-in []bytes.
-
-func (st *wstore) Put(key, value []byte) error {
- // Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(key, value)
- })
-}
-
-func (st *wstore) Delete(key []byte) error {
- // Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(key)
- })
-}
-
-func (st *wstore) NewTransaction() store.Transaction {
- return &transaction{Transaction: st.Store.NewTransaction(), st: st}
-}
-
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- valbuf, err := tx.Transaction.Get(key, valbuf)
- if err == nil || verror.ErrorID(err) == store.ErrUnknownKey.ID {
- tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
- }
- return valbuf, err
-}
-
-func (tx *transaction) Scan(start, limit []byte) store.Stream {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- it := tx.Transaction.Scan(start, limit)
- if it.Err() == nil {
- tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
- }
- return it
-}
-
-func (tx *transaction) Put(key, value []byte) error {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- err := tx.Transaction.Put(key, value)
- if err == nil {
- tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
- }
- return err
-}
-
-func (tx *transaction) Delete(key []byte) error {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- err := tx.Transaction.Delete(key)
- if err == nil {
- tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
- }
- return err
-}
-
-func (tx *transaction) Commit() error {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- // Check sequence numbers.
- if uint64(len(tx.ops)) > MaxUint16 {
- return verror.New(verror.ErrInternal, nil, "too many ops")
- }
- if tx.st.seq == MaxUint64 {
- return verror.New(verror.ErrInternal, nil, "seq maxed out")
- }
- // Write LogEntry records.
- // TODO(sadovsky): Use a more efficient lexicographic number encoding.
- keyPrefix := join(LogPrefix, fmt.Sprintf("%020d", MaxUint64-tx.st.seq))
- for txSeq, op := range tx.ops {
- key := join(keyPrefix, fmt.Sprintf("%05d", MaxUint16-uint64(txSeq)))
- value := &LogEntry{
- Op: op,
- // TODO(sadovsky): This information is also captured in LogEntry keys.
- // Optimize to avoid redundancy.
- Continued: txSeq < len(tx.ops)-1,
- }
- if err := put(tx.Transaction, key, value); err != nil {
- return err
- }
- }
- if err := tx.Transaction.Commit(); err != nil {
- return err
- }
- tx.st.seq++
- return nil
-}
-
-////////////////////////////////////////
-// Internal helpers
-
-func join(parts ...string) string {
- return strings.Join(parts, ":")
-}
-
-func split(key string) []string {
- return strings.Split(key, ":")
-}
-
-func put(st store.StoreWriter, k string, v interface{}) error {
- bytes, err := vom.Encode(v)
- if err != nil {
- return err
- }
- return st.Put([]byte(k), bytes)
-}
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index 35f935a..44865a3 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -22,7 +22,6 @@
_ "v.io/x/ref/runtime/factories/generic"
)
-// TODO(sadovsky): Perhaps this should be one of the standard Veyron flags.
var (
name = flag.String("name", "", "Name to mount at.")
)