ref: Introduce client.Connection and refactor xclient.go.

This should hopefully make changing the parallel dial
policy in the future simpler.

Next step: Use the client.Connection function in syncbase
peer selection.

Improvements observed during this change:
We should get rid of RetryTimeout:
- No one uses it currently.
- It allowed for a bug that made it possible for nested
calls to rpcs to mess with the perceived timeout of
inner nested calls.

MultiPart: 2/2

Change-Id: I60d7c2231d06ee0a8ae9b8c305bab9494541e255
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 9c1f62e..0809d9e 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -133,6 +133,9 @@
 func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
 	return nil, nil
 }
+func (fc *fakeDischargeClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+	return nil, nil
+}
 func (fc *fakeDischargeClient) Close()                  {}
 func (fc *fakeDischargeClient) Closed() <-chan struct{} { return nil }
 
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index bd784f2..8199210 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -954,3 +954,33 @@
 		t.Error(err)
 	}
 }
+
+func TestClientConnection(t *testing.T) {
+	ctx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	ctx, cancel := context.WithCancel(ctx)
+	name := "mountpoint/server"
+	_, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() { <-server.Closed() }()
+	defer cancel()
+
+	client := v23.GetClient(ctx)
+
+	conn, err := client.Connection(ctx, name)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// StartCall should use the same connection that was just created.
+	call, err := client.StartCall(ctx, name, "Closure", nil, nil)
+	if err != nil {
+		t.Error(err)
+	}
+	if got, want := call.Security().LocalEndpoint().String(), conn.LocalEndpoint().String(); got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 2d85b14..1528ee9 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -138,50 +138,6 @@
 	return c.startCall(ctx, name, method, args, deadline, opts)
 }
 
-func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (rpc.ClientCall, error) {
-	ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
-	for retries := uint(0); ; retries++ {
-		call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
-		switch {
-		case err == nil:
-			return call, nil
-		case !shouldRetry(action, requireResolve, deadline, opts):
-			span.Annotatef("Cannot retry after error: %s", err)
-			span.Finish()
-			return nil, err
-		case !backoff(retries, deadline):
-			span.Annotatef("Retries exhausted")
-			span.Finish()
-			return nil, err
-		default:
-			span.Annotatef("Retrying due to error: %s", err)
-			ctx.VI(2).Infof("Retrying due to error: %s", err)
-		}
-	}
-}
-
-// A randomized exponential backoff. The randomness deters error convoys
-// from forming.  The first time you retry n should be 0, then 1 etc.
-func backoff(n uint, deadline time.Time) bool {
-	// This is ((100 to 200) * 2^n) ms.
-	b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
-	if b > maxBackoff {
-		b = maxBackoff
-	}
-	r := deadline.Sub(time.Now())
-	if b > r {
-		// We need to leave a little time for the call to start or
-		// we'll just timeout in startCall before we actually do
-		// anything.  If we just have a millisecond left, give up.
-		if r <= time.Millisecond {
-			return false
-		}
-		b = r - time.Millisecond
-	}
-	time.Sleep(b)
-	return true
-}
-
 func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
 	defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	deadline := getDeadline(ctx, opts)
@@ -204,50 +160,37 @@
 	}
 }
 
