syncbase/store: Scan/Exec after Put/Delete in a transaction.

Make Scan() and Exec() in a transaction see the effects of a
Put() or Delete() in the same transaction, while the transaction
is still in-flight.

This change merges the snapshot.Scan() stream with the writes
array in the transaction (which contains put and delete
operations).

There was a choice to be made as to whether Scan/Exec should return
the state as of when it was called or whether later changes
in the transaction could find their way into the stream.  Since
we actively return the stream to the client (rather than waiting
on the client to call Advance), I felt it was less suprprising to
return the state when Scan/Exec was called.

Note: Since Exec can call Scan multiple times, I changed the
implementation of KeyValueStream to create all of the streams
from Scan at the start.

Change-Id: I70d19964e96dd45832122d6ce6c93c8ff35089d8
diff --git a/v23/syncbase/nosql/batch_test.go b/v23/syncbase/nosql/batch_test.go
index 85e1033..be137ac 100644
--- a/v23/syncbase/nosql/batch_test.go
+++ b/v23/syncbase/nosql/batch_test.go
@@ -73,15 +73,57 @@
 		t.Fatalf("Put() failed: %v", err)
 	}
 
-	// Check that foo is not yet visible.
+	// Check that foo is visible inside of this transaction.
+	tu.CheckScan(t, ctx, b1tb, nosql.Prefix(""), []string{"fooKey"}, []interface{}{"fooValue"})
+
+	// Check that foo is not yet visible outside of this transaction.
 	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
 
+	// Start a scan in b1, advance the scan one row, put a new value that would
+	// occur later in the scan (if it were visible) and then advance the scan to see
+	// that it doesn't show (since we snapshot uncommiteed changes at the start).
+	// Ditto for Exec.
+	// start the scan and exec
+	scanIt := b1tb.Scan(ctx, nosql.Prefix(""))
+	if !scanIt.Advance() {
+		t.Fatal("scanIt.Advance() returned false")
+	}
+	_, execIt, err := b1.Exec(ctx, "select k from tb")
+	if err != nil {
+		t.Fatalf("b1.Exec() failed: %v", err)
+	}
+	if !execIt.Advance() {
+		t.Fatal("execIt.Advance() returned false")
+	}
+	// put "zzzKey"
+	if err := b1tb.Put(ctx, "zzzKey", "zzzValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+	// make sure Scan's Advance doesn't return a "zzzKey"
+	for scanIt.Advance() {
+		if string(scanIt.Key()) == "zzzKey" {
+			t.Fatal("scanIt.Advance() found zzzKey")
+		}
+	}
+	if scanIt.Err() != nil {
+		t.Fatalf("scanIt.Advance() failed: %v", err)
+	}
+	// make sure Exec's Advance doesn't return a "zzzKey"
+	for execIt.Advance() {
+		if string(execIt.Result()[0].String()) == "zzzKey" {
+			t.Fatal("execIt.Advance() found zzzKey")
+		}
+	}
+	if execIt.Err() != nil {
+		t.Fatalf("execIt.Advance() failed: %v", err)
+	}
+
 	if err := b1.Commit(ctx); err != nil {
 		t.Fatalf("b1.Commit() failed: %v", err)
 	}
 
 	// Check that foo is now visible.
-	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"fooKey"}, []interface{}{"fooValue"})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"fooKey", "zzzKey"}, []interface{}{"fooValue", "zzzValue"})
 
 	// Test that concurrent transactions are isolated.
 	if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
@@ -114,8 +156,8 @@
 		t.Fatalf("b2.Commit() should have failed: %v", err)
 	}
 
-	// Check that foo, bar, and baz (but not rab) are now visible.
-	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"barKey", "bazKey", "fooKey"}, []interface{}{"barValue", "bazValue", "fooValue"})
+	// Check that foo, bar, baz and zzz (but not rab) are now visible.
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"barKey", "bazKey", "fooKey", "zzzKey"}, []interface{}{"barValue", "bazValue", "fooValue", "zzzValue"})
 }
 
 // Tests that BatchDatabase.Exec doesn't see changes committed outside the batch.
@@ -125,7 +167,7 @@
 // 4. confirm new row not seen when querying all rows in the table
 // 5. abort the batch and create a new readonly batch
 // 6. confirm new row NOW seen when querying all rows in the table
