Merge "runtime/internal/flow/conn: Increment next flowID by 2."
diff --git a/services/syncbase/store/ptrie/stream.go b/services/syncbase/store/ptrie/stream.go
index 73ce855..df2bb88 100644
--- a/services/syncbase/store/ptrie/stream.go
+++ b/services/syncbase/store/ptrie/stream.go
@@ -38,7 +38,7 @@
 		return &Stream{}
 	}
 	s := &Stream{
-		limit: store.CopyBytes(nil, limit),
+		limit: copyBytes(limit),
 	}
 	// Locate the first key-value pair with key >= start and capture
 	// the DFS stack.
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
index 3d40c09..0b37927 100644
--- a/services/syncbase/store/transactions/manager.go
+++ b/services/syncbase/store/transactions/manager.go
@@ -175,17 +175,3 @@
 	Keys   [][]byte
 	Ranges []scanRange
 }
-
-type writeOpArray []WriteOp
-
-func (a writeOpArray) Len() int {
-	return len(a)
-}
-
-func (a writeOpArray) Less(i, j int) bool {
-	return string(a[i].Key) < string(a[j].Key)
-}
-
-func (a writeOpArray) Swap(i, j int) {
-	a[i], a[j] = a[j], a[i]
-}
diff --git a/services/syncbase/store/transactions/merged_stream.go b/services/syncbase/store/transactions/merged_stream.go
index 8c96bef..560de6b 100644
--- a/services/syncbase/store/transactions/merged_stream.go
+++ b/services/syncbase/store/transactions/merged_stream.go
@@ -5,9 +5,12 @@
 package transactions
 
 import (
-	"sort"
+	"bytes"
+	"fmt"
+	"sync"
 
 	"v.io/x/ref/services/syncbase/store"
+	"v.io/x/ref/services/syncbase/store/ptrie"
 )
 
 //////////////////////////////////////////////////////////////
@@ -16,103 +19,112 @@
 // This implementation of Stream must take into account writes
 // which have occurred since the snapshot was taken on the
 // transaction.
-//
-// The MergeWritesWithStream() function requires uncommitted
-// changes to be passed in as an array of WriteOp.
 
