runtime/internal/flow/manager: Enable bidirectional rpc through
ListeningEndpoints.
If the flow manager is not listening a default endpiont with just
the RoutingID will be returned from ListeningEndpoints for use
in bidirectional RPC.
MultiPart: 1/2
Change-Id: If52cd7e6b61089b0557df9de61c70de23c98ff3c
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d067aa1..c2eb5a9 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -204,12 +204,17 @@
// ListeningEndpoints returns the endpoints that the Manager has explicitly
// listened on. The Manager will accept new flows on these endpoints.
-// Returned endpoints all have a RoutingID unique to the Acceptor.
+// If the Manager is not listening on any endpoints, an endpoint with the
+// Manager's RoutingID will be returned for use in bidirectional RPC.
+// Returned endpoints all have the Manager's unique RoutingID.
func (m *manager) ListeningEndpoints() []naming.Endpoint {
m.mu.Lock()
ret := make([]naming.Endpoint, len(m.listenEndpoints))
copy(ret, m.listenEndpoints)
m.mu.Unlock()
+ if len(ret) == 0 {
+ ret = append(ret, &inaming.Endpoint{RID: m.rid})
+ }
return ret
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 7af0053..41c470f 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -29,33 +29,12 @@
rid := naming.FixedRoutingID(0x5555)
m := New(ctx, rid)
- want := "read this please"
if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- eps := m.ListeningEndpoints()
- if len(eps) == 0 {
- t.Fatalf("no endpoints listened on")
- }
- flow, err := m.Dial(ctx, eps[0], flowtest.BlessingsForPeer)
- if err != nil {
- t.Error(err)
- }
- writeLine(flow, want)
-
- flow, err = m.Accept(ctx)
- if err != nil {
- t.Fatal(err)
- }
- got, err := readLine(flow)
- if err != nil {
- t.Error(err)
- }
- if got != want {
- t.Errorf("got %v, want %v", got, want)
- }
+ testFlows(t, ctx, m, m, flowtest.BlessingsForPeer)
}
func TestDialCachedConn(t *testing.T) {
@@ -67,40 +46,79 @@
t.Fatal(err)
}
- eps := am.ListeningEndpoints()
- if len(eps) == 0 {
- t.Fatalf("no endpoints listened on")
- }
dm := New(ctx, naming.FixedRoutingID(0x1111))
// At first the cache should be empty.
if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
- dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
- dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
}
-func dialAndAccept(t *testing.T, ctx *context.T, dm, am flow.Manager, ep naming.Endpoint, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
+func TestBidirectionalListeningEndpoint(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ am := New(ctx, naming.FixedRoutingID(0x5555))
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+ eps := am.ListeningEndpoints()
+ if len(eps) == 0 {
+ t.Fatalf("no endpoints listened on")
+ }
+ dm := New(ctx, naming.FixedRoutingID(0x1111))
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ // Now am should be able to make a flow to dm even though dm is not listening.
+ testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
+}
+
+func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
+ eps := am.ListeningEndpoints()
+ if len(eps) == 0 {
+ t.Fatalf("no endpoints listened on")
+ }
+ ep := eps[0]
var err error
df, err = dm.Dial(ctx, ep, bFn)
if err != nil {
t.Fatal(err)
}
- // Write a line to ensure that the openFlow message is sent.
- writeLine(df, "")
+ want := "do you read me?"
+ writeLine(df, want)
af, err = am.Accept(ctx)
if err != nil {
t.Fatal(err)
}
+
+ got, err := readLine(af)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+
+ want = "i read you"
+ if err := writeLine(af, want); err != nil {
+ t.Error(err)
+ }
+ got, err = readLine(df)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
return
}