-func TestBatchExec(t *testing.T) {
+func TestBatchExecIsolation(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
 	defer cleanup()
 	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
@@ -202,6 +244,131 @@
 	tu.CheckExecError(t, ctx, roBatch, "select k, v from foo", syncql.ErrTableCantAccess.ID)
 }
 
+// Tests that BatchDatabase.Exec DOES see changes made inside the transaction but before
+// Exec is called.
+func TestBatchExec(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+
+	foo := Foo{I: 4, S: "f"}
+	if err := tb.Put(ctx, "foo", foo); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+
+	bar := Bar{F: 0.5, S: "b"}
+	// NOTE: not best practice, but store bar as
+	// optional (by passing the address of bar to Put).
+	// This tests auto-dereferencing.
+	if err := tb.Put(ctx, "bar", &bar); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+
+	baz := Baz{Name: "John Doe", Active: true}
+	if err := tb.Put(ctx, "baz", baz); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+
+	// Begin a readwrite batch.
+	rwBatch, err := d.BeginBatch(ctx, wire.BatchOptions{ReadOnly: false})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+
+	// fetch all rows
+	tu.CheckExec(t, ctx, rwBatch, "select k, v from tb",
+		[]string{"k", "v"},
+		[][]*vdl.Value{
+			[]*vdl.Value{vdl.ValueOf("bar"), vdl.ValueOf(bar)},
+			[]*vdl.Value{vdl.ValueOf("baz"), vdl.ValueOf(baz)},
+			[]*vdl.Value{vdl.ValueOf("foo"), vdl.ValueOf(foo)},
+		})
+
+	rwBatchTb := rwBatch.Table("tb")
+
+	// Add a row in this batch
+	newRow := Baz{Name: "Snow White", Active: true}
+	if err := rwBatchTb.Put(ctx, "newRow", newRow); err != nil {
+		t.Fatalf("rwBatchTb.Put() failed: %v", err)
+	}
+
+	// confirm fetching all rows DOES get the new row
+	tu.CheckExec(t, ctx, rwBatch, "select k, v from tb",
+		[]string{"k", "v"},
+		[][]*vdl.Value{
+			[]*vdl.Value{vdl.ValueOf("bar"), vdl.ValueOf(bar)},
+			[]*vdl.Value{vdl.ValueOf("baz"), vdl.ValueOf(baz)},
+			[]*vdl.Value{vdl.ValueOf("foo"), vdl.ValueOf(foo)},
+			[]*vdl.Value{vdl.ValueOf("newRow"), vdl.ValueOf(newRow)},
+		})
+
+	// Delete the first row (bar) and the last row (newRow).
+	// Change the baz row.  Confirm these rows are no longer fetched and that
+	// the change to baz is seen.
+	if err := rwBatchTb.Delete(ctx, nosql.SingleRow("bar")); err != nil {
+		t.Fatalf("rwBatchTb.Delete(bar) failed: %v", err)
+	}
+	if err := rwBatchTb.Delete(ctx, nosql.SingleRow("newRow")); err != nil {
+		t.Fatalf("rwBatchTb.Delete(newRow) failed: %v", err)
+	}
+	baz2 := Baz{Name: "Batman", Active: false}
+	if err := rwBatchTb.Put(ctx, "baz", baz2); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+	tu.CheckExec(t, ctx, rwBatch, "select k, v from tb",
+		[]string{"k", "v"},
+		[][]*vdl.Value{
+			[]*vdl.Value{vdl.ValueOf("baz"), vdl.ValueOf(baz2)},
+			[]*vdl.Value{vdl.ValueOf("foo"), vdl.ValueOf(foo)},
+		})
+
+	// Add the 2 rows (we just deleted) back again.
+	// Delete the other two rows (baz, foo).
+	// Confirm we just see the three rows we added back.
+	// Add a row in this batch
+	bar2 := Baz{Name: "Tom Thumb", Active: true}
+	if err := rwBatchTb.Put(ctx, "bar", bar2); err != nil {
+		t.Fatalf("rwBatchTb.Put() failed: %v", err)
+	}
+	newRow2 := Baz{Name: "Snow White", Active: false}
+	if err := rwBatchTb.Put(ctx, "newRow", newRow2); err != nil {
+		t.Fatalf("rwBatchTb.Put() failed: %v", err)
+	}
+	if err := rwBatchTb.Delete(ctx, nosql.SingleRow("baz")); err != nil {
+		t.Fatalf("rwBatchTb.Delete(baz) failed: %v", err)
+	}
+	if err := rwBatchTb.Delete(ctx, nosql.SingleRow("foo")); err != nil {
+		t.Fatalf("rwBatchTb.Delete(foo) failed: %v", err)
+	}
+	tu.CheckExec(t, ctx, rwBatch, "select k, v from tb",
+		[]string{"k", "v"},
+		[][]*vdl.Value{
+			[]*vdl.Value{vdl.ValueOf("bar"), vdl.ValueOf(bar2)},
+			[]*vdl.Value{vdl.ValueOf("newRow"), vdl.ValueOf(newRow2)},
+		})
+
+	// commit rw batch
+	rwBatch.Commit(ctx)
+
+	// start a new (ro) batch
+	roBatch, err := d.BeginBatch(ctx, wire.BatchOptions{ReadOnly: false})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	defer roBatch.Abort(ctx)
+
+	// confirm fetching all rows gets the rows committed above
+	// as it was never committed
+	tu.CheckExec(t, ctx, roBatch, "select k, v from tb",
+		[]string{"k", "v"},
+		[][]*vdl.Value{
+			[]*vdl.Value{vdl.ValueOf("bar"), vdl.ValueOf(bar2)},
+			[]*vdl.Value{vdl.ValueOf("newRow"), vdl.ValueOf(newRow2)},
+		})
+}
+
 // Tests enforcement of BatchOptions.ReadOnly.
 func TestReadOnlyBatch(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index d18ab04..9ad3e56 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -404,12 +404,25 @@
 }
 
 func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
+	streams := []store.Stream{}
+	for _, keyRange := range keyRanges {
+		start := keyRange.Start
+		limit := keyRange.Limit
+		// 0-255 means examine all rows
+		if start == string([]byte{0}) && limit == string([]byte{255}) {
+			start = ""
+			limit = ""
+		}
+		// TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
+		//               Need a way to Scan multiple ranges at the same state of uncommitted changes.
+		streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), start, limit)))
+	}
 	return &kvs{
 		t:         t,
 		keyRanges: keyRanges,
-		curr:      -1,
+		curr:      0,
 		validRow:  false,
-		it:        nil,
+		it:        streams,
 		err:       nil,
 	}, nil
 }