-// Create a new stream which merges a snapshot stream with an array of write operations.
-func mergeWritesWithStream(sn store.Snapshot, w []WriteOp, start, limit []byte) store.Stream {
-	// Collect writes with the range specified, then sort them.
-	// Note: Writes could contain more than one write for a given key.
-	//       The last write is the current state.
-	writesMap := map[string]WriteOp{}
-	for _, write := range w {
-		if string(write.Key) >= string(start) && (string(limit) == "" || string(write.Key) < string(limit)) {
-			writesMap[string(write.Key)] = write
-		}
+// Create a new stream which merges a snapshot stream with write operations.
+func mergeWritesWithStream(sn store.Snapshot, w *ptrie.T, start, limit []byte) store.Stream {
+	m := &mergedStream{
+		s:       sn.Scan(start, limit),
+		sHasKey: true,
+		p:       w.Scan(start, limit),
+		pHasKey: true,
 	}
-	var writesArray writeOpArray
-	for _, writeOp := range writesMap {
-		writesArray = append(writesArray, writeOp)
-	}
-	sort.Sort(writesArray)
-	return &mergedStream{
-		snapshotStream:      sn.Scan(start, limit),
-		writesArray:         writesArray,
-		writesCursor:        0,
-		unusedSnapshotValue: false,
-		snapshotStreamEOF:   false,
-		hasValue:            false,
-	}
+	m.advanceS()
+	m.advanceP()
+	return m
 }
 
+type valueSourceType uint32
+
+const (
+	notInitialized valueSourceType = iota
+	snapshotStream
+	ptrieStream
+)
+
 type mergedStream struct {
-	snapshotStream      store.Stream
-	writesArray         []WriteOp
-	writesCursor        int
-	unusedSnapshotValue bool
-	snapshotStreamEOF   bool
-	hasValue            bool // if true, Key() and Value() can be called
-	key                 []byte
-	value               []byte
+	mu sync.Mutex
+
+	s       store.Stream
+	sHasKey bool
+	sKey    []byte
+
+	p       *ptrie.Stream
+	pHasKey bool
+	pKey    []byte
+	// valueSource indicates which stream holds the staged key-value pair.
+	valueSource valueSourceType
+	cancelMutex sync.Mutex // protects isCanceled
+	isCanceled  bool
 }
 
-// Convenience function to check EOF on writesArray
-func (s *mergedStream) writesArrayEOF() bool {
-	return s.writesCursor >= len(s.writesArray)
+func (m *mergedStream) advanceS() {
+	if m.sHasKey {
+		m.sHasKey = m.s.Advance()
+	}
+	if m.sHasKey {
+		m.sKey = m.s.Key(m.sKey)
+	}
 }
 
-// If a kv from the snapshot isn't on deck, call
-// Advance on the snapshot and set unusedSnapshotValue.
-// If EOF encountered, set snapshotStreamEOF.
-// If error encountered, return it.
-func (s *mergedStream) stageSnapshotKeyValue() error {
-	if !s.snapshotStreamEOF && !s.unusedSnapshotValue {
-		if !s.snapshotStream.Advance() {
-			s.snapshotStreamEOF = true
-			if err := s.snapshotStream.Err(); err != nil {
-				return err
-			}
-		}
-		s.unusedSnapshotValue = true
+func (m *mergedStream) advanceP() {
+	if m.pHasKey {
+		m.pHasKey = m.p.Advance()
 	}
-	return nil
+	if m.pHasKey {
+		m.pKey = m.p.Key(m.pKey)
+	}
 }
 
-// Pick a kv from either the snapshot or the uncommited writes array.
-// If an uncommited write is picked advance past it and return false (also, advance the snapshot
-// stream if its current key is equal to the ucommitted delete).
-func (s *mergedStream) pickKeyValue() bool {
-	if !s.snapshotStreamEOF && (s.writesArrayEOF() || string(s.writesArray[s.writesCursor].Key) > string(s.snapshotStream.Key(nil))) {
-		s.key = s.snapshotStream.Key(s.key)
-		s.value = s.snapshotStream.Value(s.value)
-		s.unusedSnapshotValue = false
-		return true
-	}
-	if !s.snapshotStreamEOF && string(s.writesArray[s.writesCursor].Key) == string(s.snapshotStream.Key(nil)) {
-		s.unusedSnapshotValue = false
-	}
-	if s.writesArrayEOF() || s.writesArray[s.writesCursor].T == DeleteOp {
-		s.writesCursor++
-		return false
-	}
-	s.key = store.CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
-	s.value = store.CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
-	s.writesCursor++
-	return true
+func (m *mergedStream) canceled() bool {
+	m.cancelMutex.Lock()
+	defer m.cancelMutex.Unlock()
+	return m.isCanceled
 }
 
-func (s *mergedStream) Advance() bool {
-	s.hasValue = false
+// stage stages a key-value pair from either the snapshot or the uncommitted
+// writes.
+func (m *mergedStream) stage() valueSourceType {
+	if m.sHasKey && (!m.pHasKey || bytes.Compare(m.sKey, m.pKey) < 0) {
+		return snapshotStream
+	}
+	if m.sHasKey && bytes.Compare(m.sKey, m.pKey) == 0 {
+		m.advanceS()
+	}
+	switch value := m.p.Value().(type) {
+	case isDeleted:
+		m.advanceP()
+		return notInitialized
+	case []byte:
+		return ptrieStream
+	default:
+		panic(fmt.Sprintf("unexpected type %T of value", value))
+	}
+}
+
+// Advance implements the Stream interface.
+func (m *mergedStream) Advance() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	// Invariant: one of the two streams points to the last staged key-value
+	// pair and the other stream points to a key greater than the last staged
+	// key, except on the first call of Advance().
+	switch m.valueSource {
+	case snapshotStream:
+		m.advanceS()
+	case ptrieStream:
+		m.advanceP()
+	}
+	m.valueSource = notInitialized
+	// Invariant: both streams point to key-value pairs with keys greater than
+	// the last staged key.
+	// We need to pick the stream that points to the smaller key. If the picked
+	// stream is the ptrie stream and the key-value pair represents a delete
+	// operation, we skip the key-value pair and pick a key-value pair again.
 	for true {
-		if err := s.stageSnapshotKeyValue(); err != nil {
+		if m.canceled() || (!m.sHasKey && !m.pHasKey) {
 			return false
 		}
-		if s.snapshotStreamEOF && s.writesArrayEOF() {
-			return false
-		}
-		if s.pickKeyValue() {
-			s.hasValue = true
+		if m.valueSource = m.stage(); m.valueSource != notInitialized {
 			return true
 		}
 	}
@@ -120,30 +132,45 @@
 }
 
 // Key implements the Stream interface.
-func (s *mergedStream) Key(keybuf []byte) []byte {
-	if !s.hasValue {
+func (m *mergedStream) Key(keybuf []byte) []byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	switch m.valueSource {
+	case snapshotStream:
+		return store.CopyBytes(keybuf, m.sKey)
+	case ptrieStream:
+		return store.CopyBytes(keybuf, m.pKey)
+	default:
 		panic("nothing staged")
 	}
-	return store.CopyBytes(keybuf, s.key)
 }
 
 // Value implements the Stream interface.
-func (s *mergedStream) Value(valbuf []byte) []byte {
-	if !s.hasValue {
+func (m *mergedStream) Value(valbuf []byte) []byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	switch m.valueSource {
+	case snapshotStream:
+		// m.s.Value internally copies bytes to valbuf.
+		return m.s.Value(valbuf)
+	case ptrieStream:
+		return store.CopyBytes(valbuf, m.p.Value().([]byte))
+	default:
 		panic("nothing staged")
 	}
-	return store.CopyBytes(valbuf, s.value)
 }
 
 // Err implements the Stream interface.
-func (s *mergedStream) Err() error {
-	return s.snapshotStream.Err()
+func (m *mergedStream) Err() error {
+	return m.s.Err()
 }
 
 // Cancel implements the Stream interface.
-func (s *mergedStream) Cancel() {
-	s.snapshotStream.Cancel()
-	s.hasValue = false
-	s.snapshotStreamEOF = true
-	s.writesCursor = len(s.writesArray)
+func (m *mergedStream) Cancel() {
+	m.cancelMutex.Lock()
+	if !m.isCanceled {
+		m.isCanceled = true
+		m.s.Cancel()
+	}
+	m.cancelMutex.Unlock()
 }
diff --git a/services/syncbase/store/transactions/merged_stream_test.go b/services/syncbase/store/transactions/merged_stream_test.go
new file mode 100644
index 0000000..be4a75d
--- /dev/null
+++ b/services/syncbase/store/transactions/merged_stream_test.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions_test
+
+import (
+	"bytes"
+	"testing"
+
+	"v.io/x/ref/services/syncbase/store/memstore"
+)
+
+// TestMergedStream checks correctness of the mergedStream.
+func TestMergedStream(t *testing.T) {
+	// Here is the test structure: we take N keys and run through all possible
+	// states of a store with those N keys and all possible transactions that
+	// involve those N keys. For a store key there are two states: exists and
+	// doesn't exist. For a transaction key there are three states: not in the
+	// transaction, put, and delete.
+	keys := []string{"a", "b", "c", "d"}
+	N := len(keys)
+	n2, n3 := 1, 1 // 2**N and 3**N
+	for i := 0; i < N; i++ {
+		n2, n3 = n2*2, n3*3
+	}
+	for storeState := 0; storeState < n2; storeState++ {
+		for txState := 0; txState < n3; txState++ {
+			var expected []string
+			st := memstore.New()
+			ss, ts := storeState, txState
+			// Populate the store and the expected result.
+			for i := 0; i < N; i++ {
+				if ss%2 == 1 {
+					st.Put([]byte(keys[i]), []byte(keys[i]))
+				}
+				if (ss%2 == 1 || ts%3 == 1) && (ts%3 != 2) {
+					expected = append(expected, keys[i])
+				}
+				ss /= 2
+				ts /= 3
+			}
+			// Create a transaction.
+			tx := st.NewTransaction()
+			ts = txState
+			for i := 0; i < N; i++ {
+				if ts%3 == 1 {
+					tx.Put([]byte(keys[i]), []byte(keys[i]))
+				} else if ts%3 == 2 {
+					tx.Delete([]byte(keys[i]))
+				}
+				ts /= 3
+			}
+			s := tx.Scan(nil, nil)
+			for i := 0; i < len(expected); i++ {
+				if !s.Advance() {
+					t.Fatal("the stream didn't advance")
+				}
+				if got, want := s.Key(nil), []byte(expected[i]); !bytes.Equal(got, want) {
+					t.Fatalf("unexpected key: got %q, want %q", got, want)
+				}
+				if got, want := s.Value(nil), []byte(expected[i]); !bytes.Equal(got, want) {
+					t.Fatalf("unexpected value: got %q, want %q", got, want)
+				}
+			}
+			if s.Advance() {
+				t.Fatal("the stream advanced")
+			}
+			if err := s.Err(); err != nil {
+				t.Fatalf("unexpected stream error: %v", err)
+			}
+		}
+	}
+}
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 1c81abd..78abc36 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -5,15 +5,18 @@
 package transactions
 
 import (
-	"bytes"
 	"container/list"
+	"fmt"
 	"sync"
 
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/store"
+	"v.io/x/ref/services/syncbase/store/ptrie"
 )
 
+type isDeleted struct{}
+
 // transaction is a wrapper on top of a BatchWriter and a store.Snapshot that
 // implements the store.Transaction interface.
 type transaction struct {
@@ -24,8 +27,13 @@
 	event    *list.Element // pointer to element of mg.events
 	snapshot store.Snapshot
 	reads    readSet
-	writes   []WriteOp
-	err      error
+	// writes holds in-flight mutations of the transaction as key-value
+	// pairs, where the value type can be:
+	//   isDeleted: the last modification of the row was Delete;
+	//   []byte: the last modification of the row was Put, value holds
+	//           the actual value that was put.
+	writes *ptrie.T
+	err    error
 }
 
 var _ store.Transaction = (*transaction)(nil)
@@ -35,6 +43,7 @@
 		mg:       mg,
 		snapshot: mg.BatchStore.NewSnapshot(),
 		seq:      mg.seq,
+		writes:   ptrie.New(true),
 	}
 	tx.event = mg.events.PushFront(tx)
 	return tx
@@ -56,23 +65,18 @@
 	if tx.err != nil {
 		return valbuf, store.ConvertError(tx.err)
 	}
+	key = store.CopyBytes(nil, key)
 	tx.reads.Keys = append(tx.reads.Keys, key)
-
-	// Reflect the state of the transaction: the "writes" (puts and
-	// deletes) override the values in the transaction snapshot.
-	// Find the last "writes" entry for this key, if one exists.
-	// Note: this step could be optimized by using maps (puts and
-	// deletes) instead of an array.
-	for i := len(tx.writes) - 1; i >= 0; i-- {
-		op := &tx.writes[i]
-		if bytes.Equal(op.Key, key) {
-			if op.T == PutOp {
-				return op.Value, nil
-			}
+	if value := tx.writes.Get(key); value != nil {
+		switch bytes := value.(type) {
+		case []byte:
+			return store.CopyBytes(valbuf, bytes), nil
+		case isDeleted:
 			return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
+		default:
+			panic(fmt.Sprintf("unexpected type %T of value", bytes))
 		}
 	}
-
 	return tx.snapshot.Get(key, valbuf)
 }
 
@@ -83,14 +87,13 @@
 	if tx.err != nil {
 		return &store.InvalidStream{Error: tx.err}
 	}
-
+	start, limit = store.CopyBytes(nil, start), store.CopyBytes(nil, limit)
 	tx.reads.Ranges = append(tx.reads.Ranges, scanRange{
 		Start: start,
 		Limit: limit,
 	})
-
 	// Return a stream which merges the snaphot stream with the uncommitted changes.
-	return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
+	return mergeWritesWithStream(tx.snapshot, tx.writes.Copy(), start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -100,11 +103,7 @@
 	if tx.err != nil {
 		return store.ConvertError(tx.err)
 	}
-	tx.writes = append(tx.writes, WriteOp{
-		T:     PutOp,
-		Key:   key,
-		Value: value,
-	})
+	tx.writes.Put(key, value)
 	return nil
 }
 
@@ -115,10 +114,7 @@
 	if tx.err != nil {
 		return store.ConvertError(tx.err)
 	}
-	tx.writes = append(tx.writes, WriteOp{
-		T:   DeleteOp,
-		Key: key,
-	})
+	tx.writes.Put(key, isDeleted{})
 	return nil
 }
 
@@ -163,10 +159,22 @@
 	if !tx.validateReadSet() {
 		return store.NewErrConcurrentTransaction(nil)
 	}
-	if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil {
+	var batch []WriteOp
+	s := tx.writes.Scan(nil, nil)
+	for s.Advance() {
+		switch bytes := s.Value().(type) {
+		case []byte:
+			batch = append(batch, WriteOp{T: PutOp, Key: s.Key(nil), Value: bytes})
+		case isDeleted:
+			batch = append(batch, WriteOp{T: DeleteOp, Key: s.Key(nil)})
+		default:
+			panic(fmt.Sprintf("unexpected type %T of value", bytes))
+		}
+	}
+	if err := tx.mg.BatchStore.WriteBatch(batch...); err != nil {
 		return err
 	}
-	tx.mg.trackBatch(tx.writes...)
+	tx.mg.trackBatch(batch...)
 	return nil
 }