runtime/internal/rpc: Try to minimize the work that long-running dial threads
do.  This is mainly to prevent them from using vanadium state after the runtime
has shut down.

Change-Id: Ia141a28df36c5119937231d9ae60bf0b20b67627
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 3db1610..26b7e91 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,10 @@
 
 	vcCache *vc.VCCache
 
+	wg     sync.WaitGroup
+	mu     sync.Mutex
+	closed bool
+
 	dc vc.DischargeClient
 }
 
@@ -93,8 +97,7 @@
 	c := &client{
 		streamMgr: streamMgr,
 		ns:        ns,
-
-		vcCache: vc.NewVCCache(),
+		vcCache:   vc.NewVCCache(),
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
@@ -346,6 +349,7 @@
 // flow itself.
 // TODO(cnicolaou): implement real, configurable load balancing.
 func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+	defer c.wg.Done()
 	status := &serverStatus{index: index, server: server}
 	var span vtrace.Span
 	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
@@ -354,7 +358,6 @@
 		ch <- status
 		span.Finish()
 	}()
-
 	suberr := func(err error) *verror.SubErr {
 		return &verror.SubErr{
 			Name:    suberrName(server, name, method),
@@ -485,6 +488,14 @@
 		// other while manipulating their copy of the options.
 		vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
 		copy(vcOptsCopy, vcOpts)
+		c.mu.Lock()
+		if c.closed {
+			c.mu.Unlock()
+			return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+		}
+		c.wg.Add(1)
+		c.mu.Unlock()
+
 		go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
 	}
 
@@ -734,9 +745,13 @@
 
 func (c *client) Close() {
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	c.mu.Lock()
+	c.closed = true
+	c.mu.Unlock()
 	for _, v := range c.vcCache.Close() {
 		c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
 	}
+	c.wg.Wait()
 }
 
 // flowClient implements the RPC client-side protocol for a single RPC, over a
diff --git a/runtime/internal/rpc/stream/manager/manager.go b/runtime/internal/rpc/stream/manager/manager.go
index e5aa37d..1b63a9f 100644
--- a/runtime/internal/rpc/stream/manager/manager.go
+++ b/runtime/internal/rpc/stream/manager/manager.go
@@ -113,6 +113,11 @@
 	return "", "", verror.New(stream.ErrResolveFailed, nil, verror.New(errUnknownNetwork, nil, network))
 }
 
+type dialResult struct {
+	conn net.Conn
+	err  error
+}
+
 // FindOrDialVIF returns the network connection (VIF) to the provided address
 // from the cache in the manager. If not already present in the cache, a new
 // connection will be created using net.Dial.
@@ -146,7 +151,20 @@
 	defer unblock()
 
 	ctx.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
-	conn, err := dial(d, network, address, timeout)
+
+	ch := make(chan *dialResult)
+	go func() {
+		conn, err := dial(d, network, address, timeout)
+		ch <- &dialResult{conn, err}
+	}()
+
+	var conn net.Conn
+	select {
+	case result := <-ch:
+		conn, err = result.conn, result.err
+	case <-ctx.Done():
+		return nil, verror.New(stream.ErrDialFailed, ctx, err)
+	}
 	if err != nil {
 		return nil, err
 	}