syncbase: watchable store with versioning support
Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+type transaction struct { // wraps a store.Transaction; versions and logs ops on managed keys
+	itx store.Transaction // underlying transaction, obtained from st.ist
+	st  *wstore           // store that created this transaction
+	mu  sync.Mutex        // protects the fields below
+	err error             // non-nil once Commit or Abort has been called
+	ops []Op              // ops on managed keys/ranges, in call order; logged by Commit
+}
+
+var _ store.Transaction = (*transaction)(nil) // compile-time interface check
+
+// newTransaction returns a transaction on st that wraps a new st.ist transaction.
+func newTransaction(st *wstore) *transaction {
+	tx := &transaction{st: st}
+	tx.itx = st.ist.NewTransaction()
+	return tx
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return valbuf, store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ valbuf, err = tx.itx.Get(key, valbuf)
+ } else {
+ valbuf, err = getVersioned(tx.itx, key, valbuf)
+ tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+ }
+ return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return &store.InvalidStream{tx.err}
+ }
+ var it store.Stream
+ if !tx.st.managesRange(start, limit) {
+ it = tx.itx.Scan(start, limit)
+ } else {
+ it = newStreamVersioned(tx.itx, start, limit)
+ tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+ }
+ return it
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Put(key, value)
+ } else {
+ err = putVersioned(tx.itx, key, value)
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ }
+ return err
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return store.WrapError(tx.err)
+ }
+ var err error
+ if !tx.st.managesKey(key) {
+ err = tx.itx.Delete(key)
+ } else {
+ err = deleteVersioned(tx.itx, key)
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ }
+ return err
+}
+
+// Commit implements the store.Transaction interface. It writes a LogEntry for
+// each recorded op, commits tx.itx, and on success increments tx.st.seq.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	// Abort returns early once tx.err is set, so a failure before tx.itx.Commit
+	// must abort tx.itx here; the Abort error is dropped in favor of the cause.
+	fail := func(err error) error { tx.itx.Abort(); return err }
+	// Check sequence numbers.
+	if uint64(len(tx.ops)) > MaxUint16 {
+		return fail(verror.New(verror.ErrInternal, nil, "too many ops"))
+	}
+	if tx.st.seq == MaxUint64 {
+		return fail(verror.New(verror.ErrInternal, nil, "seq maxed out"))
+	}
+	// Write LogEntry records.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+		// TODO(sadovsky): Continued duplicates information in LogEntry keys; optimize.
+		value := &LogEntry{Op: op, Continued: txSeq < len(tx.ops)-1}
+		if err := util.PutObject(tx.itx, key, value); err != nil {
+			return fail(err)
+		}
+	}
+	if err := tx.itx.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+// Abort implements the store.Transaction interface by aborting tx.itx.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err == nil { // Still open: mark as aborted, then abort the inner transaction.
+		tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+		return tx.itx.Abort()
+	}
+	return store.WrapError(tx.err) // Commit or Abort was already called.
+}