syncbase/store: Get after Put/Delete in a transaction.

Make Get() in a transaction see the effects of a Put() or Delete()
in the same transaction, while the transaction is still in-flight.

Change-Id: I99a97b9870e4b855cf5f6dde7af62091653faee2
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index be6ac4f..70f1bb0 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,6 +7,7 @@
 // #include "leveldb/c.h"
 import "C"
 import (
+	"bytes"
 	"container/list"
 	"sync"
 
@@ -110,6 +111,22 @@
 		return valbuf, convertError(tx.err)
 	}
 	tx.reads.keys = append(tx.reads.keys, key)
+
+	// Reflect the state of the transaction: the "writes" (puts and
+	// deletes) override the values in the transaction snapshot.
+	// Find the last "writes" entry for this key, if one exists.
+	// Note: this step could be optimized by using maps (puts and
+	// deletes) instead of an array.
+	for i := len(tx.writes) - 1; i >= 0; i-- {
+		op := &tx.writes[i]
+		if bytes.Equal(op.key, key) {
+			if op.t == putOp {
+				return op.value, nil
+			}
+			return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
+		}
+	}
+
 	return tx.snapshot.Get(key, valbuf)
 }
 
@@ -120,6 +137,9 @@
 	if tx.err != nil {
 		return &store.InvalidStream{tx.err}
 	}
+	// TODO(rdaoud): create an in-memory copy of the current transaction
+	// state (the puts and deletes so far) for the scan stream's Advance()
+	// to merge that data while traversing the store snapshot.
 	tx.reads.ranges = append(tx.reads.ranges, scanRange{
 		start: start,
 		limit: limit,
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 888ce87..1a1b0f4 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -71,6 +71,17 @@
 	if err := tx.error(); err != nil {
 		return valbuf, err
 	}
+
+	// Reflect the state of the transaction: the "puts" and "deletes"
+	// override the values in the transaction snapshot.
+	keyStr := string(key)
+	if val, ok := tx.puts[keyStr]; ok {
+		return val, nil
+	}
+	if _, ok := tx.deletes[keyStr]; ok {
+		return valbuf, verror.New(store.ErrUnknownKey, nil, keyStr)
+	}
+
 	return tx.sn.Get(key, valbuf)
 }
 
@@ -81,6 +92,9 @@
 	if err := tx.error(); err != nil {
 		return &store.InvalidStream{err}
 	}
+	// TODO(rdaoud): create an in-memory copy of the current transaction
+	// state (the puts and deletes so far) for the scan stream's Advance()
+	// to merge that data while traversing the store snapshot.
 	return tx.sn.Scan(start, limit)
 }
 
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index e30a5f0..5155cd9 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -58,7 +58,7 @@
 
 // Transaction provides a mechanism for atomic reads and writes.
 //
-// Reads don't reflect writes performed within this transaction.
+// Reads do reflect writes and deletes performed within this transaction.
 // Once a transaction has been committed or aborted, subsequent method calls
 // will fail with no effect.
 type Transaction interface {
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
index 80581a1..7b697b4 100644
--- a/services/syncbase/store/test/transaction.go
+++ b/services/syncbase/store/test/transaction.go
@@ -46,6 +46,19 @@
 		verifyAdvance(t, s, key1, value1)
 		verifyAdvance(t, s, nil, nil)
 
+		// Test Put then Get inside the transaction.
+		key3, value3 := []byte("key3"), []byte("value3")
+		tx.Put(key3, value3)
+		verifyGet(t, tx, key3, value3)
+
+		// Test Delete of old key then Get inside the transaction.
+		tx.Delete(key1)
+		verifyGet(t, tx, key1, nil)
+
+		// Test Delete of new key then Get inside the transaction.
+		tx.Delete(key3)
+		verifyGet(t, tx, key3, nil)
+
 		// Test functions after finalize.
 		expectedID, expectedErrMsg := fn(t, tx)
 		verifyError(t, tx.Abort(), expectedID, expectedErrMsg)