ref: Change "profiles" directory to "runtime"

As per vanadium/issues#470

MultiPart: 4/10

Change-Id: I3ac47c1d9c514f7bbe1c80507c2b3db7fcd9f6d4
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
new file mode 100644
index 0000000..ff1ee51
--- /dev/null
+++ b/runtime/internal/rpc/client.go
@@ -0,0 +1,1055 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"fmt"
+	"io"
+	"math/rand"
+	"net"
+	"reflect"
+	"sync"
+	"time"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/namespace"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/vdl"
+	vtime "v.io/v23/vdlroot/time"
+	"v.io/v23/verror"
+	"v.io/v23/vom"
+	"v.io/v23/vtrace"
+
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/vc"
+)
+
+const pkgPath = "v.io/x/ref/runtime/internal/rpc"
+
+func reg(id, msg string) verror.IDAction {
+	// NoRetry is only a placeholder: the effective action is computed at a
+	// higher level, so these errors are purely informational.
+	fullID := verror.ID(pkgPath + id)
+	return verror.Register(fullID, verror.NoRetry, msg)
+}
+
+var (
+	// These errors are meant to be passed as arguments to higher-level
+	// errors, so {1}{2} is omitted from their format strings; otherwise
+	// it would be repeated n times in the final error message shown to
+	// the user.
+	errClientCloseAlreadyCalled  = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called")
+	errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called")
+	errNonRootedName             = reg(".errNonRootedName", "{3} does not appear to contain an address")
+	errInvalidEndpoint           = reg(".errInvalidEndpoint", "failed to parse endpoint")
+	errIncompatibleEndpoint      = reg(".errIncompatibleEndpoint", "incompatible endpoint")
+	errRequestEncoding           = reg(".errRequestEncoding", "failed to encode request {3}{:4}")
+	errDischargeEncoding         = reg(".errDischargeEncoding", "failed to encode discharges {:3}")
+	errBlessingEncoding          = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}")
+	errArgEncoding               = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}")
+	errMismatchedResults         = reg(".errMismatchedResults", "got {3} results, but want {4}")
+	errResultDecoding            = reg(".errResultDecoding", "failed to decode result #{3}{:4}")
+	errResponseDecoding          = reg(".errResponseDecoding", "failed to decode response{:3}")
+	errRemainingStreamResults    = reg(".errRemaingStreamResults", "stream closed with remaining stream results") // NOTE(review): the ID is spelled "Remaing"; fixing it would change the registered ID.
+	errNoBlessingsForPeer        = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}")
+	errBlessingGrant             = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}")
+	errBlessingAdd               = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}")
+	errServerAuthorizeFailed     = reg(".errServerAuthorizedFailed", "failed to authorize flow with remote blessings{:3} {:4}") // NOTE(review): the ID says "Authorized" while the variable says "Authorize".
+
+	errPrepareBlessingsAndDischarges = reg(".prepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}") // NOTE(review): the ID lacks the "err" prefix the other IDs use.
+
+	errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}")
+	errNoPrincipal      = reg(".errNoPrincipal", "principal required for secure connections")
+)
+
+type client struct {
+	streamMgr          stream.Manager // dials VCs to server endpoints
+	ns                 namespace.T    // resolves object names to servers
+	vcOpts             []stream.VCOpt // vc opts passed to dial
+	preferredProtocols []string       // protocol preference passed to filterAndOrderServers
+
+	// We cache the IP networks on the device since it is not that cheap to read
+	// network interfaces through os syscall.
+	// TODO(jhahn): Add monitoring the network interface changes.
+	ipNets []*net.IPNet
+
+	vcCache *vc.VCCache // VCs reused by createFlow, looked up by endpoint and principal
+
+	dc vc.DischargeClient // discharge client handed to each flowClient by tryCall
+}
+
+var _ rpc.Client = (*client)(nil)
+
+func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+	c := &client{
+		streamMgr: streamMgr,
+		ns:        ns,
+		ipNets:    ipNetworks(),
+		vcCache:   vc.NewVCCache(),
+	}
+	c.dc = InternalNewDischargeClient(nil, c, 0)
+	for _, opt := range opts {
+		// Collect all client opts that are also vc opts.
+		switch v := opt.(type) {
+		case stream.VCOpt:
+			c.vcOpts = append(c.vcOpts, v)
+		case PreferredProtocols:
+			c.preferredProtocols = v
+		}
+	}
+
+	return c, nil
+}
+
+// createFlow returns a flow to ep, reusing a VC from c.vcCache when one can
+// still connect and otherwise dialing a new VC through c.streamMgr and caching
+// it.  Failures are returned as a *verror.SubErr with the Print option set.
+func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
+	suberr := func(err error) *verror.SubErr {
+		return &verror.SubErr{Err: err, Options: verror.Print}
+	}
+
+	found, err := c.vcCache.ReservedFind(ep, principal)
+	if err != nil {
+		return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+	}
+	defer c.vcCache.Unreserve(ep, principal)
+	if found != nil {
+		// We are serializing the creation of all flows per VC. This is okay
+		// because if one flow creation is to block, it is likely that all others
+		// for that VC would block as well.
+		if flow, err := found.Connect(); err == nil {
+			return flow, nil
+		}
+		// If the vc fails to establish a new flow, we assume it's broken,
+		// remove it from the cache, and proceed to establishing a new vc.
+		// TODO(suharshs): The decision to redial once when connecting on the
+		// cached vc fails is a bit inconsistent with the behavior when a newly
+		// dialed vc.Connect fails. We should revisit this.
+		// TODO(caprita): Should we distinguish errors due to vc being
+		// closed from other errors?  If not, should we call vc.Close()
+		// before removing the vc from the cache?
+		if err := c.vcCache.Delete(found); err != nil {
+			return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+		}
+	}
+
+	sm := c.streamMgr
+	v, err := sm.Dial(ep, principal, vcOpts...)
+	if err != nil {
+		return nil, suberr(err)
+	}
+
+	flow, err := v.Connect()
+	if err != nil {
+		return nil, suberr(err)
+	}
+
+	if err := c.vcCache.Insert(v.(*vc.VC)); err != nil {
+		sm.ShutdownEndpoint(ep)
+		return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
+	}
+
+	return flow, nil
+}
+
+// backoff sleeps for a randomized exponential interval before retry n (0 for the
+// first retry) and reports false, without sleeping, if the deadline is too near.
+func backoff(n uint, deadline time.Time) bool {
+	// ((100 to 200) * 2^n) ms capped at maxBackoff; randomness deters error convoys.
+	// n >= 32 goes straight to the cap so the shift cannot overflow time.Duration.
+	b := maxBackoff
+	if d := time.Duration(100+rand.Intn(100)) * time.Millisecond; n < 32 && d<<n < maxBackoff {
+		b = d << n
+	}
+	r := deadline.Sub(time.Now())
+	if b > r {
+		// Leave a little time for the call to start, or it would just time out in
+		// startCall before doing anything.  With only a millisecond left, give up.
+		if r <= time.Millisecond {
+			return false
+		}
+		b = r - time.Millisecond
+	}
+	time.Sleep(b)
+	return true
+}
+
+func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
+	defer vlog.LogCallf("ctx=,name=%.10s...,method=%.10s...,args=,opts...=%v", name, method, opts)("") // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	return c.startCall(ctx, name, method, args, opts) // startCall performs the connection-level retries
+}
+
+func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
+	defer vlog.LogCallf("ctx=,name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)("") // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	deadline := getDeadline(ctx, opts)
+
+	var lastErr error
+	for retries := uint(0); ; retries++ {
+		call, err := c.startCall(ctx, name, method, inArgs, opts)
+		if err != nil {
+			return err
+		}
+		err = call.Finish(outArgs...)
+		if err == nil {
+			return nil
+		}
+		lastErr = err
+		// We only retry if RetryBackoff is returned by the application because other
+		// RetryConnection and RetryRefetch required actions by the client before
+		// retrying.
+		if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
+			vlog.VI(4).Infof("Cannot retry after error: %s", lastErr)
+			break
+		}
+		if !backoff(retries, deadline) {
+			break
+		}
+		vlog.VI(4).Infof("Retrying due to error: %s", lastErr)
+	}
+	return lastErr
+}
+
+// getDeadline returns when a call and its retries must be done: the ctx
+// deadline, or now plus defaultCallTimeout if ctx has none.
+func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
+	ctxDeadline, ok := ctx.Deadline()
+	if !ok {
+		ctxDeadline = time.Now().Add(defaultCallTimeout)
+	}
+	// A caller-specified retry timeout overrides both of the above.
+	if timeout, ok := getRetryTimeoutOpt(opts); ok {
+		return time.Now().Add(timeout)
+	}
+	return ctxDeadline
+}
+
+// shouldRetryBackoff reports whether Call may retry a finished call that
+// failed with the given action.  That requires that retries are not
+// disabled by opts, that the action is verror.RetryBackoff, and that the
+// deadline has not yet passed.
+func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
+	if noRetry(opts) || action != verror.RetryBackoff {
+		return false
+	}
+	// Retrying is pointless once the deadline has passed.
+	return !time.Now().After(deadline)
+}
+
+// shouldRetry reports whether startCall may retry after a failed tryCall whose
+// error carried the given action, given the deadline and the call opts.
+func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
+	if noRetry(opts) {
+		return false
+	}
+	if action != verror.RetryConnection && action != verror.RetryRefetch {
+		return false
+	}
+	if time.Now().After(deadline) {
+		return false
+	}
+	// If we're skipping resolution and there are no servers for this call,
+	// retrying won't help: without resolution we can't come up with new servers.
+	return !requireResolve || !getNoNamespaceOpt(opts)
+}
+
+func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) {
+	var impetus security.DischargeImpetus
+	if len(serverBlessings) > 0 {
+		impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
+		for i, b := range serverBlessings {
+			impetus.Server[i] = security.BlessingPattern(b)
+		}
+	}
+	impetus.Method = method
+	if len(args) > 0 {
+		impetus.Arguments = make([]*vdl.Value, len(args))
+		for i, a := range args {
+			vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a))
+			if err != nil {
+				return security.DischargeImpetus{}, err
+			}
+			impetus.Arguments[i] = vArg
+		}
+	}
+	return impetus, nil
+}
+
+// startCall ensures StartCall always returns verror.E.
+func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
+	if !ctx.Initialized() {
+		return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
+	}
+	ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
+	if err := canCreateServerAuthorizer(ctx, opts); err != nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, err)
+	}
+
+	deadline := getDeadline(ctx, opts)
+
+	var lastErr error
+	for retries := uint(0); ; retries++ {
+		call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
+		if err == nil {
+			return call, nil
+		}
+		lastErr = err
+		if !shouldRetry(action, requireResolve, deadline, opts) {
+			span.Annotatef("Cannot retry after error: %s", err)
+			break
+		}
+		if !backoff(retries, deadline) {
+			break
+		}
+		span.Annotatef("Retrying due to error: %s", err)
+	}
+	return nil, lastErr
+}
+
+type serverStatus struct {
+	index             int                         // position in the responses slice of tryCall
+	server, suffix    string                      // name that was dialed, and what follows its address
+	flow              stream.Flow                 // flow to the server; nil if none is usable
+	blessings         []string                    // authorized server blessings
+	rejectedBlessings []security.RejectedBlessing // rejected server blessings
+	serverErr         *verror.SubErr              // why no usable flow exists, if set
+}
+
+// suberrName builds the SubErr name for a call: "server.method" when the client
+// dialed the endpoint directly (so it isn't printed twice), else "server:name.method".
+func suberrName(server, name, method string) string {
+	if server != name {
+		return fmt.Sprintf("%s:%s.%s", server, name, method)
+	}
+	return fmt.Sprintf("%s.%s", server, method)
+}
+
+// tryCreateFlow attempts to establish a Flow to "server" (which must be a
+// rooted name), over which a method invocation request could be sent.  The
+// outcome, a flow or a serverErr, is always sent on ch as a *serverStatus.
+// The server at the remote end of the flow is authorized using the provided
+// authorizer, both during creation of the VC underlying the flow and the
+// flow itself.
+// TODO(cnicolaou): implement real, configurable load balancing.
+func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+	status := &serverStatus{index: index, server: server}
+	var span vtrace.Span
+	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
+	span.Annotatef("address:%v", server)
+	defer func() {
+		ch <- status
+		span.Finish()
+	}()
+
+	suberr := func(err error) *verror.SubErr {
+		return &verror.SubErr{
+			Name:    suberrName(server, name, method),
+			Err:     err,
+			Options: verror.Print,
+		}
+	}
+
+	address, suffix := naming.SplitAddressName(server)
+	if len(address) == 0 {
+		status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
+		return
+	}
+	status.suffix = suffix
+
+	ep, err := inaming.NewEndpoint(address)
+	if err != nil {
+		status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
+		return
+	}
+	if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
+		status.serverErr.Name = suberrName(server, name, method)
+		vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
+		return
+	}
+
+	// Authorize the remote end of the flow using the provided authorizer.
+	if status.flow.LocalPrincipal() == nil {
+		// LocalPrincipal is nil which means we are operating under
+		// SecurityNone.
+		return
+	}
+
+	seccall := security.NewCall(&security.CallParams{
+		LocalPrincipal:   status.flow.LocalPrincipal(),
+		LocalBlessings:   status.flow.LocalBlessings(),
+		RemoteBlessings:  status.flow.RemoteBlessings(),
+		LocalEndpoint:    status.flow.LocalEndpoint(),
+		RemoteEndpoint:   status.flow.RemoteEndpoint(),
+		RemoteDischarges: status.flow.RemoteDischarges(),
+		Method:           method,
+		Suffix:           status.suffix,
+	})
+	if err := auth.Authorize(ctx, seccall); err != nil {
+		// We will test for errServerAuthorizeFailed in failedTryCall and report
+		// verror.ErrNotTrusted
+		status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
+		vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
+		status.flow.Close()
+		status.flow = nil
+		return
+	}
+	status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
+	return
+}
+
+// tryCall makes a single attempt at a call. It may connect to multiple servers
+// (all that serve "name"), but will invoke the method on at most one of them
+// (the server running on the most preferred protocol and network amongst all
+// the servers that were successfully connected to and authorized).
+// If requireResolve is true on return, then we shouldn't bother retrying unless
+// you can re-resolve.
+func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
+	var resolved *naming.MountEntry
+	var blessingPattern security.BlessingPattern
+	blessingPattern, name = security.SplitPatternName(name)
+	if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
+		// We always return NoServers as the error so that the caller knows
+		// that's ok to retry the operation since the name may be registered
+		// in the near future.
+		switch {
+		case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
+			return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
+		case verror.ErrorID(err) == verror.ErrNoServers.ID:
+			// Avoid wrapping errors unnecessarily.
+			return nil, verror.NoRetry, false, err
+		default:
+			return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
+		}
+	} else {
+		if len(resolved.Servers) == 0 {
+			// This should never happen.
+			return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
+		}
+		// An empty set of protocols means all protocols...
+		if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+			return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
+		}
+	}
+
+	// We need to ensure calls to v23 factory methods do not occur during runtime
+	// initialization. Currently, the agent, which uses SecurityNone, is the only caller
+	// during runtime initialization. We would like to set the principal in the context
+	// to nil if we are running in SecurityNone, but this always results in a panic since
+	// the agent client would trigger the call v23.WithPrincipal during runtime
+	// initialization. So, we gate the call to v23.GetPrincipal instead since the agent
+	// client will have callEncrypted == false.
+	// Potential solutions to this are:
+	// (1) Create a separate client for the agent so that this code doesn't have to
+	//     account for its use during runtime initialization.
+	// (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
+	//     on here.
+	var principal security.Principal
+	if callEncrypted(opts) {
+		if principal = v23.GetPrincipal(ctx); principal == nil {
+			return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
+		}
+	}
+
+	// servers is now ordered by the priority heuristic implemented in
+	// filterAndOrderServers.
+	//
+	// Try to connect to all servers in parallel.  Provide sufficient
+	// buffering for all of the connections to finish instantaneously. This
+	// is important because we want to process the responses in priority
+	// order; that order is indicated by the order of entries in servers.
+	// So, if two responses come in at the same 'instant', we prefer the
+	// first in resolved.Servers.
+	attempts := len(resolved.Servers)
+
+	responses := make([]*serverStatus, attempts)
+	ch := make(chan *serverStatus, attempts)
+	vcOpts := append(getVCOpts(opts), c.vcOpts...)
+	authorizer := newServerAuthorizer(blessingPattern, opts...)
+	for i, server := range resolved.Names() {
+		// Create a copy of vcOpts for each call to tryCreateFlow
+		// to avoid concurrent tryCreateFlows from stepping on each
+		// other while manipulating their copy of the options.
+		vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
+		copy(vcOptsCopy, vcOpts)
+		go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
+	}
+
+	var timeoutChan <-chan time.Time
+	if deadline, ok := ctx.Deadline(); ok {
+		timeoutChan = time.After(deadline.Sub(time.Now()))
+	}
+
+	for {
+		// Block for at least one new response from the server, or the timeout.
+		select {
+		case r := <-ch:
+			responses[r.index] = r
+			// Read as many more responses as we can without blocking.
+		LoopNonBlocking:
+			for {
+				select {
+				default:
+					break LoopNonBlocking
+				case r := <-ch:
+					responses[r.index] = r
+				}
+			}
+		case <-timeoutChan:
+			vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
+			_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
+			if verror.ErrorID(err) != verror.ErrTimeout.ID {
+				return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
+			}
+			return nil, verror.NoRetry, false, err
+		}
+
+		dc := c.dc
+		if shouldNotFetchDischarges(opts) {
+			dc = nil
+		}
+		// Process new responses, in priority order.
+		numResponses := 0
+		for _, r := range responses {
+			if r != nil {
+				numResponses++
+			}
+			if r == nil || r.flow == nil {
+				continue
+			}
+
+			doneChan := ctx.Done()
+			r.flow.SetDeadline(doneChan)
+			fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
+			if err != nil {
+				return nil, verror.NoRetry, false, err // NOTE(review): returns without cleanupTryCall, leaving other flows open; newFlowClient currently never fails.
+			}
+
+			if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
+				r.serverErr = &verror.SubErr{
+					Name:    suberrName(r.server, name, method),
+					Options: verror.Print,
+					Err:     verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
+				}
+				vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
+				r.flow.Close()
+				r.flow = nil
+				continue
+			}
+
+			// This is the 'point of no return'; once the RPC is started (fc.start
+			// below) we can't be sure if it makes it to the server or not so, this
+			// code will never call fc.start more than once to ensure that we provide
+			// 'at-most-once' rpc semantics at this level. Retrying the network
+			// connections (i.e. creating flows) is fine since we can cleanup that
+			// state if we abort a call (i.e. close the flow).
+			//
+			// We must ensure that all flows other than r.flow are closed.
+			//
+			// TODO(cnicolaou): all errors below are marked as NoRetry
+			// because we want to provide at-most-once rpc semantics so
+			// we only ever attempt an RPC once. In the future, we'll cache
+			// responses on the server and then we can retry in-flight
+			// RPCs.
+			go cleanupTryCall(r, responses, ch)
+
+			if doneChan != nil {
+				go func() {
+					select {
+					case <-doneChan:
+						vtrace.GetSpan(fc.ctx).Annotate("Canceled")
+						fc.flow.Cancel()
+					case <-fc.flow.Closed():
+					}
+				}()
+			}
+
+			deadline, _ := ctx.Deadline()
+			if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
+				return nil, verror.NoRetry, false, verr
+			}
+			return fc, verror.NoRetry, false, nil
+		}
+		if numResponses == len(responses) {
+			return c.failedTryCall(ctx, name, method, responses, ch)
+		}
+	}
+}
+
+// cleanupTryCall waits until every tryCreateFlow goroutine has reported, and
+// closes the flow of each response other than skip.  Responses that already
+// arrived are read from responses; the remaining ones are drained from ch.
+// This function blocks, so it should be called in its own goroutine.
+func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
+	outstanding := 0
+	for _, resp := range responses {
+		// A nil entry means the response hasn't arrived yet.
+		if resp == nil {
+			outstanding++
+			continue
+		}
+		// Leave the skipped flow open; a nil flow has either been closed
+		// already or was never established.
+		if resp != skip && resp.flow != nil {
+			resp.flow.Close()
+		}
+	}
+	// Now we just need to wait for the pending responses and close their flows.
+	for ; outstanding > 0; outstanding-- {
+		if resp := <-ch; resp.flow != nil {
+			resp.flow.Close()
+		}
+	}
+}
+
+// failedTryCall performs asynchronous cleanup for tryCall, and returns an
+// appropriate error from the responses we've already received.  All parallel
+// calls in tryCall failed or we timed out if we get here.
+func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
+	go cleanupTryCall(nil, responses, ch)
+	c.ns.FlushCacheEntry(name)
+	suberrs := []verror.SubErr{}
+	topLevelError := verror.ErrNoServers
+	topLevelAction := verror.RetryRefetch
+	onlyErrNetwork := true
+	for _, r := range responses {
+		if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
+			switch verror.ErrorID(r.serverErr.Err) {
+			case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
+				topLevelError = verror.ErrNotTrusted
+				topLevelAction = verror.NoRetry
+				onlyErrNetwork = false
+			case stream.ErrAborted.ID, stream.ErrNetwork.ID:
+				// do nothing
+			default:
+				onlyErrNetwork = false
+			}
+			suberrs = append(suberrs, *r.serverErr)
+		}
+	}
+
+	if onlyErrNetwork {
+		// If we only encountered network errors, then report ErrBadProtocol.
+		topLevelError = verror.ErrBadProtocol
+	}
+
+	// TODO(cnicolaou): we get system errors for things like dialing using
+	// the 'ws' protocol which can never succeed even if we retry the connection,
+	// hence we return RetryRefetch below except for the case where the servers
+	// are not trusted, in which case there's no point in retrying at all.
+	// TODO(cnicolaou): implementing at-most-once rpc semantics in the future
+	// will require thinking through all of the cases where the RPC can
+	// be retried by the client whilst it's actually being executed on the
+	// server.
+	return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
+}
+
+// prepareBlessingsAndDischarges prepares blessings and discharges for
+// the call.
+//
+// This includes: (1) preparing blessings that must be granted to the
+// server, (2) preparing blessings that the client authenticates with,
+// and, (3) preparing any discharges for third-party caveats on the client's
+// blessings.
+func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error {
+	// LocalPrincipal is nil which means we are operating under
+	// SecurityNone.
+	if fc.flow.LocalPrincipal() == nil {
+		return nil
+	}
+
+	// Fetch blessings from the client's blessing store that are to be
+	// shared with the server.
+	if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() {
+		// TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings
+		// to the server.
+		return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings)
+	}
+
+	// Fetch any discharges for third-party caveats on the client's blessings.
+	if !fc.blessings.IsZero() && fc.dc != nil { // NOTE(review): the IsZero check looks redundant after the early return above.
+		impetus, err := mkDischargeImpetus(fc.server, method, args)
+		if err != nil {
+			return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err))
+		}
+		fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
+	}
+
+	// Prepare blessings that must be granted to the server (using any
+	// rpc.Granter implementation in 'opts').
+	//
+	// NOTE(ataly, suharshs): Before invoking the granter, we set the parameters
+	// of the current call. The user can now retrieve the principal via
+	// v23.GetPrincipal(ctx), or via call.LocalPrincipal(). While in theory the
+	// two principals can be different, the flow.LocalPrincipal == nil check at
+	// the beginning of this method ensures that the two are the same and non-nil
+	// at this point in the code.
+	ldischargeMap := make(map[string]security.Discharge)
+	for _, d := range fc.discharges {
+		ldischargeMap[d.ID()] = d
+	}
+	seccall := security.NewCall(&security.CallParams{
+		LocalPrincipal:   fc.flow.LocalPrincipal(),
+		LocalBlessings:   fc.blessings,
+		RemoteBlessings:  fc.flow.RemoteBlessings(),
+		LocalEndpoint:    fc.flow.LocalEndpoint(),
+		RemoteEndpoint:   fc.flow.RemoteEndpoint(),
+		LocalDischarges:  ldischargeMap,
+		RemoteDischarges: fc.flow.RemoteDischarges(),
+		Method:           method,
+		Suffix:           suffix,
+	})
+	if err := fc.prepareGrantedBlessings(ctx, seccall, opts); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, call security.Call, opts []rpc.CallOpt) error {
+	for _, o := range opts {
+		switch v := o.(type) {
+		case rpc.Granter:
+			if b, err := v.Grant(ctx, call); err != nil {
+				return verror.New(errBlessingGrant, fc.ctx, err)
+			} else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
+				return verror.New(errBlessingAdd, fc.ctx, err)
+			}
+		}
+	}
+	return nil
+}
+
+func (c *client) Close() {
+	defer vlog.LogCall()() // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	for _, v := range c.vcCache.Close() { // shut down the endpoint of every VC the cache hands back
+		c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
+	}
+}
+
+// flowClient implements the RPC client-side protocol for a single RPC, over a
+// flow that's already connected to the server.
+type flowClient struct {
+	ctx      *context.T   // context to annotate with call details
+	dec      *vom.Decoder // to decode responses and results from the server
+	enc      *vom.Encoder // to encode requests and args to the server
+	server   []string     // Blessings bound to the server that authorize it to receive the RPC request from the client.
+	flow     stream.Flow  // the underlying flow
+	response rpc.Response // each decoded response message is kept here
+
+	discharges []security.Discharge // discharges used for this request
+	dc         vc.DischargeClient   // client-global discharge-client
+
+	blessings        security.Blessings // the local blessings for the current RPC.
+	grantedBlessings security.Blessings // the blessings granted to the server.
+
+	sendClosedMu sync.Mutex // guards sendClosed
+	sendClosed   bool       // is the send side already closed? GUARDED_BY(sendClosedMu)
+	finished     bool       // has Finish() already been called?
+}
+
+var _ rpc.ClientCall = (*flowClient)(nil)
+var _ rpc.Stream = (*flowClient)(nil)
+
+// newFlowClient wraps flow in a flowClient for a single RPC to a server
+// holding the given blessings.  If the VC's data cache holds a shared vom
+// type encoder, the client's encoder and decoder use the VC's shared type
+// encoder and decoder; otherwise they are private to this flow.
+// The returned error is always nil.
+func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) {
+	fc := &flowClient{ctx: ctx, flow: flow, server: server, dc: dc}
+	if typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{}); typeenc != nil {
+		fc.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder))
+		// The type decoder is assumed to be cached alongside the type encoder.
+		typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+		fc.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder))
+	} else {
+		fc.enc = vom.NewEncoder(flow)
+		fc.dec = vom.NewDecoder(flow)
+	}
+	return fc, nil
+}
+
+// close closes the flow and determines the appropriate error to return.  In
+// particular, if a timeout or cancelation has occurred then any error is
+// turned into a timeout or cancelation as appropriate.  Cancelation takes
+// precedence over timeout.  This is needed because a timeout can lead to
+// any number of other errors due to the underlying network connection being
+// shutdown abruptly.
+func (fc *flowClient) close(err error) error {
+	subErr := verror.SubErr{Err: err, Options: verror.Print}
+	subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
+	if cerr := fc.flow.Close(); cerr != nil && err == nil {
+		// Report the close failure itself; err is nil here, so the SubErr
+		// would otherwise carry no error at all.
+		subErr.Err = cerr
+		return verror.New(verror.ErrInternal, fc.ctx, subErr)
+	}
+	if err == nil {
+		return nil
+	}
+	switch verror.ErrorID(err) {
+	case verror.ErrCanceled.ID:
+		return err
+	case verror.ErrTimeout.ID:
+		// Canceled trumps timeout.
+		if fc.ctx.Err() == context.Canceled {
+			return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
+		}
+		return err
+	default:
+		switch fc.ctx.Err() {
+		case context.DeadlineExceeded:
+			return verror.AddSubErrs(verror.New(verror.ErrTimeout, fc.ctx), fc.ctx, subErr)
+		case context.Canceled:
+			return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
+		}
+	}
+	// Encoding and decoding failures are reported as protocol errors.
+	switch verror.ErrorID(err) {
+	case errRequestEncoding.ID, errArgEncoding.ID, errResponseDecoding.ID:
+		return verror.New(verror.ErrBadProtocol, fc.ctx, err)
+	}
+	return err
+}
+
+// start sends the request header for method on suffix, followed by each
+// positional arg; any encoding failure closes the flow via fc.close.
+func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
+	// Blessings are only encoded when the flow has a local principal.
+	var blessingsRequest rpc.BlessingsRequest
+	if fc.flow.LocalPrincipal() != nil {
+		blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
+	}
+	req := rpc.Request{
+		Suffix:           suffix,
+		Method:           method,
+		NumPosArgs:       uint64(len(args)),
+		Deadline:         vtime.Deadline{deadline},
+		GrantedBlessings: fc.grantedBlessings,
+		Blessings:        blessingsRequest,
+		Discharges:       fc.discharges,
+		TraceRequest:     vtrace.GetRequest(fc.ctx),
+		Language:         string(i18n.GetLangID(fc.ctx)),
+	}
+	if err := fc.enc.Encode(req); err != nil {
+		return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)))
+	}
+	for i, arg := range args {
+		if err := fc.enc.Encode(arg); err != nil {
+			return fc.close(verror.New(errArgEncoding, fc.ctx, i, err))
+		}
+	}
+	return nil
+}
+
+// Send encodes item as the next streaming argument: an empty request header
+// is written first to announce it, followed by item itself. It returns
+// ErrAborted if the send side has already been closed. An encoding failure
+// is passed to fc.close, which closes the flow, and the resulting error is
+// returned.
+func (fc *flowClient) Send(item interface{}) error {
+	defer vlog.LogCallf("item=")("") // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	// closeSend reads and writes sendClosed while holding sendClosedMu, so
+	// the flag is read under the same mutex here. The lock is released
+	// before encoding so that Send never holds it across a write.
+	fc.sendClosedMu.Lock()
+	closed := fc.sendClosed
+	fc.sendClosedMu.Unlock()
+	if closed {
+		return verror.New(verror.ErrAborted, fc.ctx)
+	}
+
+	// The empty request header indicates what follows is a streaming arg.
+	if err := fc.enc.Encode(rpc.Request{}); err != nil {
+		berr := verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err)
+		return fc.close(berr)
+	}
+	if err := fc.enc.Encode(item); err != nil {
+		berr := verror.New(errArgEncoding, fc.ctx, -1, err)
+		return fc.close(berr)
+	}
+	return nil
+}
+
+// decodeNetError maps an error from the lower-level stream code onto the
+// verror.IDAction that the higher-level RPC API calls should report for it,
+// and returns the error that should accompany that IDAction.
+//
+// When err is a *stream.NetError, the error it stores is returned in place of
+// the stream.NetError itself. This allows the stack trace of the original
+// error to be chained to that of any verror created with it as a first
+// parameter. A net.Error that reports Timeout or Temporary maps to
+// ErrCanceled if ctx has been canceled and to ErrTimeout otherwise. Any other
+// error keeps the ID and action that verror reports for it, except that an
+// error whose ID is ErrUnknown maps to ErrBadProtocol.
+func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
+	if neterr, isNet := err.(net.Error); isNet {
+		if wrapped, isStream := err.(*stream.NetError); isStream {
+			err = wrapped.Err() // return the error stored in the stream.NetError
+		}
+		switch {
+		case !neterr.Timeout() && !neterr.Temporary():
+			// Neither a timeout nor temporary: classify by verror ID below.
+		case ctx.Err() == context.Canceled:
+			// If a read is canceled in the lower levels we see
+			// a timeout error - see readLocked in vc/reader.go
+			return verror.ErrCanceled, err
+		default:
+			return verror.ErrTimeout, err
+		}
+	}
+	id := verror.ErrorID(err)
+	if id == verror.ErrUnknown.ID {
+		return verror.ErrBadProtocol, err
+	}
+	return verror.IDAction{id, verror.Action(err)}, err
+}
+
+// Recv decodes the next streaming result from the server into itemptr.
+//
+// Each result is preceded by a response header, which is decoded into
+// fc.response. If fc.response already holds an error, Recv returns it wrapped
+// in ErrBadProtocol without decoding anything; if it already has
+// EndStreamResults set, Recv returns io.EOF. The same two checks are applied
+// to a freshly decoded header. A decoding failure is passed to fc.close,
+// which closes the flow, and the resulting error is returned.
+func (fc *flowClient) Recv(itemptr interface{}) error {
+	defer vlog.LogCallf("itemptr=")("") // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	// An earlier call may already have recorded an error or the end of the
+	// stream in fc.response.
+	if fc.response.Error != nil {
+		return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
+	}
+	if fc.response.EndStreamResults {
+		return io.EOF
+	}
+
+	// Decode the response header and handle errors and EOF.
+	if decErr := fc.dec.Decode(&fc.response); decErr != nil {
+		id, cause := decodeNetError(fc.ctx, decErr)
+		return fc.close(verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, cause)))
+	}
+	switch {
+	case fc.response.Error != nil:
+		return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
+	case fc.response.EndStreamResults:
+		// Return EOF to indicate to the caller that there are no more stream
+		// results.  Any error sent by the server is kept in fc.response.Error, and
+		// returned to the user in Finish.
+		return io.EOF
+	}
+
+	// Decode the streaming result.
+	decErr := fc.dec.Decode(itemptr)
+	if decErr == nil {
+		return nil
+	}
+	id, cause := decodeNetError(fc.ctx, decErr)
+	wrapped := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, cause))
+	// TODO(cnicolaou): should we be caching this?
+	fc.response.Error = wrapped
+	return fc.close(wrapped)
+}
+
+// CloseSend ends the sending side of the stream by delegating to closeSend
+// and returns whatever closeSend returns.
+func (fc *flowClient) CloseSend() error {
+	defer vlog.LogCall()() // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	err := fc.closeSend()
+	return err
+}
+
+// closeSend writes the EndStreamArgs request at most once and marks the send side closed; as written it always returns nil.
+func (fc *flowClient) closeSend() error {
+	fc.sendClosedMu.Lock() // sendClosed is only read and written below while this is held
+	defer fc.sendClosedMu.Unlock()
+	if fc.sendClosed {
+		return nil // already closed: EndStreamArgs is not sent a second time
+	}
+	if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil {
+		// TODO(caprita): Indiscriminately closing the flow below causes
+		// a race as described in:
+		// https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
+		//
+		// There should be a finer grained way to fix this (for example,
+		// encoding errors should probably still result in closing the
+		// flow); on the flip side, there may exist other instances
+		// where we are closing the flow but should not.
+		//
+		// For now, commenting out the line below removes the flakiness
+		// from our existing unit tests, but this needs to be revisited
+		// and fixed correctly.
+		//
+		//   return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err))
+	}
+	fc.sendClosed = true // set even when the encode above failed; that error is deliberately dropped
+	return nil
+}
+
+// Finish completes the call by running finish with resultptrs, then marks
+// the vtrace span obtained from fc.ctx as finished, and returns the error
+// that finish produced.
+func (fc *flowClient) Finish(resultptrs ...interface{}) error {
+	defer vlog.LogCallf("resultptrs...=%v", resultptrs)("") // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	result := fc.finish(resultptrs...)
+	span := vtrace.GetSpan(fc.ctx)
+	span.Finish()
+	return result
+}
+
+// finish implements Finish and always exits through fc.close; every error it produces is built with the verror package.
+func (fc *flowClient) finish(resultptrs ...interface{}) error {
+	if fc.finished { // a repeated call is reported as ErrBadState
+		err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
+		return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
+	}
+	fc.finished = true
+
+	// Call closeSend implicitly, if the user hasn't already called it.  There are
+	// three cases:
+	// 1) Server is blocked on Recv waiting for the final request message.
+	// 2) Server has already finished processing, the final response message and
+	//    out args are queued up on the client, and the flow is closed.
+	// 3) Between 1 and 2: the server isn't blocked on Recv, but the final
+	//    response and args aren't queued up yet, and the flow isn't closed.
+	//
+	// We must call closeSend to handle case (1) and unblock the server; otherwise
+	// we'll deadlock with both client and server waiting for each other.  Any
+	// write failure must be ignored to handle case (2): there the flow is closed,
+	// so writes fail while reads still succeed.  That isn't a "real" error (the
+	// client should read the rest of the results and succeed), and closeSend
+	// itself drops the encoding error and returns nil.
+	_ = fc.closeSend()
+	// Decode the response header, if it hasn't already been decoded by Recv.
+	if fc.response.Error == nil && !fc.response.EndStreamResults {
+		if err := fc.dec.Decode(&fc.response); err != nil {
+			id, verr := decodeNetError(fc.ctx, err)
+			berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
+			return fc.close(berr)
+		}
+		// The response header must indicate the streaming results have ended.
+		if fc.response.Error == nil && !fc.response.EndStreamResults {
+			berr := verror.New(errRemainingStreamResults, fc.ctx)
+			return fc.close(berr)
+		}
+	}
+	if fc.response.AckBlessings {
+		clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
+	}
+	// Incorporate any VTrace info that was returned.
+	vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
+	if fc.response.Error != nil {
+		id := verror.ErrorID(fc.response.Error)
+		if id == verror.ErrNoAccess.ID && fc.dc != nil {
+			// In case the error was caused by a bad discharge, we do not want to get stuck
+			// with retrying again and again with this discharge. As there is no direct way
+			// to detect it, we conservatively flush all discharges we used from the cache.
+			// TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
+			vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
+			fc.dc.Invalidate(fc.discharges...)
+		}
+		if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
+			return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
+		}
+		return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
+	}
+	if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { // result count must match the caller's pointers
+		berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
+		return fc.close(berr)
+	}
+	for ix, r := range resultptrs {
+		if err := fc.dec.Decode(r); err != nil {
+			id, verr := decodeNetError(fc.ctx, err)
+			berr := verror.New(id, fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, verr))
+			return fc.close(berr)
+		}
+	}
+	return fc.close(nil) // success path: nil unless closing the flow itself fails
+}
+
+// RemoteBlessings returns fc.server together with the remote blessings
+// reported by the underlying flow.
+func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
+	defer vlog.LogCall()() // AUTO-GENERATED, DO NOT EDIT, MUST BE FIRST STATEMENT
+	remote := fc.flow.RemoteBlessings()
+	return fc.server, remote
+}
+
+// bpatterns converts each string in patterns to a security.BlessingPattern,
+// preserving order. A nil input yields a nil result; a non-nil input always
+// yields a non-nil slice of the same length.
+func bpatterns(patterns []string) []security.BlessingPattern {
+	if patterns == nil {
+		return nil
+	}
+	converted := make([]security.BlessingPattern, 0, len(patterns))
+	for _, p := range patterns {
+		converted = append(converted, security.BlessingPattern(p))
+	}
+	return converted
+}