blob: 6d32f6b7fbbb3ece1804820b278658f280759fb3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"fmt"
"net"
"time"
"v.io/x/lib/vlog"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/i18n"
"v.io/v23/namespace"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vtrace"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
)
// xclient is the rpc.Client implementation in this file. It resolves object
// names through ns, dials flows to the resolved servers through streamMgr and
// issues at most one method invocation per tryCall attempt.
type xclient struct {
// streamMgr dials the flows that carry calls (see createFlow).
streamMgr stream.XManager
// ns resolves names to servers and has its cache entry flushed when every
// connection attempt fails (see tryCall and failedTryCall).
ns namespace.T
vcOpts []stream.VCOpt // vc opts passed to dial
// preferredProtocols is taken from a PreferredProtocols client option and is
// handed to filterAndOrderServers when ordering resolved servers.
preferredProtocols []string
// We cache the IP networks on the device since it is not that cheap to read
// network interfaces through os syscall.
// TODO(jhahn): Add monitoring the network interface changes.
ipNets []*net.IPNet
// dc is passed to newFlowClient for each call unless
// shouldNotFetchDischarges(opts) reports true, in which case nil is passed.
dc vc.DischargeClient
}
// Compile-time assertion that *xclient satisfies rpc.Client.
var _ rpc.Client = (*xclient)(nil)
// XInternalNewClient returns an rpc.Client that dials through streamMgr and
// resolves names through ns.
//
// Client options that are also stream.VCOpt values are retained and later
// appended to the VC options of every call; a PreferredProtocols option
// replaces the client's protocol preference list. Any other option is
// ignored. The returned error is always nil.
func XInternalNewClient(streamMgr stream.XManager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
	client := new(xclient)
	client.streamMgr = streamMgr
	client.ns = ns
	client.ipNets = ipNetworks()
	client.dc = InternalNewDischargeClient(nil, client, 0)

	for i := range opts {
		// VCOpt takes precedence over PreferredProtocols should an option
		// ever satisfy both.
		if vcOpt, ok := opts[i].(stream.VCOpt); ok {
			client.vcOpts = append(client.vcOpts, vcOpt)
			continue
		}
		if protocols, ok := opts[i].(PreferredProtocols); ok {
			client.preferredProtocols = protocols
		}
	}
	return client, nil
}
// createFlow dials ep through the client's stream manager using principal and
// vcOpts. On success it returns the flow and a nil error; on failure it
// returns a nil flow and the dial error wrapped in a verror.SubErr marked
// verror.Print. The SubErr's Name is left unset for the caller to fill in.
// ctx is currently unused.
func (c *xclient) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
	flow, err := c.streamMgr.Dial(ep, principal, vcOpts...)
	if err == nil {
		return flow, nil
	}
	return nil, &verror.SubErr{Err: err, Options: verror.Print}
}
// StartCall begins an invocation of method on name with args and returns the
// in-flight call. It delegates to startCall, which handles retries while the
// connection is being established.
func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
	call, err := c.startCall(ctx, name, method, args, opts)
	return call, err
}
// Call starts an invocation of method on name with inArgs and waits for it to
// finish, storing the results in outArgs.
//
// A failure to start the call is returned immediately. A failure reported by
// Finish is retried, after a backoff, for as long as shouldRetryBackoff and
// backoff both allow it; otherwise that error is returned.
func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
	deadline := getDeadline(ctx, opts)
	var attempt uint
	for {
		call, err := c.startCall(ctx, name, method, inArgs, opts)
		if err != nil {
			return err
		}
		finishErr := call.Finish(outArgs...)
		if finishErr == nil {
			return nil
		}
		// Only an application-returned RetryBackoff is retried here; the
		// RetryConnection and RetryRefetch actions require the client to do
		// something else before trying again.
		if !shouldRetryBackoff(verror.Action(finishErr), deadline, opts) {
			vlog.VI(4).Infof("Cannot retry after error: %s", finishErr)
			return finishErr
		}
		if !backoff(attempt, deadline) {
			return finishErr
		}
		vlog.VI(4).Infof("Retrying due to error: %s", finishErr)
		attempt++
	}
}
// startCall ensures StartCall always returns verror.E.
//
// It rejects an uninitialized context, opens a trace span named after the
// target and method, validates the call options with
// canCreateServerAuthorizer, and then invokes tryCall repeatedly. After each
// failed attempt it consults shouldRetry and backoff; as soon as either
// refuses, the error from the most recent attempt is returned.
func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
	if !ctx.Initialized() {
		return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
	}
	spanName := fmt.Sprintf("<rpc.Client>%q.%s", name, method)
	ctx, span := vtrace.WithNewSpan(ctx, spanName)
	if err := canCreateServerAuthorizer(ctx, opts); err != nil {
		return nil, verror.New(verror.ErrBadArg, ctx, err)
	}

	deadline := getDeadline(ctx, opts)
	var attempt uint
	for {
		call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
		if err == nil {
			return call, nil
		}
		if !shouldRetry(action, requireResolve, deadline, opts) {
			span.Annotatef("Cannot retry after error: %s", err)
			return nil, err
		}
		if !backoff(attempt, deadline) {
			return nil, err
		}
		span.Annotatef("Retrying due to error: %s", err)
		attempt++
	}
}
// tryCreateFlow attempts to establish a Flow to "server" (which must be a
// rooted name), over which a method invocation request could be sent.
//
// The server at the remote end of the flow is authorized using the provided
// authorizer, both during creation of the VC underlying the flow and the
// flow itself.
//
// The outcome, success or failure, is always delivered on ch as a
// *serverStatus carrying the given index, so the caller can match it to the
// server it was started for.
// TODO(cnicolaou): implement real, configurable load balancing.
func (c *xclient) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
	result := &serverStatus{index: index, server: server}
	ctx, span := vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
	span.Annotatef("address:%v", server)
	// Whatever path is taken below, publish the result and then end the span.
	defer func() {
		ch <- result
		span.Finish()
	}()

	// wrap attaches the server/name/method label to err.
	wrap := func(err error) *verror.SubErr {
		sub := new(verror.SubErr)
		sub.Name = suberrName(server, name, method)
		sub.Err = err
		sub.Options = verror.Print
		return sub
	}

	address, suffix := naming.SplitAddressName(server)
	if len(address) == 0 {
		result.serverErr = wrap(verror.New(errNonRootedName, ctx, server))
		return
	}
	result.suffix = suffix

	ep, err := inaming.NewEndpoint(address)
	if err != nil {
		result.serverErr = wrap(verror.New(errInvalidEndpoint, ctx))
		return
	}

	serverAuth := &vc.ServerAuthorizer{Suffix: result.suffix, Method: method, Policy: auth}
	result.flow, result.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, serverAuth))
	if result.serverErr != nil {
		// createFlow leaves the name blank; label the error here.
		result.serverErr.Name = suberrName(server, name, method)
		vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, result.serverErr.Err)
		return
	}

	flow := result.flow
	if flow.LocalPrincipal() == nil {
		// No local principal means we are operating under SecurityNone, so
		// there is nothing to authorize.
		return
	}

	// Authorize the remote end of the flow using the provided authorizer.
	params := &security.CallParams{
		LocalPrincipal:   flow.LocalPrincipal(),
		LocalBlessings:   flow.LocalBlessings(),
		RemoteBlessings:  flow.RemoteBlessings(),
		LocalEndpoint:    flow.LocalEndpoint(),
		RemoteEndpoint:   flow.RemoteEndpoint(),
		RemoteDischarges: flow.RemoteDischarges(),
		Method:           method,
		Suffix:           result.suffix,
	}
	seccall := security.NewCall(params)
	authErr := auth.Authorize(ctx, seccall)
	if authErr == nil {
		result.blessings, result.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
		return
	}

	// failedTryCall tests for errServerAuthorizeFailed and reports
	// verror.ErrNotTrusted.
	result.serverErr = wrap(verror.New(errServerAuthorizeFailed, ctx, flow.RemoteBlessings(), authErr))
	vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, result.serverErr.Err)
	flow.Close()
	result.flow = nil
}
// tryCall makes a single attempt at a call. It may connect to multiple servers
// (all that serve "name"), but will invoke the method on at most one of them
// (the server running on the most preferred protocol and network amongst all
// the servers that were successfully connected to and authorized).
//
// On failure the returned action and requireResolve values are what startCall
// passes to shouldRetry. If requireResolve is true on return, then we
// shouldn't bother retrying unless you can re-resolve.
func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
var resolved *naming.MountEntry
var blessingPattern security.BlessingPattern
// Separate the blessing pattern from the name; the pattern is handed to
// newServerAuthorizer below and the remaining name is what gets resolved.
blessingPattern, name = security.SplitPatternName(name)
if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
// We always return NoServers as the error so that the caller knows
// that's ok to retry the operation since the name may be registered
// in the near future.
switch {
case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
case verror.ErrorID(err) == verror.ErrNoServers.ID:
// Avoid wrapping errors unnecessarily.
return nil, verror.NoRetry, false, err
default:
return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
}
} else {
if len(resolved.Servers) == 0 {
// This should never happen.
return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
}
// An empty set of protocols means all protocols...
if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
}
}
// We need to ensure calls to v23 factory methods do not occur during runtime
// initialization. Currently, the agent, which uses SecurityNone, is the only caller
// during runtime initialization. We would like to set the principal in the context
// to nil if we are running in SecurityNone, but this always results in a panic since
// the agent client would trigger the call v23.WithPrincipal during runtime
// initialization. So, we gate the call to v23.GetPrincipal instead since the agent
// client will have callEncrypted == false.
// Potential solutions to this are:
// (1) Create a separate client for the agent so that this code doesn't have to
// account for its use during runtime initialization.
// (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
// on here.
var principal security.Principal
if callEncrypted(opts) {
if principal = v23.GetPrincipal(ctx); principal == nil {
return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
}
}
// servers is now ordered by the priority heuristic implemented in
// filterAndOrderServers.
//
// Try to connect to all servers in parallel. Provide sufficient
// buffering for all of the connections to finish instantaneously. This
// is important because we want to process the responses in priority
// order; that order is indicated by the order of entries in servers.
// So, if two responses come in at the same 'instant', we prefer the
// first in resolved.Servers.
attempts := len(resolved.Servers)
// responses is indexed by server priority; a nil entry means that server has
// not reported back yet.
responses := make([]*serverStatus, attempts)
ch := make(chan *serverStatus, attempts)
// Per-call VC options come first, followed by the client-wide ones.
vcOpts := append(getVCOpts(opts), c.vcOpts...)
authorizer := newServerAuthorizer(blessingPattern, opts...)
for i, server := range resolved.Names() {
// Create a copy of vcOpts for each call to tryCreateFlow
// to avoid concurrent tryCreateFlows from stepping on each
// other while manipulating their copy of the options.
vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
copy(vcOptsCopy, vcOpts)
go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
}
// timeoutChan stays nil when ctx has no deadline; a receive from a nil
// channel never proceeds, so the select below then waits on ch alone.
// NOTE(review): the timer behind time.After is never stopped when this
// function returns early — consider time.NewTimer with a deferred Stop.
var timeoutChan <-chan time.Time
if deadline, ok := ctx.Deadline(); ok {
timeoutChan = time.After(deadline.Sub(time.Now()))
}
for {
// Block for at least one new response from the server, or the timeout.
select {
case r := <-ch:
responses[r.index] = r
// Read as many more responses as we can without blocking.
LoopNonBlocking:
for {
select {
default:
break LoopNonBlocking
case r := <-ch:
responses[r.index] = r
}
}
case <-timeoutChan:
vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
// Only the error from failedTryCall is used; it is wrapped in ErrTimeout
// unless it already is one. A timeout is never retried.
_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
if verror.ErrorID(err) != verror.ErrTimeout.ID {
return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
}
return nil, verror.NoRetry, false, err
}
// Pass a nil discharge client when the call options ask not to fetch
// discharges.
dc := c.dc
if shouldNotFetchDischarges(opts) {
dc = nil
}
// Process new responses, in priority order.
numResponses := 0
for _, r := range responses {
if r != nil {
numResponses++
}
// Skip servers that have not answered yet or that failed to produce a flow.
if r == nil || r.flow == nil {
continue
}
doneChan := ctx.Done()
r.flow.SetDeadline(doneChan)
// NOTE(review): this early return does not start cleanupTryCall, so the
// flows already collected in responses do not appear to be closed on this
// path — confirm whether that is intended.
fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
if err != nil {
return nil, verror.NoRetry, false, err
}
// A failure here is recorded against this server only; its flow is closed
// and the next server in priority order is considered.
if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
r.serverErr = &verror.SubErr{
Name: suberrName(r.server, name, method),
Options: verror.Print,
Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
}
vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
r.flow.Close()
r.flow = nil
continue
}
// This is the 'point of no return'; once the RPC is started (fc.start
// below) we can't be sure if it makes it to the server or not so, this
// code will never call fc.start more than once to ensure that we provide
// 'at-most-once' rpc semantics at this level. Retrying the network
// connections (i.e. creating flows) is fine since we can cleanup that
// state if we abort a call (i.e. close the flow).
//
// We must ensure that all flows other than r.flow are closed.
//
// TODO(cnicolaou): all errors below are marked as NoRetry
// because we want to provide at-most-once rpc semantics so
// we only ever attempt an RPC once. In the future, we'll cache
// responses on the server and then we can retry in-flight
// RPCs.
go cleanupTryCall(r, responses, ch)
// When ctx can be canceled, cancel the flow on cancellation; the watcher
// goroutine exits once the flow reports closed.
if doneChan != nil {
go func() {
select {
case <-doneChan:
vtrace.GetSpan(fc.ctx).Annotate("Canceled")
fc.flow.Cancel()
case <-fc.flow.Closed():
}
}()
}
// With no deadline on ctx, the zero time.Time is passed to fc.start.
deadline, _ := ctx.Deadline()
if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
return nil, verror.NoRetry, false, verr
}
return fc, verror.NoRetry, false, nil
}
// Every server has reported and none yielded a usable flow: give up on this
// attempt. Otherwise loop and wait for more responses.
if numResponses == len(responses) {
return c.failedTryCall(ctx, name, method, responses, ch)
}
}
}
// failedTryCall performs asynchronous cleanup for tryCall, and returns an
// appropriate error from the responses we've already received. All parallel
// calls in tryCall failed or we timed out if we get here.
//
// The returned error carries one sub-error per server that reported a
// failure. Its top-level ID is ErrNotTrusted (with NoRetry) if any server was
// rejected on trust grounds, ErrBadProtocol if nothing other than
// aborted/network errors was seen, and ErrNoServers otherwise. The returned
// call is always nil and requireResolve is always false.
func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
	go cleanupTryCall(nil, responses, ch)
	c.ns.FlushCacheEntry(name)

	var (
		errID         = verror.ErrNoServers
		action        = verror.RetryRefetch
		sawNonNetwork = false
		collected     = make([]verror.SubErr, 0, len(responses))
	)
	for _, status := range responses {
		if status == nil || status.serverErr == nil || status.serverErr.Err == nil {
			continue
		}
		switch verror.ErrorID(status.serverErr.Err) {
		case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
			errID = verror.ErrNotTrusted
			action = verror.NoRetry
			sawNonNetwork = true
		case stream.ErrAborted.ID, stream.ErrNetwork.ID:
			// Network-level failure: leaves the classification untouched.
		default:
			sawNonNetwork = true
		}
		collected = append(collected, *status.serverErr)
	}
	if !sawNonNetwork {
		// If we only encountered network errors, then report ErrBadProtocol.
		errID = verror.ErrBadProtocol
	}
	// TODO(cnicolaou): we get system errors for things like dialing using
	// the 'ws' protocol which can never succeed even if we retry the connection,
	// hence we return RetryRefetch below except for the case where the servers
	// are not trusted, in which case there's no point in retrying at all.
	// TODO(cnicolaou): implementing at-most-once rpc semantics in the future
	// will require thinking through all of the cases where the RPC can
	// be retried by the client whilst it's actually being executed on the
	// server.
	return nil, action, false, verror.AddSubErrs(verror.New(errID, ctx), ctx, collected...)
}
// Close currently does nothing; no resources held by the client are released
// here.
func (c *xclient) Close() {
// TODO(suharshs): Implement this.
}