Merge "runtime/internal/flow/conn: Increment next flowID by 2."
diff --git a/services/syncbase/store/ptrie/stream.go b/services/syncbase/store/ptrie/stream.go
index 73ce855..df2bb88 100644
--- a/services/syncbase/store/ptrie/stream.go
+++ b/services/syncbase/store/ptrie/stream.go
@@ -38,7 +38,7 @@
return &Stream{}
}
s := &Stream{
- limit: store.CopyBytes(nil, limit),
+ limit: copyBytes(limit), // NOTE(review): copyBytes is defined elsewhere; confirm it matches the old store.CopyBytes(nil, limit) for nil/empty limits
}
// Locate the first key-value pair with key >= start and capture
// the DFS stack.
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
index 3d40c09..0b37927 100644
--- a/services/syncbase/store/transactions/manager.go
+++ b/services/syncbase/store/transactions/manager.go
@@ -175,17 +175,3 @@
Keys [][]byte
Ranges []scanRange
}
-
-type writeOpArray []WriteOp
-
-func (a writeOpArray) Len() int {
- return len(a)
-}
-
-func (a writeOpArray) Less(i, j int) bool {
- return string(a[i].Key) < string(a[j].Key)
-}
-
-func (a writeOpArray) Swap(i, j int) {
- a[i], a[j] = a[j], a[i]
-}
diff --git a/services/syncbase/store/transactions/merged_stream.go b/services/syncbase/store/transactions/merged_stream.go
index 8c96bef..560de6b 100644
--- a/services/syncbase/store/transactions/merged_stream.go
+++ b/services/syncbase/store/transactions/merged_stream.go
@@ -5,9 +5,12 @@
package transactions
import (
- "sort"
+ "bytes"
+ "fmt"
+ "sync"
"v.io/x/ref/services/syncbase/store"
+ "v.io/x/ref/services/syncbase/store/ptrie"
)
//////////////////////////////////////////////////////////////
@@ -16,103 +19,124 @@
// This implementation of Stream must take into account writes
// which have occurred since the snapshot was taken on the
// transaction.
-//
-// The MergeWritesWithStream() function requires uncommitted
-// changes to be passed in as an array of WriteOp.
-// Create a new stream which merges a snapshot stream with an array of write operations.
-func mergeWritesWithStream(sn store.Snapshot, w []WriteOp, start, limit []byte) store.Stream {
- // Collect writes with the range specified, then sort them.
- // Note: Writes could contain more than one write for a given key.
- // The last write is the current state.
- writesMap := map[string]WriteOp{}
- for _, write := range w {
- if string(write.Key) >= string(start) && (string(limit) == "" || string(write.Key) < string(limit)) {
- writesMap[string(write.Key)] = write
- }
+// mergeWritesWithStream returns a stream merging the snapshot with uncommitted writes w.
+func mergeWritesWithStream(sn store.Snapshot, w *ptrie.T, start, limit []byte) store.Stream {
+ m := &mergedStream{
+ s: sn.Scan(start, limit),
+ sHasKey: true,
+ p: w.Scan(start, limit),
+ pHasKey: true,
}
- var writesArray writeOpArray
- for _, writeOp := range writesMap {
- writesArray = append(writesArray, writeOp)
- }
- sort.Sort(writesArray)
- return &mergedStream{
- snapshotStream: sn.Scan(start, limit),
- writesArray: writesArray,
- writesCursor: 0,
- unusedSnapshotValue: false,
- snapshotStreamEOF: false,
- hasValue: false,
- }
+ // sHasKey and pHasKey start out true so that the first advanceS and
+ // advanceP calls advance the underlying streams to their first keys.
+ m.advanceS()
+ m.advanceP()
+ return m
}
+// valueSourceType identifies the stream holding the staged key-value pair.
+type valueSourceType uint32
+
+const (
+ notInitialized valueSourceType = iota // nothing is staged
+ snapshotStream // staged pair comes from the snapshot stream
+ ptrieStream // staged pair comes from the uncommitted writes
+)
+
type mergedStream struct {
- snapshotStream store.Stream
- writesArray []WriteOp
- writesCursor int
- unusedSnapshotValue bool
- snapshotStreamEOF bool
- hasValue bool // if true, Key() and Value() can be called
- key []byte
- value []byte
+ mu sync.Mutex // serializes Advance, Key and Value
+
+ s store.Stream
+ sHasKey bool
+ sKey []byte
+
+ p *ptrie.Stream
+ pHasKey bool
+ pKey []byte
+ // valueSource indicates which stream holds the staged key-value pair.
+ valueSource valueSourceType
+ cancelMutex sync.Mutex // protects isCanceled
+ isCanceled bool
}
-// Convenience function to check EOF on writesArray
-func (s *mergedStream) writesArrayEOF() bool {
- return s.writesCursor >= len(s.writesArray)
+// advanceS advances the snapshot stream (unless exhausted) and caches its key in sKey.
+func (m *mergedStream) advanceS() {
+ if m.sHasKey {
+ m.sHasKey = m.s.Advance()
+ }
+ if m.sHasKey {
+ m.sKey = m.s.Key(m.sKey)
+ }
}
-// If a kv from the snapshot isn't on deck, call
-// Advance on the snapshot and set unusedSnapshotValue.
-// If EOF encountered, set snapshotStreamEOF.
-// If error encountered, return it.
-func (s *mergedStream) stageSnapshotKeyValue() error {
- if !s.snapshotStreamEOF && !s.unusedSnapshotValue {
- if !s.snapshotStream.Advance() {
- s.snapshotStreamEOF = true
- if err := s.snapshotStream.Err(); err != nil {
- return err
- }
- }
- s.unusedSnapshotValue = true
+// advanceP advances the uncommitted-writes stream (unless exhausted) and caches its key in pKey.
+func (m *mergedStream) advanceP() {
+ if m.pHasKey {
+ m.pHasKey = m.p.Advance()
}
- return nil
+ if m.pHasKey {
+ m.pKey = m.p.Key(m.pKey)
+ }
}
-// Pick a kv from either the snapshot or the uncommited writes array.
-// If an uncommited write is picked advance past it and return false (also, advance the snapshot
-// stream if its current key is equal to the ucommitted delete).
-func (s *mergedStream) pickKeyValue() bool {
- if !s.snapshotStreamEOF && (s.writesArrayEOF() || string(s.writesArray[s.writesCursor].Key) > string(s.snapshotStream.Key(nil))) {
- s.key = s.snapshotStream.Key(s.key)
- s.value = s.snapshotStream.Value(s.value)
- s.unusedSnapshotValue = false
- return true
- }
- if !s.snapshotStreamEOF && string(s.writesArray[s.writesCursor].Key) == string(s.snapshotStream.Key(nil)) {
- s.unusedSnapshotValue = false
- }
- if s.writesArrayEOF() || s.writesArray[s.writesCursor].T == DeleteOp {
- s.writesCursor++
- return false
- }
- s.key = store.CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
- s.value = store.CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
- s.writesCursor++
- return true
+// canceled reports whether Cancel has been called.
+func (m *mergedStream) canceled() bool {
+ m.cancelMutex.Lock()
+ defer m.cancelMutex.Unlock()
+ return m.isCanceled
}
-func (s *mergedStream) Advance() bool {
- s.hasValue = false
+// stage stages the next key-value pair from either the snapshot or the
+// uncommitted writes and returns the source of the staged pair. If the
+// smallest key is deleted by the uncommitted writes, stage skips that key in
+// both streams and returns notInitialized, and the caller must call stage
+// again. At least one of the two streams must have a key.
+func (m *mergedStream) stage() valueSourceType {
+ if m.sHasKey && (!m.pHasKey || bytes.Compare(m.sKey, m.pKey) < 0) {
+ return snapshotStream
+ }
+ if m.sHasKey && bytes.Compare(m.sKey, m.pKey) == 0 {
+ m.advanceS()
+ }
+ switch value := m.p.Value().(type) {
+ case isDeleted:
+ m.advanceP()
+ return notInitialized
+ case []byte:
+ return ptrieStream
+ default:
+ panic(fmt.Sprintf("unexpected type %T of value", value))
+ }
+}
+
+// Advance implements the Stream interface.
+func (m *mergedStream) Advance() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ // Invariant: one of the two streams points to the last staged key-value
+ // pair and the other stream points to a greater key or is exhausted, except
+ // on the first call of Advance(), when nothing is staged yet.
+ switch m.valueSource {
+ case snapshotStream:
+ m.advanceS()
+ case ptrieStream:
+ m.advanceP()
+ }
+ m.valueSource = notInitialized
+ // Invariant: each stream is either exhausted or points to a key-value pair
+ // with a key greater than the last staged key.
+ // We need to pick a stream that points to a smaller key. If the picked
+ // stream is the ptrie stream and the key-value pair represents a delete
+ // operation, we skip the key-value pair and pick a key-value pair again.
for true {
- if err := s.stageSnapshotKeyValue(); err != nil {
+ // Stop on cancellation, on a snapshot stream error (continuing would
+ // return uncommitted writes on top of an incomplete snapshot view), or
+ // when both streams are exhausted.
+ if m.canceled() || m.s.Err() != nil || (!m.sHasKey && !m.pHasKey) {
return false
}
- if s.snapshotStreamEOF && s.writesArrayEOF() {
- return false
- }
- if s.pickKeyValue() {
- s.hasValue = true
+ if m.valueSource = m.stage(); m.valueSource != notInitialized {
return true
}
}
@@ -120,30 +144,46 @@
}
// Key implements the Stream interface.
-func (s *mergedStream) Key(keybuf []byte) []byte {
- if !s.hasValue {
+func (m *mergedStream) Key(keybuf []byte) []byte {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ switch m.valueSource {
+ case snapshotStream:
+ return store.CopyBytes(keybuf, m.sKey)
+ case ptrieStream:
+ return store.CopyBytes(keybuf, m.pKey)
+ default:
panic("nothing staged")
}
- return store.CopyBytes(keybuf, s.key)
}
// Value implements the Stream interface.
-func (s *mergedStream) Value(valbuf []byte) []byte {
- if !s.hasValue {
+func (m *mergedStream) Value(valbuf []byte) []byte {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ switch m.valueSource {
+ case snapshotStream:
+ // m.s.Value internally copies bytes to valbuf.
+ return m.s.Value(valbuf)
+ case ptrieStream:
+ return store.CopyBytes(valbuf, m.p.Value().([]byte))
+ default:
panic("nothing staged")
}
- return store.CopyBytes(valbuf, s.value)
}
// Err implements the Stream interface.
-func (s *mergedStream) Err() error {
- return s.snapshotStream.Err()
+func (m *mergedStream) Err() error {
+ return m.s.Err()
}
// Cancel implements the Stream interface.
-func (s *mergedStream) Cancel() {
- s.snapshotStream.Cancel()
- s.hasValue = false
- s.snapshotStreamEOF = true
- s.writesCursor = len(s.writesArray)
+// Repeated calls cancel the underlying snapshot stream only once.
+func (m *mergedStream) Cancel() {
+ m.cancelMutex.Lock()
+ defer m.cancelMutex.Unlock()
+ if !m.isCanceled {
+ m.isCanceled = true
+ m.s.Cancel()
+ }
}
diff --git a/services/syncbase/store/transactions/merged_stream_test.go b/services/syncbase/store/transactions/merged_stream_test.go
new file mode 100644
index 0000000..be4a75d
--- /dev/null
+++ b/services/syncbase/store/transactions/merged_stream_test.go
@@ -0,0 +1,82 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions_test
+
+import (
+ "bytes"
+ "testing"
+
+ "v.io/x/ref/services/syncbase/store/memstore"
+)
+
+// TestMergedStream checks correctness of the mergedStream.
+func TestMergedStream(t *testing.T) {
+ // Test structure: for N keys, run through every possible store state and
+ // every possible transaction over those keys. A store key has two states
+ // (exists, doesn't exist) and a transaction key has three states (not in
+ // transaction, put, delete), so 2**N * 3**N combinations are checked.
+ keys := []string{"a", "b", "c", "d"}
+ N := len(keys)
+ n2, n3 := 1, 1 // 2**N and 3**N
+ for i := 0; i < N; i++ {
+ n2, n3 = n2*2, n3*3
+ }
+ for storeState := 0; storeState < n2; storeState++ {
+ for txState := 0; txState < n3; txState++ {
+ var expected []string
+ st := memstore.New()
+ ss, ts := storeState, txState
+ // Populate the store and the expected result.
+ for i := 0; i < N; i++ {
+ if ss%2 == 1 {
+ if err := st.Put([]byte(keys[i]), []byte(keys[i])); err != nil {
+ t.Fatalf("store state %d: st.Put(%q) failed: %v", storeState, keys[i], err)
+ }
+ }
+ // A key is visible iff the transaction puts it, or it exists in
+ // the store and the transaction does not delete it.
+ if (ss%2 == 1 || ts%3 == 1) && (ts%3 != 2) {
+ expected = append(expected, keys[i])
+ }
+ ss /= 2
+ ts /= 3
+ }
+ // Create a transaction.
+ tx := st.NewTransaction()
+ ts = txState
+ for i := 0; i < N; i++ {
+ switch ts % 3 {
+ case 1:
+ if err := tx.Put([]byte(keys[i]), []byte(keys[i])); err != nil {
+ t.Fatalf("tx state %d: tx.Put(%q) failed: %v", txState, keys[i], err)
+ }
+ case 2:
+ if err := tx.Delete([]byte(keys[i])); err != nil {
+ t.Fatalf("tx state %d: tx.Delete(%q) failed: %v", txState, keys[i], err)
+ }
+ }
+ ts /= 3
+ }
+ s := tx.Scan(nil, nil)
+ for _, key := range expected {
+ if !s.Advance() {
+ t.Fatalf("store state %d, tx state %d: the stream didn't advance", storeState, txState)
+ }
+ if got, want := s.Key(nil), []byte(key); !bytes.Equal(got, want) {
+ t.Fatalf("store state %d, tx state %d: unexpected key: got %q, want %q", storeState, txState, got, want)
+ }
+ if got, want := s.Value(nil), []byte(key); !bytes.Equal(got, want) {
+ t.Fatalf("store state %d, tx state %d: unexpected value: got %q, want %q", storeState, txState, got, want)
+ }
+ }
+ if s.Advance() {
+ t.Fatalf("store state %d, tx state %d: the stream advanced", storeState, txState)
+ }
+ if err := s.Err(); err != nil {
+ t.Fatalf("store state %d, tx state %d: unexpected stream error: %v", storeState, txState, err)
+ }
+ }
+ }
+}
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 1c81abd..78abc36 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -5,15 +5,20 @@
package transactions
import (
- "bytes"
"container/list"
+ "fmt"
"sync"
"v.io/v23/verror"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/store"
+ "v.io/x/ref/services/syncbase/store/ptrie"
)
+// isDeleted is the value stored in transaction.writes to record that the last
+// modification of a key was a Delete.
+type isDeleted struct{}
+
// transaction is a wrapper on top of a BatchWriter and a store.Snapshot that
// implements the store.Transaction interface.
type transaction struct {
@@ -24,8 +29,13 @@
event *list.Element // pointer to element of mg.events
snapshot store.Snapshot
reads readSet
- writes []WriteOp
- err error
+ // writes holds the in-flight mutations of the transaction as key-value
+ // pairs, where the value type is one of:
+ // isDeleted: the last modification of the row was Delete;
+ // []byte: the last modification of the row was Put, value holds
+ // the actual value that was put.
+ writes *ptrie.T
+ err error
}
var _ store.Transaction = (*transaction)(nil)
@@ -35,6 +45,7 @@
mg: mg,
snapshot: mg.BatchStore.NewSnapshot(),
seq: mg.seq,
+ writes: ptrie.New(true),
}
tx.event = mg.events.PushFront(tx)
return tx
@@ -56,23 +67,20 @@
if tx.err != nil {
return valbuf, store.ConvertError(tx.err)
}
+ // The read set retains key, so copy it in case the caller reuses its buffer.
+ key = store.CopyBytes(nil, key)
tx.reads.Keys = append(tx.reads.Keys, key)
-
- // Reflect the state of the transaction: the "writes" (puts and
- // deletes) override the values in the transaction snapshot.
- // Find the last "writes" entry for this key, if one exists.
- // Note: this step could be optimized by using maps (puts and
- // deletes) instead of an array.
- for i := len(tx.writes) - 1; i >= 0; i-- {
- op := &tx.writes[i]
- if bytes.Equal(op.Key, key) {
- if op.T == PutOp {
- return op.Value, nil
- }
+ // Uncommitted writes (puts and deletes) override the snapshot.
+ if value := tx.writes.Get(key); value != nil {
+ switch v := value.(type) {
+ case []byte:
+ return store.CopyBytes(valbuf, v), nil
+ case isDeleted:
return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
+ default:
+ panic(fmt.Sprintf("unexpected type %T of value", v))
}
}
-
return tx.snapshot.Get(key, valbuf)
}
@@ -83,14 +91,13 @@
if tx.err != nil {
return &store.InvalidStream{Error: tx.err}
}
-
+ start, limit = store.CopyBytes(nil, start), store.CopyBytes(nil, limit)
tx.reads.Ranges = append(tx.reads.Ranges, scanRange{
Start: start,
Limit: limit,
})
-
- // Return a stream which merges the snaphot stream with the uncommitted changes.
+ // Return a stream which merges the snapshot stream with the uncommitted changes.
- return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
+ return mergeWritesWithStream(tx.snapshot, tx.writes.Copy(), start, limit)
}
// Put implements the store.StoreWriter interface.
@@ -100,11 +107,7 @@
if tx.err != nil {
return store.ConvertError(tx.err)
}
- tx.writes = append(tx.writes, WriteOp{
- T: PutOp,
- Key: key,
- Value: value,
- })
+ tx.writes.Put(key, value)
return nil
}
@@ -115,10 +118,7 @@
if tx.err != nil {
return store.ConvertError(tx.err)
}
- tx.writes = append(tx.writes, WriteOp{
- T: DeleteOp,
- Key: key,
- })
+ tx.writes.Put(key, isDeleted{})
return nil
}
@@ -163,10 +163,22 @@
if !tx.validateReadSet() {
return store.NewErrConcurrentTransaction(nil)
}
- if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil {
+ var batch []WriteOp
+ s := tx.writes.Scan(nil, nil)
+ for s.Advance() {
+ switch v := s.Value().(type) {
+ case []byte:
+ batch = append(batch, WriteOp{T: PutOp, Key: s.Key(nil), Value: v})
+ case isDeleted:
+ batch = append(batch, WriteOp{T: DeleteOp, Key: s.Key(nil)})
+ default:
+ panic(fmt.Sprintf("unexpected type %T of value", v))
+ }
+ }
+ if err := tx.mg.BatchStore.WriteBatch(batch...); err != nil {
return err
}
- tx.mg.trackBatch(tx.writes...)
+ tx.mg.trackBatch(batch...)
return nil
}