-func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
-	// Context specified deadline.
-	deadline, hasDeadline := ctx.Deadline()
-	if !hasDeadline {
-		// Default deadline.
-		deadline = time.Now().Add(defaultCallTimeout)
+func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (rpc.ClientCall, error) {
+	ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
+	r, err := c.connectToName(ctx, name, method, args, deadline, opts)
+	if err != nil {
+		return nil, err
 	}
-	if r, ok := getRetryTimeoutOpt(opts); ok {
-		// Caller specified deadline.
-		deadline = time.Now().Add(r)
+
+	fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
+	if err != nil {
+		return nil, err
 	}
-	return deadline
+
+	deadline, _ = ctx.Deadline()
+	if verr := fc.start(r.suffix, method, args, deadline, opts); verr != nil {
+		return nil, verr
+	}
+	return fc, nil
 }
 
-func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
-	switch {
-	case noRetry(opts):
-		return false
-	case action != verror.RetryBackoff:
-		return false
-	case time.Now().After(deadline):
-		return false
+func (c *xclient) Connection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.ManagedConn, error) {
+	deadline := getDeadline(ctx, opts)
+	r, err := c.connectToName(ctx, name, "", nil, deadline, opts)
+	if err != nil {
+		return nil, err
 	}
-	return true
+	conn := r.flow.Conn()
+	r.flow.Close()
+	return conn, nil
 }
 
-func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
-	switch {
-	case noRetry(opts):
-		return false
-	case action != verror.RetryConnection && action != verror.RetryRefetch:
-		return false
-	case time.Now().After(deadline):
-		return false
-	case requireResolve && getNoNamespaceOpt(opts):
-		// If we're skipping resolution and there are no servers for
-		// this call retrying is not going to help, we can't come up
-		// with new servers if there is no resolution.
-		return false
-	}
-	return true
-}
-
-type xserverStatus struct {
+type serverStatus struct {
 	index          int
 	server, suffix string
 	flow           flow.Flow
@@ -256,34 +199,152 @@
 	typeDec        *vom.TypeDecoder
 }
 
-func suberrName(server, name, method string) string {
-	// In the case the client directly dialed an endpoint we want to avoid printing
-	// the endpoint twice.
-	if server == name {
-		return fmt.Sprintf("%s.%s", server, method)
+// connectToName attempts too connect to the provided name. It may retry connecting
+// to the servers the name resolves to based on the type of error encountered.
+// Once deadline is reached, it will stop retrying.
+func (c *xclient) connectToName(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (*serverStatus, error) {
+	span := vtrace.GetSpan(ctx)
+	for retries := uint(0); ; retries++ {
+		r, action, requireResolve, err := c.tryConnectToName(ctx, name, method, args, opts)
+		switch {
+		case err == nil:
+			return r, nil
+		case !shouldRetry(action, requireResolve, deadline, opts):
+			span.Annotatef("Cannot retry after error: %s", err)
+			span.Finish()
+			return nil, err
+		case !backoff(retries, deadline):
+			span.Annotatef("Retries exhausted")
+			span.Finish()
+			return nil, err
+		default:
+			span.Annotatef("Retrying due to error: %s", err)
+			ctx.VI(2).Infof("Retrying due to error: %s", err)
+		}
 	}
-	return fmt.Sprintf("%s:%s.%s", server, name, method)
 }
 
-// tryCreateFlow attempts to establish a Flow to "server" (which must be a
-// rooted name), over which a method invocation request could be sent.
+// tryConnectToName makes a single attempt in connecting to a name. It may
+// connect to multiple servers (all that serve "name"), but will return a
+// serverStatus for at most one of them (the server running on the most
+// preferred protocol and network amongst all the servers that were successfully
+// connected to and authorized).
+// If requireResolve is true on return, then we shouldn't bother retrying unless
+// you can re-resolve.
+//
+// TODO(toddw): Remove action from out-args, the error should tell us the action.
+func (c *xclient) tryConnectToName(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (*serverStatus, verror.ActionCode, bool, error) {
+	blessingPattern, name := security.SplitPatternName(name)
+	resolved, err := c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...)
+	switch {
+	case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
+		return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
+	case verror.ErrorID(err) == verror.ErrNoServers.ID:
+		return nil, verror.NoRetry, false, err // avoid unnecessary wrapping
+	case verror.ErrorID(err) == verror.ErrTimeout.ID:
+		return nil, verror.NoRetry, false, err // return timeout without wrapping
+	case err != nil:
+		return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
+	case len(resolved.Servers) == 0:
+		// This should never happen.
+		return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
+	}
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
+		return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
+	}
+
+	// servers is now ordered by the priority heurestic implemented in
+	// filterAndOrderServers.
+	//
+	// Try to connect to all servers in parallel.  Provide sufficient
+	// buffering for all of the connections to finish instantaneously. This
+	// is important because we want to process the responses in priority
+	// order; that order is indicated by the order of entries in servers.
+	// So, if two respones come in at the same 'instant', we prefer the
+	// first in the resolved.Servers)
+	//
+	// TODO(toddw): Refactor the parallel dials so that the policy can be changed,
+	// and so that the goroutines for each Call are tracked separately.
+	responses := make([]*serverStatus, len(resolved.Servers))
+	ch := make(chan *serverStatus, len(resolved.Servers))
+	authorizer := newServerAuthorizer(blessingPattern, opts...)
+	peerAuth := peerAuthorizer{authorizer, method, args}
+	channelTimeout := time.Duration(0)
+	for _, opt := range opts {
+		if t, ok := opt.(options.ChannelTimeout); ok {
+			channelTimeout = time.Duration(t)
+		}
+	}
+	for i, server := range resolved.Names() {
+		c.mu.Lock()
+		if c.closing {
+			c.mu.Unlock()
+			return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+		}
+		c.wg.Add(1)
+		c.mu.Unlock()
+
+		go c.tryConnectToServer(ctx, i, name, server, method, args, peerAuth, channelTimeout, ch)
+	}
+
+	for {
+		// Block for at least one new response from the server, or the timeout.
+		select {
+		case r := <-ch:
+			responses[r.index] = r
+			// Read as many more responses as we can without blocking.
+		LoopNonBlocking:
+			for {
+				select {
+				default:
+					break LoopNonBlocking
+				case r := <-ch:
+					responses[r.index] = r
+				}
+			}
+		case <-ctx.Done():
+			return c.failedTryConnectToName(ctx, name, method, responses, ch)
+		}
+
+		// Process new responses, in priority order.
+		numResponses := 0
+		for _, r := range responses {
+			if r != nil {
+				numResponses++
+			}
+			if r == nil || r.flow == nil {
+				continue
+			}
+			// We must ensure that all flows other than r.flow are closed.
+			go cleanupTryConnectToName(r, responses, ch)
+			return r, verror.NoRetry, false, nil
+		}
+		if numResponses == len(responses) {
+			return c.failedTryConnectToName(ctx, name, method, responses, ch)
+		}
+	}
+}
+
+// tryConnectToServer attempts to establish a Flow to a single "server"
+// (which must be a rooted name), over which a method invocation request
+// could be sent.
 //
 // The server at the remote end of the flow is authorized using the provided
 // authorizer, both during creation of the VC underlying the flow and the
 // flow itself.
 // TODO(cnicolaou): implement real, configurable load balancing.
-func (c *xclient) tryCreateFlow(
+func (c *xclient) tryConnectToServer(
 	ctx *context.T,
 	index int,
 	name, server, method string,
 	args []interface{},
-	auth security.Authorizer,
+	auth flow.PeerAuthorizer,
 	channelTimeout time.Duration,
-	ch chan<- *xserverStatus) {
+	ch chan<- *serverStatus) {
 	defer c.wg.Done()
-	status := &xserverStatus{index: index, server: server}
+	status := &serverStatus{index: index, server: server}
 	var span vtrace.Span
-	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow "+server)
+	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryConnectToServer "+server)
 	defer func() {
 		ch <- status
 		span.Finish()
@@ -308,8 +369,7 @@
 		status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
 		return
 	}
-	peerAuth := peerAuthorizer{auth, method, args}
-	flow, err := c.flowMgr.Dial(ctx, ep, peerAuth, channelTimeout)
+	flow, err := c.flowMgr.Dial(ctx, ep, auth, channelTimeout)
 	if err != nil {
 		ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
 		status.serverErr = suberr(err)
@@ -342,198 +402,10 @@
 	status.flow = flow
 }
 
-type typeFlowAuthorizer struct{}
-
-func (a typeFlowAuthorizer) AuthorizePeer(
-	ctx *context.T,
-	localEP, remoteEP naming.Endpoint,
-	remoteBlessings security.Blessings,
-	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
-	return nil, nil, nil
-}
-
-func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
-	security.Blessings, map[string]security.Discharge, error) {
-	return security.Blessings{}, nil, nil
-}
-
-type peerAuthorizer struct {
-	auth   security.Authorizer
-	method string
-	args   []interface{}
-}
-
-func (x peerAuthorizer) AuthorizePeer(
-	ctx *context.T,
-	localEP, remoteEP naming.Endpoint,
-	remoteBlessings security.Blessings,
-	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
-	localPrincipal := v23.GetPrincipal(ctx)
-	// The "Method" and "Suffix" fields of the call are not populated
-	// as they are considered irrelevant for authorizing server blessings.
-	call := security.NewCall(&security.CallParams{
-		Timestamp:        time.Now(),
-		LocalPrincipal:   localPrincipal,
-		LocalEndpoint:    localEP,
-		RemoteBlessings:  remoteBlessings,
-		RemoteDischarges: remoteDischarges,
-		RemoteEndpoint:   remoteEP,
-	})
-	if err := x.auth.Authorize(ctx, call); err != nil {
-		return nil, nil, verror.New(errPeerAuthorizeFailed, ctx, call.RemoteBlessings(), err)
-	}
-	peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
-	return peerNames, rejectedPeerNames, nil
-}
-
-func (x peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
-	security.Blessings, map[string]security.Discharge, error) {
-	localPrincipal := v23.GetPrincipal(ctx)
-	clientB := localPrincipal.BlessingStore().ForPeer(peerNames...)
-	dis, _ := slib.PrepareDischarges(ctx, clientB, peerNames, x.method, x.args)
-	return clientB, dis, nil
-}
-
-// tryCall makes a single attempt at a call. It may connect to multiple servers
-// (all that serve "name"), but will invoke the method on at most one of them
-// (the server running on the most preferred protcol and network amongst all
-// the servers that were successfully connected to and authorized).
-// if requireResolve is true on return, then we shouldn't bother retrying unless
-// you can re-resolve.
-//
-// TODO(toddw): Remove action from out-args, the error should tell us the action.
-func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
-	blessingPattern, name := security.SplitPatternName(name)
-	resolved, err := c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...)
-	switch {
-	case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
-		return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
-	case verror.ErrorID(err) == verror.ErrNoServers.ID:
-		return nil, verror.NoRetry, false, err // avoid unnecessary wrapping
-	case verror.ErrorID(err) == verror.ErrTimeout.ID:
-		return nil, verror.NoRetry, false, err // return timeout without wrapping
-	case err != nil:
-		return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
-	case len(resolved.Servers) == 0:
-		// This should never happen.
-		return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
-	}
-	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
-		return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
-	}
-
-	// servers is now ordered by the priority heurestic implemented in
-	// filterAndOrderServers.
-	//
-	// Try to connect to all servers in parallel.  Provide sufficient
-	// buffering for all of the connections to finish instantaneously. This
-	// is important because we want to process the responses in priority
-	// order; that order is indicated by the order of entries in servers.
-	// So, if two respones come in at the same 'instant', we prefer the
-	// first in the resolved.Servers)
-	//
-	// TODO(toddw): Refactor the parallel dials so that the policy can be changed,
-	// and so that the goroutines for each Call are tracked separately.
-	responses := make([]*xserverStatus, len(resolved.Servers))
-	ch := make(chan *xserverStatus, len(resolved.Servers))
-	authorizer := newServerAuthorizer(blessingPattern, opts...)
-	channelTimeout := time.Duration(0)
-	for _, opt := range opts {
-		if t, ok := opt.(options.ChannelTimeout); ok {
-			channelTimeout = time.Duration(t)
-		}
-	}
-	for i, server := range resolved.Names() {
-		c.mu.Lock()
-		if c.closing {
-			c.mu.Unlock()
-			return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
-		}
-		c.wg.Add(1)
-		c.mu.Unlock()
-
-		go c.tryCreateFlow(ctx, i, name, server, method, args, authorizer, channelTimeout, ch)
-	}
-
-	for {
-		// Block for at least one new response from the server, or the timeout.
-		select {
-		case r := <-ch:
-			responses[r.index] = r
-			// Read as many more responses as we can without blocking.
-		LoopNonBlocking:
-			for {
-				select {
-				default:
-					break LoopNonBlocking
-				case r := <-ch:
-					responses[r.index] = r
-				}
-			}
-		case <-ctx.Done():
-			return c.failedTryCall(ctx, name, method, responses, ch)
-		}
-
-		// Process new responses, in priority order.
-		numResponses := 0
-		for _, r := range responses {
-			if r != nil {
-				numResponses++
-			}
-			if r == nil || r.flow == nil {
-				continue
-			}
-
-			fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
-			if err != nil {
-				return nil, verror.NoRetry, false, err
-			}
-
-			// This is the 'point of no return'; once the RPC is started (fc.start
-			// below) we can't be sure if it makes it to the server or not so, this
-			// code will never call fc.start more than once to ensure that we provide
-			// 'at-most-once' rpc semantics at this level. Retrying the network
-			// connections (i.e. creating flows) is fine since we can cleanup that
-			// state if we abort a call (i.e. close the flow).
-			//
-			// We must ensure that all flows other than r.flow are closed.
-			//
-			// TODO(cnicolaou): all errors below are marked as NoRetry
-			// because we want to provide at-most-once rpc semantics so
-			// we only ever attempt an RPC once. In the future, we'll cache
-			// responses on the server and then we can retry in-flight
-			// RPCs.
-			go xcleanupTryCall(r, responses, ch)
-
-			// TODO(toddw): It's wasteful to create this goroutine just for a vtrace
-			// annotation.  Refactor this when we refactor the parallel dial logic.
-			/*
-				if ctx.Done() != nil {
-					go func() {
-						select {
-						case <-ctx.Done():
-							vtrace.GetSpan(fc.ctx).Annotate("Canceled")
-						case <-fc.flow.Closed():
-						}
-					}()
-				}
-			*/
-			deadline, _ := ctx.Deadline()
-			if verr := fc.start(r.suffix, method, args, deadline, opts); verr != nil {
-				return nil, verror.NoRetry, false, verr
-			}
-			return fc, verror.NoRetry, false, nil
-		}
-		if numResponses == len(responses) {
-			return c.failedTryCall(ctx, name, method, responses, ch)
-		}
-	}
-}
-
-// xcleanupTryCall ensures we've waited for every response from the tryCreateFlow
+// cleanupTryConnectToName ensures we've waited for every response from the tryConnectToServer
 // goroutines, and have closed the flow from each one except skip.  This is a
 // blocking function; it should be called in its own goroutine.
-func xcleanupTryCall(skip *xserverStatus, responses []*xserverStatus, ch chan *xserverStatus) {
+func cleanupTryConnectToName(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
 	numPending := 0
 	for _, r := range responses {
 		switch {
@@ -559,11 +431,11 @@
 	}
 }
 
-// failedTryCall performs asynchronous cleanup for tryCall, and returns an
+// failedTryConnectToName performs asynchronous cleanup for connectToName, and returns an
 // appropriate error from the responses we've already received.  All parallel
-// calls in tryCall failed or we timed out if we get here.
-func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*xserverStatus, ch chan *xserverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
-	go xcleanupTryCall(nil, responses, ch)
+// calls in tryConnectToName failed or we timed out if we get here.
+func (c *xclient) failedTryConnectToName(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (*serverStatus, verror.ActionCode, bool, error) {
+	go cleanupTryConnectToName(nil, responses, ch)
 	c.ns.FlushCacheEntry(ctx, name)
 	suberrs := []verror.SubErr{}
 	topLevelError := verror.ErrNoServers
@@ -572,12 +444,10 @@
 	for _, r := range responses {
 		if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
 			switch verror.ErrorID(r.serverErr.Err) {
-			case /*stream.ErrNotTrusted.ID,*/ verror.ErrNotTrusted.ID, errPeerAuthorizeFailed.ID:
+			case verror.ErrNotTrusted.ID, errPeerAuthorizeFailed.ID:
 				topLevelError = verror.ErrNotTrusted
 				topLevelAction = verror.NoRetry
 				onlyErrNetwork = false
-			/*case stream.ErrAborted.ID, stream.ErrNetwork.ID:*/
-			// do nothing
 			case verror.ErrTimeout.ID:
 				topLevelError = verror.ErrTimeout
 				onlyErrNetwork = false
@@ -813,32 +683,6 @@
 	return nil
 }
 
-// decodeNetError tests for a net.Error from the lower stream code and
-// translates it into an appropriate error to be returned by the higher level
-// RPC api calls. It also tests for the net.Error being a stream.NetError
-// and if so, uses the error it stores rather than the stream.NetError itself
-// as its retrun value. This allows for the stack trace of the original
-// error to be chained to that of any verror created with it as a first parameter.
-func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
-	if neterr, ok := err.(net.Error); ok {
-		if neterr.Timeout() || neterr.Temporary() {
-			// If a read is canceled in the lower levels we see
-			// a timeout error - see readLocked in vc/reader.go
-			if ctx.Err() == context.Canceled {
-				return verror.ErrCanceled, err
-			}
-			return verror.ErrTimeout, err
-		}
-	}
-	if id := verror.ErrorID(err); id != verror.ErrUnknown.ID {
-		return verror.IDAction{
-			ID:     id,
-			Action: verror.Action(err),
-		}, err
-	}
-	return verror.ErrBadProtocol, err
-}
-
 func (fc *flowXClient) CloseSend() error {
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	return fc.closeSend()
@@ -957,3 +801,155 @@
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	return fc.secCall
 }
+
+type typeFlowAuthorizer struct{}
+
+func (a typeFlowAuthorizer) AuthorizePeer(
+	ctx *context.T,
+	localEP, remoteEP naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+	return nil, nil, nil
+}
+
+func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+	security.Blessings, map[string]security.Discharge, error) {
+	return security.Blessings{}, nil, nil
+}
+
+type peerAuthorizer struct {
+	auth   security.Authorizer
+	method string
+	args   []interface{}
+}
+
+func (x peerAuthorizer) AuthorizePeer(
+	ctx *context.T,
+	localEP, remoteEP naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+	localPrincipal := v23.GetPrincipal(ctx)
+	// The "Method" and "Suffix" fields of the call are not populated
+	// as they are considered irrelevant for authorizing server blessings.
+	call := security.NewCall(&security.CallParams{
+		Timestamp:        time.Now(),
+		LocalPrincipal:   localPrincipal,
+		LocalEndpoint:    localEP,
+		RemoteBlessings:  remoteBlessings,
+		RemoteDischarges: remoteDischarges,
+		RemoteEndpoint:   remoteEP,
+	})
+	if err := x.auth.Authorize(ctx, call); err != nil {
+		return nil, nil, verror.New(errPeerAuthorizeFailed, ctx, call.RemoteBlessings(), err)
+	}
+	peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
+	return peerNames, rejectedPeerNames, nil
+}
+
+func (x peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+	security.Blessings, map[string]security.Discharge, error) {
+	localPrincipal := v23.GetPrincipal(ctx)
+	clientB := localPrincipal.BlessingStore().ForPeer(peerNames...)
+	dis, _ := slib.PrepareDischarges(ctx, clientB, peerNames, x.method, x.args)
+	return clientB, dis, nil
+}
+
+func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
+	// Context specified deadline.
+	deadline, hasDeadline := ctx.Deadline()
+	if !hasDeadline {
+		// Default deadline.
+		deadline = time.Now().Add(defaultCallTimeout)
+	}
+	if r, ok := getRetryTimeoutOpt(opts); ok {
+		// Caller specified deadline.
+		deadline = time.Now().Add(r)
+	}
+	return deadline
+}
+
+func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
+	switch {
+	case noRetry(opts):
+		return false
+	case action != verror.RetryBackoff:
+		return false
+	case time.Now().After(deadline):
+		return false
+	}
+	return true
+}
+
+func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
+	switch {
+	case noRetry(opts):
+		return false
+	case action != verror.RetryConnection && action != verror.RetryRefetch:
+		return false
+	case time.Now().After(deadline):
+		return false
+	case requireResolve && getNoNamespaceOpt(opts):
+		// If we're skipping resolution and there are no servers for
+		// this call retrying is not going to help, we can't come up
+		// with new servers if there is no resolution.
+		return false
+	}
+	return true
+}
+
+// A randomized exponential backoff. The randomness deters error convoys
+// from forming.  The first time you retry n should be 0, then 1 etc.
+func backoff(n uint, deadline time.Time) bool {
+	// This is ((100 to 200) * 2^n) ms.
+	b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
+	if b > maxBackoff {
+		b = maxBackoff
+	}
+	r := deadline.Sub(time.Now())
+	if b > r {
+		// We need to leave a little time for the call to start or
+		// we'll just timeout in startCall before we actually do
+		// anything.  If we just have a millisecond left, give up.
+		if r <= time.Millisecond {
+			return false
+		}
+		b = r - time.Millisecond
+	}
+	time.Sleep(b)
+	return true
+}
+
+func suberrName(server, name, method string) string {
+	// In the case the client directly dialed an endpoint we want to avoid printing
+	// the endpoint twice.
+	if server == name {
+		return fmt.Sprintf("%s.%s", server, method)
+	}
+	return fmt.Sprintf("%s:%s.%s", server, name, method)
+}
+
+// decodeNetError tests for a net.Error from the lower stream code and
+// translates it into an appropriate error to be returned by the higher level
+// RPC api calls. It also tests for the net.Error being a stream.NetError
+// and if so, uses the error it stores rather than the stream.NetError itself
+// as its retrun value. This allows for the stack trace of the original
+// error to be chained to that of any verror created with it as a first parameter.
+func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
+	if neterr, ok := err.(net.Error); ok {
+		if neterr.Timeout() || neterr.Temporary() {
+			// If a read is canceled in the lower levels we see
+			// a timeout error - see readLocked in vc/reader.go
+			if ctx.Err() == context.Canceled {
+				return verror.ErrCanceled, err
+			}
+			return verror.ErrTimeout, err
+		}
+	}
+	if id := verror.ErrorID(err); id != verror.ErrUnknown.ID {
+		return verror.IDAction{
+			ID:     id,
+			Action: verror.Action(err),
+		}, err
+	}
+	return verror.ErrBadProtocol, err
+}
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index 4c8645f..c900102 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -10,6 +10,7 @@
 	"sync"
 
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/vdl"
@@ -86,13 +87,12 @@
 	return call.Finish(outArgs...)
 }
 
-// Close implements rpc.Client
-func (*simpleMockClient) Close() {
+// Implement rpc.Client.
+func (*simpleMockClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+	return nil, nil
 }
-
-func (*simpleMockClient) Closed() <-chan struct{} {
-	return nil
-}
+func (*simpleMockClient) Close()                  {}
+func (*simpleMockClient) Closed() <-chan struct{} { return nil }
 
 // mockCall implements rpc.ClientCall
 type mockCall struct {