runtime/internal/flow/manager: Enable bidirectional rpc through
ListeningEndpoints.

If the flow manager is not listening a default endpiont with just
the RoutingID will be returned from ListeningEndpoints for use
in bidirectional RPC.

MultiPart: 1/2
Change-Id: If52cd7e6b61089b0557df9de61c70de23c98ff3c
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d067aa1..c2eb5a9 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -204,12 +204,17 @@
 
 // ListeningEndpoints returns the endpoints that the Manager has explicitly
 // listened on. The Manager will accept new flows on these endpoints.
-// Returned endpoints all have a RoutingID unique to the Acceptor.
+// If the Manager is not listening on any endpoints, an endpoint with the
+// Manager's RoutingID will be returned for use in bidirectional RPC.
+// Returned endpoints all have the Manager's unique RoutingID.
 func (m *manager) ListeningEndpoints() []naming.Endpoint {
 	m.mu.Lock()
 	ret := make([]naming.Endpoint, len(m.listenEndpoints))
 	copy(ret, m.listenEndpoints)
 	m.mu.Unlock()
+	if len(ret) == 0 {
+		ret = append(ret, &inaming.Endpoint{RID: m.rid})
+	}
 	return ret
 }
 
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 7af0053..41c470f 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -29,33 +29,12 @@
 
 	rid := naming.FixedRoutingID(0x5555)
 	m := New(ctx, rid)
-	want := "read this please"
 
 	if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
-	eps := m.ListeningEndpoints()
-	if len(eps) == 0 {
-		t.Fatalf("no endpoints listened on")
-	}
-	flow, err := m.Dial(ctx, eps[0], flowtest.BlessingsForPeer)
-	if err != nil {
-		t.Error(err)
-	}
-	writeLine(flow, want)
-
-	flow, err = m.Accept(ctx)
-	if err != nil {
-		t.Fatal(err)
-	}
-	got, err := readLine(flow)
-	if err != nil {
-		t.Error(err)
-	}
-	if got != want {
-		t.Errorf("got %v, want %v", got, want)
-	}
+	testFlows(t, ctx, m, m, flowtest.BlessingsForPeer)
 }
 
 func TestDialCachedConn(t *testing.T) {
@@ -67,40 +46,79 @@
 		t.Fatal(err)
 	}
 
-	eps := am.ListeningEndpoints()
-	if len(eps) == 0 {
-		t.Fatalf("no endpoints listened on")
-	}
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
 	// At first the cache should be empty.
 	if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing a connection the cache should hold one connection.
-	dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing another connection the cache should still hold one connection
 	// because the connections should be reused.
-	dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 }
 
-func dialAndAccept(t *testing.T, ctx *context.T, dm, am flow.Manager, ep naming.Endpoint, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
+func TestBidirectionalListeningEndpoint(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	am := New(ctx, naming.FixedRoutingID(0x5555))
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	eps := am.ListeningEndpoints()
+	if len(eps) == 0 {
+		t.Fatalf("no endpoints listened on")
+	}
+	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	// Now am should be able to make a flow to dm even though dm is not listening.
+	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
+}
+
+func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
+	eps := am.ListeningEndpoints()
+	if len(eps) == 0 {
+		t.Fatalf("no endpoints listened on")
+	}
+	ep := eps[0]
 	var err error
 	df, err = dm.Dial(ctx, ep, bFn)
 	if err != nil {
 		t.Fatal(err)
 	}
-	// Write a line to ensure that the openFlow message is sent.
-	writeLine(df, "")
+	want := "do you read me?"
+	writeLine(df, want)
 	af, err = am.Accept(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	got, err := readLine(af)
+	if err != nil {
+		t.Error(err)
+	}
+	if got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+
+	want = "i read you"
+	if err := writeLine(af, want); err != nil {
+		t.Error(err)
+	}
+	got, err = readLine(df)
+	if err != nil {
+		t.Error(err)
+	}
+	if got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
 	return
 }