syncbase/store: Get after Put/Delete in a transaction.
Make Get() in a transaction see the effects of a Put() or Delete()
in the same transaction, while the transaction is still in-flight.
Change-Id: I99a97b9870e4b855cf5f6dde7af62091653faee2
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index be6ac4f..70f1bb0 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,6 +7,7 @@
// #include "leveldb/c.h"
import "C"
import (
+ "bytes"
"container/list"
"sync"
@@ -110,6 +111,22 @@
return valbuf, convertError(tx.err)
}
tx.reads.keys = append(tx.reads.keys, key)
+
+ // Reflect the state of the transaction: the "writes" (puts and
+ // deletes) override the values in the transaction snapshot.
+ // Find the last "writes" entry for this key, if one exists.
+ // Note: this step could be optimized by using maps (puts and
+ // deletes) instead of an array.
+ for i := len(tx.writes) - 1; i >= 0; i-- {
+ op := &tx.writes[i]
+ if bytes.Equal(op.key, key) {
+ if op.t == putOp {
+ return op.value, nil
+ }
+ return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
+ }
+ }
+
return tx.snapshot.Get(key, valbuf)
}
@@ -120,6 +137,9 @@
if tx.err != nil {
return &store.InvalidStream{tx.err}
}
+ // TODO(rdaoud): create an in-memory copy of the current transaction
+ // state (the puts and deletes so far) for the scan stream's Advance()
+ // to merge that data while traversing the store snapshot.
tx.reads.ranges = append(tx.reads.ranges, scanRange{
start: start,
limit: limit,
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 888ce87..1a1b0f4 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -71,6 +71,17 @@
if err := tx.error(); err != nil {
return valbuf, err
}
+
+ // Reflect the state of the transaction: the "puts" and "deletes"
+ // override the values in the transaction snapshot.
+ keyStr := string(key)
+ if val, ok := tx.puts[keyStr]; ok {
+ return val, nil
+ }
+ if _, ok := tx.deletes[keyStr]; ok {
+ return valbuf, verror.New(store.ErrUnknownKey, nil, keyStr)
+ }
+
return tx.sn.Get(key, valbuf)
}
@@ -81,6 +92,9 @@
if err := tx.error(); err != nil {
return &store.InvalidStream{err}
}
+ // TODO(rdaoud): create an in-memory copy of the current transaction
+ // state (the puts and deletes so far) for the scan stream's Advance()
+ // to merge that data while traversing the store snapshot.
return tx.sn.Scan(start, limit)
}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index e30a5f0..5155cd9 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -58,7 +58,7 @@
// Transaction provides a mechanism for atomic reads and writes.
//
-// Reads don't reflect writes performed within this transaction.
+// Reads do reflect writes and deletes performed within this transaction.
// Once a transaction has been committed or aborted, subsequent method calls
// will fail with no effect.
type Transaction interface {
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
index 80581a1..7b697b4 100644
--- a/services/syncbase/store/test/transaction.go
+++ b/services/syncbase/store/test/transaction.go
@@ -46,6 +46,19 @@
verifyAdvance(t, s, key1, value1)
verifyAdvance(t, s, nil, nil)
+ // Test Put then Get inside the transaction.
+ key3, value3 := []byte("key3"), []byte("value3")
+ tx.Put(key3, value3)
+ verifyGet(t, tx, key3, value3)
+
+ // Test Delete of old key then Get inside the transaction.
+ tx.Delete(key1)
+ verifyGet(t, tx, key1, nil)
+
+ // Test Delete of new key then Get inside the transaction.
+ tx.Delete(key3)
+ verifyGet(t, tx, key3, nil)
+
// Test functions after finalize.
expectedID, expectedErrMsg := fn(t, tx)
verifyError(t, tx.Abort(), expectedID, expectedErrMsg)