ref/runtime/internal/flow/conn: Fix bug when receiving flows.
Before this fix when we received a new flow we would send an open
message when writing back.
Change-Id: I897cfb39fe393e17251f6d9036bcf15090729556
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index f9ac79c..28f584d 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -228,7 +228,7 @@
return NewErrUnexpectedMsg(ctx, "openFlow")
}
c.mu.Lock()
- f := c.newFlowLocked(ctx, msg.id, msg.bkey, msg.dkey, false, false)
+ f := c.newFlowLocked(ctx, msg.id, msg.bkey, msg.dkey, false, true)
c.mu.Unlock()
c.handler.HandleFlow(f)
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 6851c79..b5b6eeb 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -8,10 +8,12 @@
"bytes"
"crypto/rand"
"io"
+ "sync"
"testing"
"time"
"v.io/v23"
+ "v.io/v23/flow"
_ "v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
@@ -29,6 +31,40 @@
}
}
+func trunc(b []byte) []byte {
+ if len(b) > 100 {
+ return b[:100]
+ }
+ return b
+}
+
+func doWrite(t *testing.T, f flow.Flow, data []byte) {
+ mid := len(data) / 2
+ wrote, err := f.WriteMsg(data[:mid], data[mid:])
+ if err != nil || wrote != len(data) {
+ t.Errorf("Unexpected result for write: %d, %v wanted %d, nil", wrote, err, len(data))
+ }
+}
+
+func doRead(t *testing.T, f flow.Flow, want []byte, wg *sync.WaitGroup) {
+ for read := 0; len(want) > 0; read++ {
+ got, err := f.ReadMsg()
+ if err != nil && err != io.EOF {
+ t.Errorf("Unexpected error: %v", err)
+ break
+ }
+ if !bytes.Equal(got, want[:len(got)]) {
+ t.Errorf("On read %d got: %v want %v", read, trunc(got), trunc(want))
+ break
+ }
+ want = want[len(got):]
+ }
+ if len(want) != 0 {
+ t.Errorf("got %d leftover bytes, expected 0.", len(want))
+ }
+ wg.Done()
+}
+
func TestLargeWrite(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
@@ -37,42 +73,12 @@
defer cl()
defer shutdown()
- want := randData
- finished := make(chan struct{})
- go func(x []byte) {
- mid := len(x) / 2
- wrote, err := df.WriteMsgAndClose(x[:mid], x[mid:])
- if err != nil {
- t.Fatalf("Unexpected error for write: %v", err)
- }
- if wrote != len(x) {
- t.Errorf("got %d want %d", wrote, len(x))
- }
- close(finished)
- }(want)
-
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go doWrite(t, df, randData)
+ go doRead(t, df, randData, &wg)
af := <-flows
- read := 0
- for len(want) > 0 {
- got, err := af.ReadMsg()
- if err != nil && err != io.EOF {
- t.Fatalf("Unexpected error: %v", err)
- }
- if !bytes.Equal(got, want[:len(got)]) {
- pl := len(got)
- if pl > 100 {
- pl = 100
- }
- pg, pw := got[:pl], want[:pl]
- t.Fatalf("On read %d got: %v want %v", read, pg, pw)
- }
- want = want[len(got):]
- read++
- }
- if len(want) != 0 {
- t.Errorf("got %d leftover bytes, expected 0.", len(want))
- }
- <-finished
- <-df.Closed()
- <-af.Closed()
+ go doRead(t, af, randData, &wg)
+ go doWrite(t, af, randData)
+ wg.Wait()
}