syncbase: watchable store with versioning support

Change-Id: I5268638e63ffc31f2e128f9470594cdbee8191a1
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
new file mode 100644
index 0000000..17c899f
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+type transaction struct {
+	itx store.Transaction
+	st  *wstore
+	mu  sync.Mutex // protects the fields below
+	err error
+	ops []Op
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *wstore) *transaction {
+	return &transaction{
+		itx: st.ist.NewTransaction(),
+		st:  st,
+	}
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return valbuf, store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		valbuf, err = tx.itx.Get(key, valbuf)
+	} else {
+		valbuf, err = getVersioned(tx.itx, key, valbuf)
+		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+	}
+	return valbuf, err
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return &store.InvalidStream{tx.err}
+	}
+	var it store.Stream
+	if !tx.st.managesRange(start, limit) {
+		it = tx.itx.Scan(start, limit)
+	} else {
+		it = newStreamVersioned(tx.itx, start, limit)
+		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+	}
+	return it
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		err = tx.itx.Put(key, value)
+	} else {
+		err = putVersioned(tx.itx, key, value)
+		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+	}
+	return err
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	var err error
+	if !tx.st.managesKey(key) {
+		err = tx.itx.Delete(key)
+	} else {
+		err = deleteVersioned(tx.itx, key)
+		tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+	}
+	return err
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	// Check sequence numbers.
+	if uint64(len(tx.ops)) > MaxUint16 {
+		return verror.New(verror.ErrInternal, nil, "too many ops")
+	}
+	if tx.st.seq == MaxUint64 {
+		return verror.New(verror.ErrInternal, nil, "seq maxed out")
+	}
+	// Write LogEntry records.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", MaxUint64-tx.st.seq))
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%04x", MaxUint16-uint64(txSeq)))
+		value := &LogEntry{
+			Op: op,
+			// TODO(sadovsky): This information is also captured in LogEntry keys.
+			// Optimize to avoid redundancy.
+			Continued: txSeq < len(tx.ops)-1,
+		}
+		if err := util.PutObject(tx.itx, key, value); err != nil {
+			return err
+		}
+	}
+	if err := tx.itx.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return store.WrapError(tx.err)
+	}
+	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
+	return tx.itx.Abort()
+}