syncbase/vsync: Addressing Adam's review comments from go/vcl/12873 and a few other cleanups.
Change-Id: I876d7685cc5e34c9575c0dfe4c41271826a97dc1
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3c4a054..963121e 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -208,7 +208,85 @@
wtx.fromSync = true
}
-// Exported as a helper function for testing purposes
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key is used for optimistic concurrency
+// control. At minimum, an object implementing the StoreReader interface is
+// required since this is a Get operation.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+ switch w := st.(type) {
+ case *transaction:
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.err != nil {
+ return nil, convertError(w.err)
+ }
+ return getVersion(w.itx, key)
+ case *wstore:
+ return getVersion(w.ist, key)
+ }
+ return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire. At minimum, an object implementing the
+// StoreReader interface is required since this is a Get operation.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+ switch w := st.(type) {
+ case *transaction:
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.err != nil {
+ return valbuf, convertError(w.err)
+ }
+ return getAtVersion(w.itx, key, valbuf, version)
+ case *wstore:
+ return getAtVersion(w.ist, key, valbuf, version)
+ }
+ return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// PutAtVersion puts a value for the managed key at the requested version. This
+// method is used by the Sync module exclusively when the initiator adds objects
+// with versions created on other Syncbases. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ // Note that we do not enqueue a PutOp in the log since this Put is not
+ // updating the current version of a key.
+ return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion updates the version of a managed key to the requested
+// version. This method is used by the Sync module exclusively when the
+// initiator selects which of the already stored versions (via PutAtVersion
+// calls) becomes the current version. At minimum, an object implementing
+// the StoreReadWriter interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+ wtx := tx.(*transaction)
+
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+
+ if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+ return err
+ }
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ return nil
+}
+
+// Exported as a helper function for testing purposes.
func getLogEntryKey(seq uint64) string {
// Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.