| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package rpc |
| |
| import ( |
| "fmt" |
| "net" |
| "time" |
| |
| "v.io/x/lib/vlog" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/i18n" |
| "v.io/v23/namespace" |
| "v.io/v23/naming" |
| "v.io/v23/rpc" |
| "v.io/v23/security" |
| "v.io/v23/verror" |
| "v.io/v23/vtrace" |
| |
| inaming "v.io/x/ref/runtime/internal/naming" |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| "v.io/x/ref/runtime/internal/rpc/stream/vc" |
| ) |
| |
| type xclient struct { |
| streamMgr stream.XManager |
| ns namespace.T |
| vcOpts []stream.VCOpt // vc opts passed to dial |
| preferredProtocols []string |
| |
| // We cache the IP networks on the device since it is not that cheap to read |
| // network interfaces through os syscall. |
| // TODO(jhahn): Add monitoring the network interface changes. |
| ipNets []*net.IPNet |
| |
| dc vc.DischargeClient |
| } |
| |
| var _ rpc.Client = (*xclient)(nil) |
| |
| func XInternalNewClient(streamMgr stream.XManager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) { |
| c := &xclient{ |
| streamMgr: streamMgr, |
| ns: ns, |
| ipNets: ipNetworks(), |
| } |
| c.dc = InternalNewDischargeClient(nil, c, 0) |
| for _, opt := range opts { |
| // Collect all client opts that are also vc opts. |
| switch v := opt.(type) { |
| case stream.VCOpt: |
| c.vcOpts = append(c.vcOpts, v) |
| case PreferredProtocols: |
| c.preferredProtocols = v |
| } |
| } |
| |
| return c, nil |
| } |
| |
| func (c *xclient) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) { |
| suberr := func(err error) *verror.SubErr { |
| return &verror.SubErr{Err: err, Options: verror.Print} |
| } |
| sm := c.streamMgr |
| flow, err := sm.Dial(ep, principal, vcOpts...) |
| if err != nil { |
| return nil, suberr(err) |
| } |
| return flow, nil |
| } |
| |
| func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) { |
| return c.startCall(ctx, name, method, args, opts) |
| } |
| |
| func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error { |
| deadline := getDeadline(ctx, opts) |
| |
| var lastErr error |
| for retries := uint(0); ; retries++ { |
| call, err := c.startCall(ctx, name, method, inArgs, opts) |
| if err != nil { |
| return err |
| } |
| err = call.Finish(outArgs...) |
| if err == nil { |
| return nil |
| } |
| lastErr = err |
| // We only retry if RetryBackoff is returned by the application because other |
| // RetryConnection and RetryRefetch required actions by the client before |
| // retrying. |
| if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) { |
| vlog.VI(4).Infof("Cannot retry after error: %s", lastErr) |
| break |
| } |
| if !backoff(retries, deadline) { |
| break |
| } |
| vlog.VI(4).Infof("Retrying due to error: %s", lastErr) |
| } |
| return lastErr |
| } |
| |
| // startCall ensures StartCall always returns verror.E. |
| func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) { |
| if !ctx.Initialized() { |
| return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized") |
| } |
| ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method)) |
| if err := canCreateServerAuthorizer(ctx, opts); err != nil { |
| return nil, verror.New(verror.ErrBadArg, ctx, err) |
| } |
| |
| deadline := getDeadline(ctx, opts) |
| |
| var lastErr error |
| for retries := uint(0); ; retries++ { |
| call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts) |
| if err == nil { |
| return call, nil |
| } |
| lastErr = err |
| if !shouldRetry(action, requireResolve, deadline, opts) { |
| span.Annotatef("Cannot retry after error: %s", err) |
| break |
| } |
| if !backoff(retries, deadline) { |
| break |
| } |
| span.Annotatef("Retrying due to error: %s", err) |
| } |
| return nil, lastErr |
| } |
| |
| // tryCreateFlow attempts to establish a Flow to "server" (which must be a |
| // rooted name), over which a method invocation request could be sent. |
| // |
| // The server at the remote end of the flow is authorized using the provided |
| // authorizer, both during creation of the VC underlying the flow and the |
| // flow itself. |
| // TODO(cnicolaou): implement real, configurable load balancing. |
| func (c *xclient) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) { |
| status := &serverStatus{index: index, server: server} |
| var span vtrace.Span |
| ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow") |
| span.Annotatef("address:%v", server) |
| defer func() { |
| ch <- status |
| span.Finish() |
| }() |
| |
| suberr := func(err error) *verror.SubErr { |
| return &verror.SubErr{ |
| Name: suberrName(server, name, method), |
| Err: err, |
| Options: verror.Print, |
| } |
| } |
| |
| address, suffix := naming.SplitAddressName(server) |
| if len(address) == 0 { |
| status.serverErr = suberr(verror.New(errNonRootedName, ctx, server)) |
| return |
| } |
| status.suffix = suffix |
| |
| ep, err := inaming.NewEndpoint(address) |
| if err != nil { |
| status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx)) |
| return |
| } |
| if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil { |
| status.serverErr.Name = suberrName(server, name, method) |
| vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err) |
| return |
| } |
| |
| // Authorize the remote end of the flow using the provided authorizer. |
| if status.flow.LocalPrincipal() == nil { |
| // LocalPrincipal is nil which means we are operating under |
| // SecurityNone. |
| return |
| } |
| |
| seccall := security.NewCall(&security.CallParams{ |
| LocalPrincipal: status.flow.LocalPrincipal(), |
| LocalBlessings: status.flow.LocalBlessings(), |
| RemoteBlessings: status.flow.RemoteBlessings(), |
| LocalEndpoint: status.flow.LocalEndpoint(), |
| RemoteEndpoint: status.flow.RemoteEndpoint(), |
| RemoteDischarges: status.flow.RemoteDischarges(), |
| Method: method, |
| Suffix: status.suffix, |
| }) |
| if err := auth.Authorize(ctx, seccall); err != nil { |
| // We will test for errServerAuthorizeFailed in failedTryCall and report |
| // verror.ErrNotTrusted |
| status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err)) |
| vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err) |
| status.flow.Close() |
| status.flow = nil |
| return |
| } |
| status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall) |
| return |
| } |
| |
| // tryCall makes a single attempt at a call. It may connect to multiple servers |
| // (all that serve "name"), but will invoke the method on at most one of them |
| // (the server running on the most preferred protcol and network amongst all |
| // the servers that were successfully connected to and authorized). |
| // if requireResolve is true on return, then we shouldn't bother retrying unless |
| // you can re-resolve. |
| func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) { |
| var resolved *naming.MountEntry |
| var blessingPattern security.BlessingPattern |
| blessingPattern, name = security.SplitPatternName(name) |
| if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil { |
| // We always return NoServers as the error so that the caller knows |
| // that's ok to retry the operation since the name may be registered |
| // in the near future. |
| switch { |
| case verror.ErrorID(err) == naming.ErrNoSuchName.ID: |
| return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name) |
| case verror.ErrorID(err) == verror.ErrNoServers.ID: |
| // Avoid wrapping errors unnecessarily. |
| return nil, verror.NoRetry, false, err |
| default: |
| return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err) |
| } |
| } else { |
| if len(resolved.Servers) == 0 { |
| // This should never happen. |
| return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name) |
| } |
| // An empty set of protocols means all protocols... |
| if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil { |
| return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err) |
| } |
| } |
| |
| // We need to ensure calls to v23 factory methods do not occur during runtime |
| // initialization. Currently, the agent, which uses SecurityNone, is the only caller |
| // during runtime initialization. We would like to set the principal in the context |
| // to nil if we are running in SecurityNone, but this always results in a panic since |
| // the agent client would trigger the call v23.WithPrincipal during runtime |
| // initialization. So, we gate the call to v23.GetPrincipal instead since the agent |
| // client will have callEncrypted == false. |
| // Potential solutions to this are: |
| // (1) Create a separate client for the agent so that this code doesn't have to |
| // account for its use during runtime initialization. |
| // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate |
| // on here. |
| var principal security.Principal |
| if callEncrypted(opts) { |
| if principal = v23.GetPrincipal(ctx); principal == nil { |
| return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx) |
| } |
| } |
| |
| // servers is now ordered by the priority heurestic implemented in |
| // filterAndOrderServers. |
| // |
| // Try to connect to all servers in parallel. Provide sufficient |
| // buffering for all of the connections to finish instantaneously. This |
| // is important because we want to process the responses in priority |
| // order; that order is indicated by the order of entries in servers. |
| // So, if two respones come in at the same 'instant', we prefer the |
| // first in the resolved.Servers) |
| attempts := len(resolved.Servers) |
| |
| responses := make([]*serverStatus, attempts) |
| ch := make(chan *serverStatus, attempts) |
| vcOpts := append(getVCOpts(opts), c.vcOpts...) |
| authorizer := newServerAuthorizer(blessingPattern, opts...) |
| for i, server := range resolved.Names() { |
| // Create a copy of vcOpts for each call to tryCreateFlow |
| // to avoid concurrent tryCreateFlows from stepping on each |
| // other while manipulating their copy of the options. |
| vcOptsCopy := make([]stream.VCOpt, len(vcOpts)) |
| copy(vcOptsCopy, vcOpts) |
| go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy) |
| } |
| |
| var timeoutChan <-chan time.Time |
| if deadline, ok := ctx.Deadline(); ok { |
| timeoutChan = time.After(deadline.Sub(time.Now())) |
| } |
| |
| for { |
| // Block for at least one new response from the server, or the timeout. |
| select { |
| case r := <-ch: |
| responses[r.index] = r |
| // Read as many more responses as we can without blocking. |
| LoopNonBlocking: |
| for { |
| select { |
| default: |
| break LoopNonBlocking |
| case r := <-ch: |
| responses[r.index] = r |
| } |
| } |
| case <-timeoutChan: |
| vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name) |
| _, _, _, err := c.failedTryCall(ctx, name, method, responses, ch) |
| if verror.ErrorID(err) != verror.ErrTimeout.ID { |
| return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err) |
| } |
| return nil, verror.NoRetry, false, err |
| } |
| |
| dc := c.dc |
| if shouldNotFetchDischarges(opts) { |
| dc = nil |
| } |
| // Process new responses, in priority order. |
| numResponses := 0 |
| for _, r := range responses { |
| if r != nil { |
| numResponses++ |
| } |
| if r == nil || r.flow == nil { |
| continue |
| } |
| |
| doneChan := ctx.Done() |
| r.flow.SetDeadline(doneChan) |
| fc, err := newFlowClient(ctx, r.flow, r.blessings, dc) |
| if err != nil { |
| return nil, verror.NoRetry, false, err |
| } |
| |
| if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil { |
| r.serverErr = &verror.SubErr{ |
| Name: suberrName(r.server, name, method), |
| Options: verror.Print, |
| Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)), |
| } |
| vlog.VI(2).Infof("rpc: err: %s", r.serverErr) |
| r.flow.Close() |
| r.flow = nil |
| continue |
| } |
| |
| // This is the 'point of no return'; once the RPC is started (fc.start |
| // below) we can't be sure if it makes it to the server or not so, this |
| // code will never call fc.start more than once to ensure that we provide |
| // 'at-most-once' rpc semantics at this level. Retrying the network |
| // connections (i.e. creating flows) is fine since we can cleanup that |
| // state if we abort a call (i.e. close the flow). |
| // |
| // We must ensure that all flows other than r.flow are closed. |
| // |
| // TODO(cnicolaou): all errors below are marked as NoRetry |
| // because we want to provide at-most-once rpc semantics so |
| // we only ever attempt an RPC once. In the future, we'll cache |
| // responses on the server and then we can retry in-flight |
| // RPCs. |
| go cleanupTryCall(r, responses, ch) |
| |
| if doneChan != nil { |
| go func() { |
| select { |
| case <-doneChan: |
| vtrace.GetSpan(fc.ctx).Annotate("Canceled") |
| fc.flow.Cancel() |
| case <-fc.flow.Closed(): |
| } |
| }() |
| } |
| |
| deadline, _ := ctx.Deadline() |
| if verr := fc.start(r.suffix, method, args, deadline); verr != nil { |
| return nil, verror.NoRetry, false, verr |
| } |
| return fc, verror.NoRetry, false, nil |
| } |
| if numResponses == len(responses) { |
| return c.failedTryCall(ctx, name, method, responses, ch) |
| } |
| } |
| } |
| |
| // failedTryCall performs ©asynchronous cleanup for tryCall, and returns an |
| // appropriate error from the responses we've already received. All parallel |
| // calls in tryCall failed or we timed out if we get here. |
| func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) { |
| go cleanupTryCall(nil, responses, ch) |
| c.ns.FlushCacheEntry(name) |
| suberrs := []verror.SubErr{} |
| topLevelError := verror.ErrNoServers |
| topLevelAction := verror.RetryRefetch |
| onlyErrNetwork := true |
| for _, r := range responses { |
| if r != nil && r.serverErr != nil && r.serverErr.Err != nil { |
| switch verror.ErrorID(r.serverErr.Err) { |
| case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID: |
| topLevelError = verror.ErrNotTrusted |
| topLevelAction = verror.NoRetry |
| onlyErrNetwork = false |
| case stream.ErrAborted.ID, stream.ErrNetwork.ID: |
| // do nothing |
| default: |
| onlyErrNetwork = false |
| } |
| suberrs = append(suberrs, *r.serverErr) |
| } |
| } |
| |
| if onlyErrNetwork { |
| // If we only encountered network errors, then report ErrBadProtocol. |
| topLevelError = verror.ErrBadProtocol |
| } |
| |
| // TODO(cnicolaou): we get system errors for things like dialing using |
| // the 'ws' protocol which can never succeed even if we retry the connection, |
| // hence we return RetryRefetch below except for the case where the servers |
| // are not trusted, in case there's no point in retrying at all. |
| // TODO(cnicolaou): implementing at-most-once rpc semantics in the future |
| // will require thinking through all of the cases where the RPC can |
| // be retried by the client whilst it's actually being executed on the |
| // server. |
| return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...) |
| } |
| |
| func (c *xclient) Close() { |
| // TODO(suharshs): Implement this. |
| } |