runtime/internal/rpc: Try to minimize the work that long running dial threads
do. This is mainly to prevent them from using vanadium state after the runtime
has shutdown.
Change-Id: Ia141a28df36c5119937231d9ae60bf0b20b67627
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 3db1610..26b7e91 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,10 @@
vcCache *vc.VCCache
+ wg sync.WaitGroup
+ mu sync.Mutex
+ closed bool
+
dc vc.DischargeClient
}
@@ -93,8 +97,7 @@
c := &client{
streamMgr: streamMgr,
ns: ns,
-
- vcCache: vc.NewVCCache(),
+ vcCache: vc.NewVCCache(),
}
ipNets, err := ipNetworks()
if err != nil {
@@ -346,6 +349,7 @@
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+ defer c.wg.Done()
status := &serverStatus{index: index, server: server}
var span vtrace.Span
ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
@@ -354,7 +358,6 @@
ch <- status
span.Finish()
}()
-
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{
Name: suberrName(server, name, method),
@@ -485,6 +488,14 @@
// other while manipulating their copy of the options.
vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
copy(vcOptsCopy, vcOpts)
+ c.mu.Lock()
+ if c.closed {
+ c.mu.Unlock()
+ return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+ }
+ c.wg.Add(1)
+ c.mu.Unlock()
+
go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
}
@@ -734,9 +745,13 @@
func (c *client) Close() {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ c.mu.Lock()
+ c.closed = true
+ c.mu.Unlock()
for _, v := range c.vcCache.Close() {
c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
}
+ c.wg.Wait()
}
// flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/runtime/internal/rpc/stream/manager/manager.go b/runtime/internal/rpc/stream/manager/manager.go
index e5aa37d..1b63a9f 100644
--- a/runtime/internal/rpc/stream/manager/manager.go
+++ b/runtime/internal/rpc/stream/manager/manager.go
@@ -113,6 +113,11 @@
return "", "", verror.New(stream.ErrResolveFailed, nil, verror.New(errUnknownNetwork, nil, network))
}
+type dialResult struct {
+ conn net.Conn
+ err error
+}
+
// FindOrDialVIF returns the network connection (VIF) to the provided address
// from the cache in the manager. If not already present in the cache, a new
// connection will be created using net.Dial.
@@ -146,7 +151,20 @@
defer unblock()
ctx.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
- conn, err := dial(d, network, address, timeout)
+
+ ch := make(chan *dialResult)
+ go func() {
+ conn, err := dial(d, network, address, timeout)
+ ch <- &dialResult{conn, err}
+ }()
+
+ var conn net.Conn
+ select {
+ case result := <-ch:
+ conn, err = result.conn, result.err
+ case <-ctx.Done():
+ return nil, verror.New(stream.ErrDialFailed, ctx, err)
+ }
if err != nil {
return nil, err
}