flow/manager: Fix bidirectional rpc.
The issue is that we rely on caching to be completed before making a
bidirectional rpc so we need to ensure that a flow is returns only after
caching the underlying conn is complete.
Change-Id: Ia5fa1799ce69d04a1d9da83bda438d11dbb87c7b
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d6d7ca1..0254e9c 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -204,29 +204,37 @@
ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
return
}
+ cached := make(chan struct{})
c, err := conn.NewAccepted(
ctx,
flowConn,
local,
version.Supported,
- &flowHandler{q: m.q},
+ &flowHandler{q: m.q, cached: cached},
)
if err != nil {
+ close(cached)
flowConn.Close()
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
if err := m.cache.InsertWithRoutingID(c); err != nil {
- ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
+ close(cached)
+ ctx.Errorf("failed to cache conn %v: %v", c, err)
}
+ close(cached)
}
}
type flowHandler struct {
- q *upcqueue.T
+ q *upcqueue.T
+ cached chan struct{}
}
func (h *flowHandler) HandleFlow(f flow.Flow) error {
+ if h.cached != nil {
+ <-h.cached
+ }
return h.q.Put(f)
}