@@ -417,11 +430,11 @@
 type kvs struct {
 	t         *tableDb
 	keyRanges query_db.KeyRanges
-	curr      int // current index into prefixes, -1 at start
+	curr      int
 	validRow  bool
 	currKey   string
 	currValue *vdl.Value
-	it        store.Stream // current prefix key value stream
+	it        []store.Stream // array of store.Streams
 	err       error
 }
 
@@ -429,28 +442,15 @@
 	if s.err != nil {
 		return false
 	}
-	if s.curr == -1 {
-		s.curr++
-	}
 	for s.curr < len(s.keyRanges) {
-		if s.it == nil {
-			start := s.keyRanges[s.curr].Start
-			limit := s.keyRanges[s.curr].Limit
-			// 0-255 means examine all rows
-			if start == string([]byte{0}) && limit == string([]byte{255}) {
-				start = ""
-				limit = ""
-			}
-			s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), start, limit))
-		}
-		if s.it.Advance() {
+		if s.it[s.curr].Advance() {
 			// key
-			keyBytes := s.it.Key(nil)
+			keyBytes := s.it[s.curr].Key(nil)
 			parts := util.SplitKeyParts(string(keyBytes))
 			// TODO(rogulenko): Check access for the key.
 			s.currKey = parts[len(parts)-1]
 			// value
-			valueBytes := s.it.Value(nil)
+			valueBytes := s.it[s.curr].Value(nil)
 			var currValue *vdl.Value
 			if err := vom.Decode(valueBytes, &currValue); err != nil {
 				s.validRow = false
@@ -463,12 +463,12 @@
 		}
 		// Advance returned false.  It could be an err, or it could
 		// be we've reached the end.
-		if err := s.it.Err(); err != nil {
+		if err := s.it[s.curr].Err(); err != nil {
 			s.validRow = false
 			s.err = err
 			return false
 		}
-		// We've reached the end of the iterator for this prefix.
+		// We've reached the end of the iterator for this keyRange.
 		// Jump to the next one.
 		s.curr++
 		s.it = nil
@@ -491,7 +491,9 @@
 
 func (s *kvs) Cancel() {
 	if s.it != nil {
-		s.it.Cancel()
+		for i := s.curr; i < len(s.it); i++ {
+			s.it[i].Cancel()
+		}
 		s.it = nil
 	}
 	// set curr to end of keyRanges so Advance will return false
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index eb8507c..67f253c 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -119,21 +119,21 @@
 
 // Put implements the store.StoreWriter interface.
 func (d *db) Put(key, value []byte) error {
-	write := writeOp{
-		t:     putOp,
-		key:   key,
-		value: value,
+	write := store.WriteOp{
+		T:     store.PutOp,
+		Key:   key,
+		Value: value,
 	}
-	return d.write([]writeOp{write}, d.writeOptions)
+	return d.write([]store.WriteOp{write}, d.writeOptions)
 }
 
 // Delete implements the store.StoreWriter interface.
 func (d *db) Delete(key []byte) error {
-	write := writeOp{
-		t:   deleteOp,
-		key: key,
+	write := store.WriteOp{
+		T:   store.DeleteOp,
+		Key: key,
 	}
-	return d.write([]writeOp{write}, d.writeOptions)
+	return d.write([]store.WriteOp{write}, d.writeOptions)
 }
 
 // NewTransaction implements the store.Store interface.
@@ -160,14 +160,14 @@
 
 // write writes a batch and adds all written keys to txTable.
 // TODO(rogulenko): remove this method.
-func (d *db) write(batch []writeOp, cOpts *C.leveldb_writeoptions_t) error {
+func (d *db) write(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
 	d.txmu.Lock()
 	defer d.txmu.Unlock()
 	return d.writeLocked(batch, cOpts)
 }
 
 // writeLocked is like write(), but it assumes txmu is held.
-func (d *db) writeLocked(batch []writeOp, cOpts *C.leveldb_writeoptions_t) error {
+func (d *db) writeLocked(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	if d.err != nil {
@@ -176,16 +176,16 @@
 	cBatch := C.leveldb_writebatch_create()
 	defer C.leveldb_writebatch_destroy(cBatch)
 	for _, write := range batch {
-		switch write.t {
-		case putOp:
-			cKey, cKeyLen := cSlice(write.key)
-			cVal, cValLen := cSlice(write.value)
+		switch write.T {
+		case store.PutOp:
+			cKey, cKeyLen := cSlice(write.Key)
+			cVal, cValLen := cSlice(write.Value)
 			C.leveldb_writebatch_put(cBatch, cKey, cKeyLen, cVal, cValLen)
-		case deleteOp:
-			cKey, cKeyLen := cSlice(write.key)
+		case store.DeleteOp:
+			cKey, cKeyLen := cSlice(write.Key)
 			C.leveldb_writebatch_delete(cBatch, cKey, cKeyLen)
 		default:
-			panic(fmt.Sprintf("unknown write operation type: %v", write.t))
+			panic(fmt.Sprintf("unknown write operation type: %v", write.T))
 		}
 	}
 	var cError *C.char
@@ -201,14 +201,14 @@
 }
 
 // trackBatch writes the batch to txTable and adds a commit event to txEvents.
-func (d *db) trackBatch(batch []writeOp) {
+func (d *db) trackBatch(batch []store.WriteOp) {
 	// TODO(rogulenko): do GC.
 	d.txSequenceNumber++
 	seq := d.txSequenceNumber
 	var keys [][]byte
 	for _, write := range batch {
-		d.txTable.add(write.key, seq)
-		keys = append(keys, write.key)
+		d.txTable.add(write.Key, seq)
+		keys = append(keys, write.Key)
 	}
 	tx := &commitedTransaction{
 		seq:   seq,
diff --git a/x/ref/services/syncbase/store/leveldb/transaction.go b/x/ref/services/syncbase/store/leveldb/transaction.go
index 70f1bb0..f3f755c 100644
--- a/x/ref/services/syncbase/store/leveldb/transaction.go
+++ b/x/ref/services/syncbase/store/leveldb/transaction.go
@@ -15,28 +15,6 @@
 	"v.io/v23/verror"
 )
 
-type scanRange struct {
-	start, limit []byte
-}
-
-type readSet struct {
-	keys   [][]byte
-	ranges []scanRange
-}
-
-type writeType int
-
-const (
-	putOp writeType = iota
-	deleteOp
-)
-
-type writeOp struct {
-	t     writeType
-	key   []byte
-	value []byte
-}
-
 // commitedTransaction is only used as an element of db.txEvents.
 type commitedTransaction struct {
 	seq   uint64
@@ -53,8 +31,8 @@
 	seq      uint64
 	event    *list.Element // pointer to element of db.txEvents
 	snapshot store.Snapshot
-	reads    readSet
-	writes   []writeOp
+	reads    store.ReadSet
+	writes   []store.WriteOp
 	cOpts    *C.leveldb_writeoptions_t
 	err      error
 }
@@ -110,7 +88,7 @@
 	if tx.err != nil {
 		return valbuf, convertError(tx.err)
 	}
-	tx.reads.keys = append(tx.reads.keys, key)
+	tx.reads.Keys = append(tx.reads.Keys, key)
 
 	// Reflect the state of the transaction: the "writes" (puts and
 	// deletes) override the values in the transaction snapshot.
@@ -119,9 +97,9 @@
 	// deletes) instead of an array.
 	for i := len(tx.writes) - 1; i >= 0; i-- {
 		op := &tx.writes[i]
-		if bytes.Equal(op.key, key) {
-			if op.t == putOp {
-				return op.value, nil
+		if bytes.Equal(op.Key, key) {
+			if op.T == store.PutOp {
+				return op.Value, nil
 			}
 			return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
 		}
@@ -137,14 +115,14 @@
 	if tx.err != nil {
 		return &store.InvalidStream{tx.err}
 	}
-	// TODO(rdaoud): create an in-memory copy of the current transaction
-	// state (the puts and deletes so far) for the scan stream's Advance()
-	// to merge that data while traversing the store snapshot.
-	tx.reads.ranges = append(tx.reads.ranges, scanRange{
-		start: start,
-		limit: limit,
+
+	tx.reads.Ranges = append(tx.reads.Ranges, store.ScanRange{
+		Start: start,
+		Limit: limit,
 	})
-	return tx.snapshot.Scan(start, limit)
+
+	// Return a stream which merges the snaphot stream with the uncommitted changes.
+	return store.MergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -154,10 +132,10 @@
 	if tx.err != nil {
 		return convertError(tx.err)
 	}
-	tx.writes = append(tx.writes, writeOp{
-		t:     putOp,
-		key:   key,
-		value: value,
+	tx.writes = append(tx.writes, store.WriteOp{
+		T:     store.PutOp,
+		Key:   key,
+		Value: value,
 	})
 	return nil
 }
@@ -169,9 +147,9 @@
 	if tx.err != nil {
 		return convertError(tx.err)
 	}
-	tx.writes = append(tx.writes, writeOp{
-		t:   deleteOp,
-		key: key,
+	tx.writes = append(tx.writes, store.WriteOp{
+		T:   store.DeleteOp,
+		Key: key,
 	})
 	return nil
 }
@@ -180,13 +158,13 @@
 // been invalidated by other transactions.
 // Assumes tx.d.txmu is held.
 func (tx *transaction) validateReadSet() bool {
-	for _, key := range tx.reads.keys {
+	for _, key := range tx.reads.Keys {
 		if tx.d.txTable.get(key) > tx.seq {
 			return false
 		}
 	}
-	for _, r := range tx.reads.ranges {
-		if tx.d.txTable.rangeMax(r.start, r.limit) > tx.seq {
+	for _, r := range tx.reads.Ranges {
+		if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
 			return false
 		}
 
diff --git a/x/ref/services/syncbase/store/memstore/transaction.go b/x/ref/services/syncbase/store/memstore/transaction.go
index 1a1b0f4..a2ed326 100644
--- a/x/ref/services/syncbase/store/memstore/transaction.go
+++ b/x/ref/services/syncbase/store/memstore/transaction.go
@@ -92,10 +92,19 @@
 	if err := tx.error(); err != nil {
 		return &store.InvalidStream{err}
 	}
-	// TODO(rdaoud): create an in-memory copy of the current transaction
-	// state (the puts and deletes so far) for the scan stream's Advance()
-	// to merge that data while traversing the store snapshot.
-	return tx.sn.Scan(start, limit)
+
+	// Create an array of store.WriteOps as it is needed to merge
+	// the snaphot stream with the uncommitted changes.
+	var writes []store.WriteOp
+	for k, v := range tx.puts {
+		writes = append(writes, store.WriteOp{T: store.PutOp, Key: []byte(k), Value: v})
+	}
+	for k, _ := range tx.deletes {
+		writes = append(writes, store.WriteOp{T: store.DeleteOp, Key: []byte(k), Value: []byte{}})
+	}
+
+	// Return a stream which merges the snaphot stream with the uncommitted changes.
+	return store.MergeWritesWithStream(tx.sn, writes, start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/x/ref/services/syncbase/store/merged_stream.go b/x/ref/services/syncbase/store/merged_stream.go
new file mode 100644
index 0000000..e8c1058
--- /dev/null
+++ b/x/ref/services/syncbase/store/merged_stream.go
@@ -0,0 +1,147 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+import (
+	"sort"
+)
+
+//////////////////////////////////////////////////////////////
+// mergedStream implementation of Stream
+//
+// This implementation of Stream must take into account writes
+// which have occurred since the snapshot was taken on the
+// transaction.
+//
+// The MergeWritesWithStream() function requires uncommitted
+// changes to be passed in as an array of WriteOp.
+
+// Create a new stream which merges a snapshot stream with an array of write operations.
+func MergeWritesWithStream(sn Snapshot, w []WriteOp, start, limit []byte) Stream {
+	// Collect writes with the range specified, then sort them.
+	// Note: Writes could contain more than one write for a given key.
+	//       The last write is the current state.
+	writesMap := map[string]WriteOp{}
+	for _, write := range w {
+		if string(write.Key) >= string(start) && (string(limit) == "" || string(write.Key) < string(limit)) {
+			writesMap[string(write.Key)] = write
+		}
+	}
+	var writesArray WriteOpArray
+	for _, writeOp := range writesMap {
+		writesArray = append(writesArray, writeOp)
+	}
+	sort.Sort(writesArray)
+	return &mergedStream{
+		snapshotStream:      sn.Scan(start, limit),
+		writesArray:         writesArray,
+		writesCursor:        0,
+		unusedSnapshotValue: false,
+		snapshotStreamEOF:   false,
+		hasValue:            false,
+	}
+}
+
+type mergedStream struct {
+	snapshotStream      Stream
+	writesArray         []WriteOp
+	writesCursor        int
+	unusedSnapshotValue bool
+	snapshotStreamEOF   bool
+	hasValue            bool // if true, Key() and Value() can be called
+	key                 []byte
+	value               []byte
+}
+
+// Convenience function to check EOF on writesArray
+func (s *mergedStream) writesArrayEOF() bool {
+	return s.writesCursor >= len(s.writesArray)
+}
+
+// If a kv from the snapshot isn't on deck, call
+// Advance on the snapshot and set unusedSnapshotValue.
+// If EOF encountered, set snapshotStreamEOF.
+// If error encountered, return it.
+func (s *mergedStream) stageSnapshotKeyValue() error {
+	if !s.snapshotStreamEOF && !s.unusedSnapshotValue {
+		if !s.snapshotStream.Advance() {
+			s.snapshotStreamEOF = true
+			if err := s.snapshotStream.Err(); err != nil {
+				return err
+			}
+		}
+		s.unusedSnapshotValue = true
+	}
+	return nil
+}
+
+// Pick a kv from either the snapshot or the uncommited writes array.
+// If an uncommited write is picked advance past it and return false (also, advance the snapshot
+// stream if its current key is equal to the ucommitted delete).
+func (s *mergedStream) pickKeyValue() bool {
+	if !s.snapshotStreamEOF && (s.writesArrayEOF() || string(s.writesArray[s.writesCursor].Key) > string(s.snapshotStream.Key(nil))) {
+		s.key = s.snapshotStream.Key(s.key)
+		s.value = s.snapshotStream.Value(s.value)
+		s.unusedSnapshotValue = false
+		return true
+	}
+	if !s.snapshotStreamEOF && string(s.writesArray[s.writesCursor].Key) == string(s.snapshotStream.Key(nil)) {
+		s.unusedSnapshotValue = false
+	}
+	if s.writesArrayEOF() || s.writesArray[s.writesCursor].T == DeleteOp {
+		s.writesCursor++
+		return false
+	}
+	s.key = CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
+	s.value = CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
+	s.writesCursor++
+	return true
+}
+
+func (s *mergedStream) Advance() bool {
+	s.hasValue = false
+	for true {
+		if err := s.stageSnapshotKeyValue(); err != nil {
+			return false
+		}
+		if s.snapshotStreamEOF && s.writesArrayEOF() {
+			return false
+		}
+		if s.pickKeyValue() {
+			s.hasValue = true
+			return true
+		}
+	}
+	return false // compiler insists on this line
+}
+
+// Key implements the Stream interface.
+func (s *mergedStream) Key(keybuf []byte) []byte {
+	if !s.hasValue {
+		panic("nothing staged")
+	}
+	return CopyBytes(keybuf, s.key)
+}
+
+// Value implements the Stream interface.
+func (s *mergedStream) Value(valbuf []byte) []byte {
+	if !s.hasValue {
+		panic("nothing staged")
+	}
+	return CopyBytes(valbuf, s.value)
+}
+
+// Err implements the Stream interface.
+func (s *mergedStream) Err() error {
+	return s.snapshotStream.Err()
+}
+
+// Cancel implements the Stream interface.
+func (s *mergedStream) Cancel() {
+	s.snapshotStream.Cancel()
+	s.hasValue = false
+	s.snapshotStreamEOF = true
+	s.writesCursor = len(s.writesArray)
+}
diff --git a/x/ref/services/syncbase/store/test/transaction.go b/x/ref/services/syncbase/store/test/transaction.go
index 889e05b..3e1039b 100644
--- a/x/ref/services/syncbase/store/test/transaction.go
+++ b/x/ref/services/syncbase/store/test/transaction.go
@@ -46,10 +46,14 @@
 		verifyAdvance(t, s, key1, value1)
 		verifyAdvance(t, s, nil, nil)
 
-		// Test Put then Get inside the transaction.
+		// Test Put then Get & Scan inside the transaction.
 		key3, value3 := []byte("key3"), []byte("value3")
 		tx.Put(key3, value3)
 		verifyGet(t, tx, key3, value3)
+		s = tx.Scan([]byte("a"), []byte("z"))
+		verifyAdvance(t, s, key1, value1)
+		verifyAdvance(t, s, key3, value3)
+		verifyAdvance(t, s, nil, nil)
 
 		// Test Delete of old key then Get inside the transaction.
 		tx.Delete(key1)
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index d42a367..ac9a12e 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -45,3 +45,43 @@
 	copy(dst, src)
 	return dst
 }
+
+//////////////////////////////////////////////////////////////
+// Read and Write types used for storing transcation reads
+// and uncommitted writes.
+
+type ScanRange struct {
+	Start, Limit []byte
+}
+
+type ReadSet struct {
+	Keys   [][]byte
+	Ranges []ScanRange
+}
+
+type WriteType int
+
+const (
+	PutOp WriteType = iota
+	DeleteOp
+)
+
+type WriteOp struct {
+	T     WriteType
+	Key   []byte
+	Value []byte
+}
+
+type WriteOpArray []WriteOp
+
+func (a WriteOpArray) Len() int {
+	return len(a)
+}
+
+func (a WriteOpArray) Less(i, j int) bool {
+	return string(a[i].Key) < string(a[j].Key)
+}
+
+func (a WriteOpArray) Swap(i, j int) {
+	a[i], a[j] = a[j], a[i]
+}