ref/runtime/internal/flow/conn: Fix bug when receiving flows.

Before this fix when we received a new flow we would send an open
message when writing back.

Change-Id: I897cfb39fe393e17251f6d9036bcf15090729556
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index f9ac79c..28f584d 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -228,7 +228,7 @@
 			return NewErrUnexpectedMsg(ctx, "openFlow")
 		}
 		c.mu.Lock()
-		f := c.newFlowLocked(ctx, msg.id, msg.bkey, msg.dkey, false, false)
+		f := c.newFlowLocked(ctx, msg.id, msg.bkey, msg.dkey, false, true)
 		c.mu.Unlock()
 		c.handler.HandleFlow(f)
 
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 6851c79..b5b6eeb 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -8,10 +8,12 @@
 	"bytes"
 	"crypto/rand"
 	"io"
+	"sync"
 	"testing"
 	"time"
 
 	"v.io/v23"
+	"v.io/v23/flow"
 	_ "v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/test"
 	"v.io/x/ref/test/goroutines"
@@ -29,6 +31,40 @@
 	}
 }
 
+func trunc(b []byte) []byte {
+	if len(b) > 100 {
+		return b[:100]
+	}
+	return b
+}
+
+func doWrite(t *testing.T, f flow.Flow, data []byte) {
+	mid := len(data) / 2
+	wrote, err := f.WriteMsg(data[:mid], data[mid:])
+	if err != nil || wrote != len(data) {
+		t.Errorf("Unexpected result for write: %d, %v wanted %d, nil", wrote, err, len(data))
+	}
+}
+
+func doRead(t *testing.T, f flow.Flow, want []byte, wg *sync.WaitGroup) {
+	for read := 0; len(want) > 0; read++ {
+		got, err := f.ReadMsg()
+		if err != nil && err != io.EOF {
+			t.Errorf("Unexpected error: %v", err)
+			break
+		}
+		if !bytes.Equal(got, want[:len(got)]) {
+			t.Errorf("On read %d got: %v want %v", read, trunc(got), trunc(want))
+			break
+		}
+		want = want[len(got):]
+	}
+	if len(want) != 0 {
+		t.Errorf("got %d leftover bytes, expected 0.", len(want))
+	}
+	wg.Done()
+}
+
 func TestLargeWrite(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 
@@ -37,42 +73,12 @@
 	defer cl()
 	defer shutdown()
 
-	want := randData
-	finished := make(chan struct{})
-	go func(x []byte) {
-		mid := len(x) / 2
-		wrote, err := df.WriteMsgAndClose(x[:mid], x[mid:])
-		if err != nil {
-			t.Fatalf("Unexpected error for write: %v", err)
-		}
-		if wrote != len(x) {
-			t.Errorf("got %d want %d", wrote, len(x))
-		}
-		close(finished)
-	}(want)
-
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go doWrite(t, df, randData)
+	go doRead(t, df, randData, &wg)
 	af := <-flows
-	read := 0
-	for len(want) > 0 {
-		got, err := af.ReadMsg()
-		if err != nil && err != io.EOF {
-			t.Fatalf("Unexpected error: %v", err)
-		}
-		if !bytes.Equal(got, want[:len(got)]) {
-			pl := len(got)
-			if pl > 100 {
-				pl = 100
-			}
-			pg, pw := got[:pl], want[:pl]
-			t.Fatalf("On read %d got: %v want %v", read, pg, pw)
-		}
-		want = want[len(got):]
-		read++
-	}
-	if len(want) != 0 {
-		t.Errorf("got %d leftover bytes, expected 0.", len(want))
-	}
-	<-finished
-	<-df.Closed()
-	<-af.Closed()
+	go doRead(t, af, randData, &wg)
+	go doWrite(t, af, randData)
+	wg.Wait()
 }