syncbase/vsync: Addressing Adam's review comments from go/vcl/12873 and a few other cleanups.

Change-Id: I876d7685cc5e34c9575c0dfe4c41271826a97dc1
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3c4a054..963121e 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -208,7 +208,85 @@
 	wtx.fromSync = true
 }
 
-// Exported as a helper function for testing purposes
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key is used for optimistic concurrency
+// control. At minimum, an object implementing the StoreReader interface is
+// required since this is a Get operation.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+	switch w := st.(type) {
+	case *transaction:
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		if w.err != nil {
+			return nil, convertError(w.err)
+		}
+		return getVersion(w.itx, key)
+	case *wstore:
+		return getVersion(w.ist, key)
+	}
+	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire. At minimum, an object implementing the
+// StoreReader interface is required since this is a Get operation.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	switch w := st.(type) {
+	case *transaction:
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		if w.err != nil {
+			return valbuf, convertError(w.err)
+		}
+		return getAtVersion(w.itx, key, valbuf, version)
+	case *wstore:
+		return getAtVersion(w.ist, key, valbuf, version)
+	}
+	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
+}
+
+// PutAtVersion stores valbuf for the managed key at the requested version.
+// Sync uses it exclusively when the initiator adds objects whose versions were
+// created on other Syncbases. tx must be a *transaction; any other type yields
+// an ErrInternal error (matching GetVersion) instead of a panic.
+func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+	wtx, ok := tx.(*transaction)
+	if !ok {
+		return verror.New(verror.ErrInternal, ctx, "unsupported store type")
+	}
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	// No PutOp is logged: this Put does not update the key's current version.
+	return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion makes the requested version the current version of a managed key
+// and appends a matching OpPut to the transaction's ops. Sync uses it only when
+// the initiator picks which already stored version (via PutAtVersion calls)
+// becomes current. tx must be a *transaction; other types yield ErrInternal.
+func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+	wtx, ok := tx.(*transaction)
+	if !ok {
+		return verror.New(verror.ErrInternal, ctx, "unsupported store type")
+	}
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+		return err
+	}
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	return nil
+}
+
+// Exported as a helper function for testing purposes.
 func getLogEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.