ref: Introduce client.Connection and refactor xclient.go.
This should make it simpler to change the parallel dial
policy in the future.
Next step: Use the client.Connection function in syncbase
peer selection.
Improvement observed during this change: we should get rid
of RetryTimeout:
- No one currently uses it.
- It allowed a bug that made it possible for nested RPC
  calls to mess with the perceived timeout of inner
  nested calls.
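
As a minimal sketch (hypothetical; pickPeer is not part of
this change), syncbase peer selection could use Connection
roughly like this:

    import (
        "v.io/v23"
        "v.io/v23/context"
        "v.io/v23/flow"
    )

    // pickPeer returns a connection to the first reachable peer.
    // Later StartCalls to the same name reuse the cached conn.
    func pickPeer(ctx *context.T, peers []string) (flow.ManagedConn, error) {
        client := v23.GetClient(ctx)
        var firstErr error
        for _, name := range peers {
            conn, err := client.Connection(ctx, name)
            if err == nil {
                return conn, nil
            }
            if firstErr == nil {
                firstErr = err
            }
        }
        return nil, firstErr
    }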
MultiPart: 2/2
Change-Id: I60d7c2231d06ee0a8ae9b8c305bab9494541e255
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 9c1f62e..0809d9e 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -133,6 +133,9 @@
func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
return nil, nil
}
+func (fc *fakeDischargeClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+ return nil, nil
+}
func (fc *fakeDischargeClient) Close() {}
func (fc *fakeDischargeClient) Closed() <-chan struct{} { return nil }
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index bd784f2..8199210 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -954,3 +954,33 @@
t.Error(err)
}
}
+
+func TestClientConnection(t *testing.T) {
+ ctx, shutdown := test.V23InitWithMounttable()
+ defer shutdown()
+
+ ctx, cancel := context.WithCancel(ctx)
+ name := "mountpoint/server"
+ _, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() { <-server.Closed() }()
+ defer cancel()
+
+ client := v23.GetClient(ctx)
+
+ conn, err := client.Connection(ctx, name)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // StartCall should use the same connection that was just created.
+ call, err := client.StartCall(ctx, name, "Closure", nil, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if got, want := call.Security().LocalEndpoint().String(), conn.LocalEndpoint().String(); got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 2d85b14..1528ee9 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -138,50 +138,6 @@
return c.startCall(ctx, name, method, args, deadline, opts)
}
-func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (rpc.ClientCall, error) {
- ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
- for retries := uint(0); ; retries++ {
- call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
- switch {
- case err == nil:
- return call, nil
- case !shouldRetry(action, requireResolve, deadline, opts):
- span.Annotatef("Cannot retry after error: %s", err)
- span.Finish()
- return nil, err
- case !backoff(retries, deadline):
- span.Annotatef("Retries exhausted")
- span.Finish()
- return nil, err
- default:
- span.Annotatef("Retrying due to error: %s", err)
- ctx.VI(2).Infof("Retrying due to error: %s", err)
- }
- }
-}
-
-// A randomized exponential backoff. The randomness deters error convoys
-// from forming. The first time you retry n should be 0, then 1 etc.
-func backoff(n uint, deadline time.Time) bool {
- // This is ((100 to 200) * 2^n) ms.
- b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
- if b > maxBackoff {
- b = maxBackoff
- }
- r := deadline.Sub(time.Now())
- if b > r {
- // We need to leave a little time for the call to start or
- // we'll just timeout in startCall before we actually do
- // anything. If we just have a millisecond left, give up.
- if r <= time.Millisecond {
- return false
- }
- b = r - time.Millisecond
- }
- time.Sleep(b)
- return true
-}
-
func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
deadline := getDeadline(ctx, opts)
@@ -204,50 +160,37 @@
}
}
-func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
- // Context specified deadline.
- deadline, hasDeadline := ctx.Deadline()
- if !hasDeadline {
- // Default deadline.
- deadline = time.Now().Add(defaultCallTimeout)
+func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (rpc.ClientCall, error) {
+ ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
+ r, err := c.connectToName(ctx, name, method, args, deadline, opts)
+ if err != nil {
+ return nil, err
}
- if r, ok := getRetryTimeoutOpt(opts); ok {
- // Caller specified deadline.
- deadline = time.Now().Add(r)
+
+ fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
+ if err != nil {
+ return nil, err
}
- return deadline
+
+ deadline, _ = ctx.Deadline()
+ if verr := fc.start(r.suffix, method, args, deadline, opts); verr != nil {
+ return nil, verr
+ }
+ return fc, nil
}
-func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
- switch {
- case noRetry(opts):
- return false
- case action != verror.RetryBackoff:
- return false
- case time.Now().After(deadline):
- return false
+func (c *xclient) Connection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.ManagedConn, error) {
+ deadline := getDeadline(ctx, opts)
+ r, err := c.connectToName(ctx, name, "", nil, deadline, opts)
+ if err != nil {
+ return nil, err
}
- return true
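+ // Close the dialed flow but keep its underlying conn: the conn stays
+ // cached in the flow manager, so a subsequent StartCall to the same
+ // name can reuse it (see TestClientConnection).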
+ conn := r.flow.Conn()
+ r.flow.Close()
+ return conn, nil
}
-func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
- switch {
- case noRetry(opts):
- return false
- case action != verror.RetryConnection && action != verror.RetryRefetch:
- return false
- case time.Now().After(deadline):
- return false
- case requireResolve && getNoNamespaceOpt(opts):
- // If we're skipping resolution and there are no servers for
- // this call retrying is not going to help, we can't come up
- // with new servers if there is no resolution.
- return false
- }
- return true
-}
-
-type xserverStatus struct {
+type serverStatus struct {
index int
server, suffix string
flow flow.Flow
@@ -256,34 +199,152 @@
typeDec *vom.TypeDecoder
}
-func suberrName(server, name, method string) string {
- // In the case the client directly dialed an endpoint we want to avoid printing
- // the endpoint twice.
- if server == name {
- return fmt.Sprintf("%s.%s", server, method)
+// connectToName attempts to connect to the provided name. It may retry
+// connecting to the servers the name resolves to, depending on the type of
+// error encountered. Once the deadline is reached, it stops retrying.
+func (c *xclient) connectToName(ctx *context.T, name, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) (*serverStatus, error) {
+ span := vtrace.GetSpan(ctx)
+ for retries := uint(0); ; retries++ {
+ r, action, requireResolve, err := c.tryConnectToName(ctx, name, method, args, opts)
+ switch {
+ case err == nil:
+ return r, nil
+ case !shouldRetry(action, requireResolve, deadline, opts):
+ span.Annotatef("Cannot retry after error: %s", err)
+ span.Finish()
+ return nil, err
+ case !backoff(retries, deadline):
+ span.Annotatef("Retries exhausted")
+ span.Finish()
+ return nil, err
+ default:
+ span.Annotatef("Retrying due to error: %s", err)
+ ctx.VI(2).Infof("Retrying due to error: %s", err)
+ }
}
- return fmt.Sprintf("%s:%s.%s", server, name, method)
}
-// tryCreateFlow attempts to establish a Flow to "server" (which must be a
-// rooted name), over which a method invocation request could be sent.
+// tryConnectToName makes a single attempt at connecting to a name. It may
+// connect to multiple servers (all that serve "name"), but will return a
+// serverStatus for at most one of them (the server running on the most
+// preferred protocol and network amongst all the servers that were successfully
+// connected to and authorized).
+// If requireResolve is true on return, retrying is pointless unless the name
+// can be re-resolved.
+//
+// TODO(toddw): Remove action from out-args, the error should tell us the action.
+func (c *xclient) tryConnectToName(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (*serverStatus, verror.ActionCode, bool, error) {
+ blessingPattern, name := security.SplitPatternName(name)
+ resolved, err := c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...)
+ switch {
+ case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
+ return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
+ case verror.ErrorID(err) == verror.ErrNoServers.ID:
+ return nil, verror.NoRetry, false, err // avoid unnecessary wrapping
+ case verror.ErrorID(err) == verror.ErrTimeout.ID:
+ return nil, verror.NoRetry, false, err // return timeout without wrapping
+ case err != nil:
+ return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
+ case len(resolved.Servers) == 0:
+ // This should never happen.
+ return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
+ }
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
+ return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
+ }
+
+ // servers is now ordered by the priority heuristic implemented in
+ // filterAndOrderServers.
+ //
+ // Try to connect to all servers in parallel. Provide sufficient
+ // buffering for all of the connections to finish instantaneously. This
+ // is important because we want to process the responses in priority
+ // order; that order is indicated by the order of entries in servers.
+ // So, if two responses come in at the same 'instant', we prefer the
+ // one that appears first in resolved.Servers.
+ //
+ // TODO(toddw): Refactor the parallel dials so that the policy can be changed,
+ // and so that the goroutines for each Call are tracked separately.
+ responses := make([]*serverStatus, len(resolved.Servers))
+ ch := make(chan *serverStatus, len(resolved.Servers))
+ authorizer := newServerAuthorizer(blessingPattern, opts...)
+ peerAuth := peerAuthorizer{authorizer, method, args}
+ channelTimeout := time.Duration(0)
+ for _, opt := range opts {
+ if t, ok := opt.(options.ChannelTimeout); ok {
+ channelTimeout = time.Duration(t)
+ }
+ }
+ for i, server := range resolved.Names() {
+ c.mu.Lock()
+ if c.closing {
+ c.mu.Unlock()
+ return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
+ }
+ c.wg.Add(1)
+ c.mu.Unlock()
+
+ go c.tryConnectToServer(ctx, i, name, server, method, args, peerAuth, channelTimeout, ch)
+ }
+
+ for {
+ // Block for at least one new response from the server, or the timeout.
+ select {
+ case r := <-ch:
+ responses[r.index] = r
+ // Read as many more responses as we can without blocking.
+ LoopNonBlocking:
+ for {
+ select {
+ default:
+ break LoopNonBlocking
+ case r := <-ch:
+ responses[r.index] = r
+ }
+ }
+ case <-ctx.Done():
+ return c.failedTryConnectToName(ctx, name, method, responses, ch)
+ }
+
+ // Process new responses, in priority order.
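+ // responses is indexed by the priority order established by
+ // filterAndOrderServers, so the first entry with a live flow is the
+ // most preferred server that connected successfully.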
+ numResponses := 0
+ for _, r := range responses {
+ if r != nil {
+ numResponses++
+ }
+ if r == nil || r.flow == nil {
+ continue
+ }
+ // We must ensure that all flows other than r.flow are closed.
+ go cleanupTryConnectToName(r, responses, ch)
+ return r, verror.NoRetry, false, nil
+ }
+ if numResponses == len(responses) {
+ return c.failedTryConnectToName(ctx, name, method, responses, ch)
+ }
+ }
+}
+
+// tryConnectToServer attempts to establish a Flow to a single "server"
+// (which must be a rooted name), over which a method invocation request
+// could be sent.
//
// The server at the remote end of the flow is authorized using the provided
// authorizer, both during creation of the VC underlying the flow and the
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
-func (c *xclient) tryCreateFlow(
+func (c *xclient) tryConnectToServer(
ctx *context.T,
index int,
name, server, method string,
args []interface{},
- auth security.Authorizer,
+ auth flow.PeerAuthorizer,
channelTimeout time.Duration,
- ch chan<- *xserverStatus) {
+ ch chan<- *serverStatus) {
defer c.wg.Done()
- status := &xserverStatus{index: index, server: server}
+ status := &serverStatus{index: index, server: server}
var span vtrace.Span
- ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow "+server)
+ ctx, span = vtrace.WithNewSpan(ctx, "<client>tryConnectToServer "+server)
defer func() {
ch <- status
span.Finish()
@@ -308,8 +369,7 @@
status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
- peerAuth := peerAuthorizer{auth, method, args}
- flow, err := c.flowMgr.Dial(ctx, ep, peerAuth, channelTimeout)
+ flow, err := c.flowMgr.Dial(ctx, ep, auth, channelTimeout)
if err != nil {
ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
status.serverErr = suberr(err)
@@ -342,198 +402,10 @@
status.flow = flow
}
-type typeFlowAuthorizer struct{}
-
-func (a typeFlowAuthorizer) AuthorizePeer(
- ctx *context.T,
- localEP, remoteEP naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
- return nil, nil, nil
-}
-
-func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
- security.Blessings, map[string]security.Discharge, error) {
- return security.Blessings{}, nil, nil
-}
-
-type peerAuthorizer struct {
- auth security.Authorizer
- method string
- args []interface{}
-}
-
-func (x peerAuthorizer) AuthorizePeer(
- ctx *context.T,
- localEP, remoteEP naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
- localPrincipal := v23.GetPrincipal(ctx)
- // The "Method" and "Suffix" fields of the call are not populated
- // as they are considered irrelevant for authorizing server blessings.
- call := security.NewCall(&security.CallParams{
- Timestamp: time.Now(),
- LocalPrincipal: localPrincipal,
- LocalEndpoint: localEP,
- RemoteBlessings: remoteBlessings,
- RemoteDischarges: remoteDischarges,
- RemoteEndpoint: remoteEP,
- })
- if err := x.auth.Authorize(ctx, call); err != nil {
- return nil, nil, verror.New(errPeerAuthorizeFailed, ctx, call.RemoteBlessings(), err)
- }
- peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
- return peerNames, rejectedPeerNames, nil
-}
-
-func (x peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
- security.Blessings, map[string]security.Discharge, error) {
- localPrincipal := v23.GetPrincipal(ctx)
- clientB := localPrincipal.BlessingStore().ForPeer(peerNames...)
- dis, _ := slib.PrepareDischarges(ctx, clientB, peerNames, x.method, x.args)
- return clientB, dis, nil
-}
-
-// tryCall makes a single attempt at a call. It may connect to multiple servers
-// (all that serve "name"), but will invoke the method on at most one of them
-// (the server running on the most preferred protcol and network amongst all
-// the servers that were successfully connected to and authorized).
-// if requireResolve is true on return, then we shouldn't bother retrying unless
-// you can re-resolve.
-//
-// TODO(toddw): Remove action from out-args, the error should tell us the action.
-func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
- blessingPattern, name := security.SplitPatternName(name)
- resolved, err := c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...)
- switch {
- case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
- return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
- case verror.ErrorID(err) == verror.ErrNoServers.ID:
- return nil, verror.NoRetry, false, err // avoid unnecessary wrapping
- case verror.ErrorID(err) == verror.ErrTimeout.ID:
- return nil, verror.NoRetry, false, err // return timeout without wrapping
- case err != nil:
- return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
- case len(resolved.Servers) == 0:
- // This should never happen.
- return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
- }
- if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
- return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
- }
-
- // servers is now ordered by the priority heurestic implemented in
- // filterAndOrderServers.
- //
- // Try to connect to all servers in parallel. Provide sufficient
- // buffering for all of the connections to finish instantaneously. This
- // is important because we want to process the responses in priority
- // order; that order is indicated by the order of entries in servers.
- // So, if two respones come in at the same 'instant', we prefer the
- // first in the resolved.Servers)
- //
- // TODO(toddw): Refactor the parallel dials so that the policy can be changed,
- // and so that the goroutines for each Call are tracked separately.
- responses := make([]*xserverStatus, len(resolved.Servers))
- ch := make(chan *xserverStatus, len(resolved.Servers))
- authorizer := newServerAuthorizer(blessingPattern, opts...)
- channelTimeout := time.Duration(0)
- for _, opt := range opts {
- if t, ok := opt.(options.ChannelTimeout); ok {
- channelTimeout = time.Duration(t)
- }
- }
- for i, server := range resolved.Names() {
- c.mu.Lock()
- if c.closing {
- c.mu.Unlock()
- return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
- }
- c.wg.Add(1)
- c.mu.Unlock()
-
- go c.tryCreateFlow(ctx, i, name, server, method, args, authorizer, channelTimeout, ch)
- }
-
- for {
- // Block for at least one new response from the server, or the timeout.
- select {
- case r := <-ch:
- responses[r.index] = r
- // Read as many more responses as we can without blocking.
- LoopNonBlocking:
- for {
- select {
- default:
- break LoopNonBlocking
- case r := <-ch:
- responses[r.index] = r
- }
- }
- case <-ctx.Done():
- return c.failedTryCall(ctx, name, method, responses, ch)
- }
-
- // Process new responses, in priority order.
- numResponses := 0
- for _, r := range responses {
- if r != nil {
- numResponses++
- }
- if r == nil || r.flow == nil {
- continue
- }
-
- fc, err := newFlowXClient(ctx, r.flow, r.typeEnc, r.typeDec)
- if err != nil {
- return nil, verror.NoRetry, false, err
- }
-
- // This is the 'point of no return'; once the RPC is started (fc.start
- // below) we can't be sure if it makes it to the server or not so, this
- // code will never call fc.start more than once to ensure that we provide
- // 'at-most-once' rpc semantics at this level. Retrying the network
- // connections (i.e. creating flows) is fine since we can cleanup that
- // state if we abort a call (i.e. close the flow).
- //
- // We must ensure that all flows other than r.flow are closed.
- //
- // TODO(cnicolaou): all errors below are marked as NoRetry
- // because we want to provide at-most-once rpc semantics so
- // we only ever attempt an RPC once. In the future, we'll cache
- // responses on the server and then we can retry in-flight
- // RPCs.
- go xcleanupTryCall(r, responses, ch)
-
- // TODO(toddw): It's wasteful to create this goroutine just for a vtrace
- // annotation. Refactor this when we refactor the parallel dial logic.
- /*
- if ctx.Done() != nil {
- go func() {
- select {
- case <-ctx.Done():
- vtrace.GetSpan(fc.ctx).Annotate("Canceled")
- case <-fc.flow.Closed():
- }
- }()
- }
- */
- deadline, _ := ctx.Deadline()
- if verr := fc.start(r.suffix, method, args, deadline, opts); verr != nil {
- return nil, verror.NoRetry, false, verr
- }
- return fc, verror.NoRetry, false, nil
- }
- if numResponses == len(responses) {
- return c.failedTryCall(ctx, name, method, responses, ch)
- }
- }
-}
-
-// xcleanupTryCall ensures we've waited for every response from the tryCreateFlow
+// cleanupTryConnectToName ensures we've waited for every response from the tryConnectToServer
// goroutines, and have closed the flow from each one except skip. This is a
// blocking function; it should be called in its own goroutine.
-func xcleanupTryCall(skip *xserverStatus, responses []*xserverStatus, ch chan *xserverStatus) {
+func cleanupTryConnectToName(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
numPending := 0
for _, r := range responses {
switch {
@@ -559,11 +431,11 @@
}
}
-// failedTryCall performs asynchronous cleanup for tryCall, and returns an
+// failedTryConnectToName performs asynchronous cleanup for connectToName, and returns an
// appropriate error from the responses we've already received. All parallel
-// calls in tryCall failed or we timed out if we get here.
-func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*xserverStatus, ch chan *xserverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
- go xcleanupTryCall(nil, responses, ch)
+// calls in tryConnectToName failed or we timed out if we get here.
+func (c *xclient) failedTryConnectToName(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (*serverStatus, verror.ActionCode, bool, error) {
+ go cleanupTryConnectToName(nil, responses, ch)
c.ns.FlushCacheEntry(ctx, name)
suberrs := []verror.SubErr{}
topLevelError := verror.ErrNoServers
@@ -572,12 +444,10 @@
for _, r := range responses {
if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
switch verror.ErrorID(r.serverErr.Err) {
- case /*stream.ErrNotTrusted.ID,*/ verror.ErrNotTrusted.ID, errPeerAuthorizeFailed.ID:
+ case verror.ErrNotTrusted.ID, errPeerAuthorizeFailed.ID:
topLevelError = verror.ErrNotTrusted
topLevelAction = verror.NoRetry
onlyErrNetwork = false
- /*case stream.ErrAborted.ID, stream.ErrNetwork.ID:*/
- // do nothing
case verror.ErrTimeout.ID:
topLevelError = verror.ErrTimeout
onlyErrNetwork = false
@@ -813,32 +683,6 @@
return nil
}
-// decodeNetError tests for a net.Error from the lower stream code and
-// translates it into an appropriate error to be returned by the higher level
-// RPC api calls. It also tests for the net.Error being a stream.NetError
-// and if so, uses the error it stores rather than the stream.NetError itself
-// as its retrun value. This allows for the stack trace of the original
-// error to be chained to that of any verror created with it as a first parameter.
-func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
- if neterr, ok := err.(net.Error); ok {
- if neterr.Timeout() || neterr.Temporary() {
- // If a read is canceled in the lower levels we see
- // a timeout error - see readLocked in vc/reader.go
- if ctx.Err() == context.Canceled {
- return verror.ErrCanceled, err
- }
- return verror.ErrTimeout, err
- }
- }
- if id := verror.ErrorID(err); id != verror.ErrUnknown.ID {
- return verror.IDAction{
- ID: id,
- Action: verror.Action(err),
- }, err
- }
- return verror.ErrBadProtocol, err
-}
-
func (fc *flowXClient) CloseSend() error {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return fc.closeSend()
@@ -957,3 +801,155 @@
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return fc.secCall
}
+
+type typeFlowAuthorizer struct{}
+
+func (a typeFlowAuthorizer) AuthorizePeer(
+ ctx *context.T,
+ localEP, remoteEP naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+ return nil, nil, nil
+}
+
+func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+ security.Blessings, map[string]security.Discharge, error) {
+ return security.Blessings{}, nil, nil
+}
+
+type peerAuthorizer struct {
+ auth security.Authorizer
+ method string
+ args []interface{}
+}
+
+func (x peerAuthorizer) AuthorizePeer(
+ ctx *context.T,
+ localEP, remoteEP naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+ localPrincipal := v23.GetPrincipal(ctx)
+ // The "Method" and "Suffix" fields of the call are not populated
+ // as they are considered irrelevant for authorizing server blessings.
+ call := security.NewCall(&security.CallParams{
+ Timestamp: time.Now(),
+ LocalPrincipal: localPrincipal,
+ LocalEndpoint: localEP,
+ RemoteBlessings: remoteBlessings,
+ RemoteDischarges: remoteDischarges,
+ RemoteEndpoint: remoteEP,
+ })
+ if err := x.auth.Authorize(ctx, call); err != nil {
+ return nil, nil, verror.New(errPeerAuthorizeFailed, ctx, call.RemoteBlessings(), err)
+ }
+ peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
+ return peerNames, rejectedPeerNames, nil
+}
+
+func (x peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+ security.Blessings, map[string]security.Discharge, error) {
+ localPrincipal := v23.GetPrincipal(ctx)
+ clientB := localPrincipal.BlessingStore().ForPeer(peerNames...)
+ dis, _ := slib.PrepareDischarges(ctx, clientB, peerNames, x.method, x.args)
+ return clientB, dis, nil
+}
+
+func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
+ // Context specified deadline.
+ deadline, hasDeadline := ctx.Deadline()
+ if !hasDeadline {
+ // Default deadline.
+ deadline = time.Now().Add(defaultCallTimeout)
+ }
+ if r, ok := getRetryTimeoutOpt(opts); ok {
+ // Caller specified deadline.
+ deadline = time.Now().Add(r)
+ }
+ return deadline
+}
+
+func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
+ switch {
+ case noRetry(opts):
+ return false
+ case action != verror.RetryBackoff:
+ return false
+ case time.Now().After(deadline):
+ return false
+ }
+ return true
+}
+
+func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
+ switch {
+ case noRetry(opts):
+ return false
+ case action != verror.RetryConnection && action != verror.RetryRefetch:
+ return false
+ case time.Now().After(deadline):
+ return false
+ case requireResolve && getNoNamespaceOpt(opts):
+ // If we're skipping resolution and there are no servers for
+ // this call, retrying is not going to help; we can't come up
+ // with new servers if there is no resolution.
+ return false
+ }
+ return true
+}
+
+// backoff implements a randomized exponential backoff. The randomness deters
+// error convoys from forming. On the first retry n should be 0, then 1, etc.
+func backoff(n uint, deadline time.Time) bool {
+ // This is ((100 to 200) * 2^n) ms.
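+ // For example, n=2 yields a wait of 400-796ms; larger values of n are
+ // capped at maxBackoff.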
+ b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
+ if b > maxBackoff {
+ b = maxBackoff
+ }
+ r := deadline.Sub(time.Now())
+ if b > r {
+ // We need to leave a little time for the call to start or
+ // we'll just time out in startCall before we actually do
+ // anything. If we have just a millisecond left, give up.
+ if r <= time.Millisecond {
+ return false
+ }
+ b = r - time.Millisecond
+ }
+ time.Sleep(b)
+ return true
+}
+
+func suberrName(server, name, method string) string {
+ // In the case where the client directly dialed an endpoint, we want to
+ // avoid printing the endpoint twice.
+ if server == name {
+ return fmt.Sprintf("%s.%s", server, method)
+ }
+ return fmt.Sprintf("%s:%s.%s", server, name, method)
+}
+
+// decodeNetError tests for a net.Error from the lower stream code and
+// translates it into an appropriate error to be returned by the higher-level
+// RPC API calls. It also tests for the net.Error being a stream.NetError
+// and, if so, uses the error it stores rather than the stream.NetError itself
+// as its return value. This allows the stack trace of the original error to
+// be chained to that of any verror created with it as a first parameter.
+func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
+ if neterr, ok := err.(net.Error); ok {
+ if neterr.Timeout() || neterr.Temporary() {
+ // If a read is canceled in the lower levels we see
+ // a timeout error - see readLocked in vc/reader.go
+ if ctx.Err() == context.Canceled {
+ return verror.ErrCanceled, err
+ }
+ return verror.ErrTimeout, err
+ }
+ }
+ if id := verror.ErrorID(err); id != verror.ErrUnknown.ID {
+ return verror.IDAction{
+ ID: id,
+ Action: verror.Action(err),
+ }, err
+ }
+ return verror.ErrBadProtocol, err
+}
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index 4c8645f..c900102 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -10,6 +10,7 @@
"sync"
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/vdl"
@@ -86,13 +87,12 @@
return call.Finish(outArgs...)
}
-// Close implements rpc.Client
-func (*simpleMockClient) Close() {
+// The following methods implement rpc.Client.
+func (*simpleMockClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+ return nil, nil
}
-
-func (*simpleMockClient) Closed() <-chan struct{} {
- return nil
-}
+func (*simpleMockClient) Close() {}
+func (*simpleMockClient) Closed() <-chan struct{} { return nil }
// mockCall implements rpc.ClientCall
type mockCall struct {