// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpc

import (
	"fmt"
	"io"
	"math/rand"
	"net"
	"reflect"
	"sync"
	"time"

	"v.io/x/lib/vlog"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/i18n"
	"v.io/v23/namespace"
	"v.io/v23/naming"
	"v.io/v23/rpc"
	"v.io/v23/security"
	"v.io/v23/vdl"
	vtime "v.io/v23/vdlroot/time"
	"v.io/v23/verror"
	"v.io/v23/vom"
	"v.io/v23/vtrace"

	inaming "v.io/x/ref/profiles/internal/naming"
	"v.io/x/ref/profiles/internal/rpc/stream"
	"v.io/x/ref/profiles/internal/rpc/stream/vc"
)
const pkgPath = "v.io/x/ref/profiles/internal/rpc"
func reg(id, msg string) verror.IDAction {
// Note: the error action is never used and is instead computed
// at a higher level. The errors here are purely for informational
// purposes.
return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
}
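
// For example, verror.New(errNonRootedName, ctx, server) fills {3} with
// server; the {1} (component) and {2} (operation) parameters are supplied
// by the verror machinery at a higher level.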
var (
	// These errors are intended to be used as arguments to higher
	// level errors, and hence {1}{2} is omitted from their format
	// strings to avoid repeating them n times in the final error
	// message visible to the user.
errClientCloseAlreadyCalled = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called")
errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called")
errNonRootedName = reg(".errNonRootedName", "{3} does not appear to contain an address")
errInvalidEndpoint = reg(".errInvalidEndpoint", "failed to parse endpoint")
errIncompatibleEndpoint = reg(".errIncompatibleEndpoint", "incompatible endpoint")
errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
errRequestEncoding = reg(".errRequestEncoding", "failed to encode request {3}{:4}")
errDischargeEncoding = reg(".errDischargeEncoding", "failed to encode discharges {:3}")
errBlessingEncoding = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}")
errArgEncoding = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}")
errMismatchedResults = reg(".errMismatchedResults", "got {3} results, but want {4}")
errResultDecoding = reg(".errResultDecoding", "failed to decode result #{3}{:4}")
errResponseDecoding = reg(".errResponseDecoding", "failed to decode response{:3}")
	errRemainingStreamResults = reg(".errRemainingStreamResults", "stream closed with remaining stream results")
errNoBlessingsForPeer = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}")
errBlessingGrant = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}")
errBlessingAdd = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}")
	errServerAuthorizeFailed = reg(".errServerAuthorizeFailed", "failed to authorize flow with remote blessings{:3} {:4}")
	errPrepareBlessingsAndDischarges = reg(".errPrepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}")
errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}")
errNoPrincipal = reg(".errNoPrincipal", "principal required for secure connections")
)
type client struct {
streamMgr stream.Manager
ns namespace.T
vcOpts []stream.VCOpt // vc opts passed to dial
preferredProtocols []string
	// We cache the IP networks on the device since reading the network
	// interfaces via OS syscalls is not cheap.
	// TODO(jhahn): Add monitoring of network interface changes.
ipNets []*net.IPNet
// We support concurrent calls to StartCall and Close, so we must protect the
// vcMap. Everything else is initialized upon client construction, and safe
// to use concurrently.
vcMapMu sync.Mutex
vcMap map[vcMapKey]*vcInfo
dc vc.DischargeClient
}
var _ rpc.Client = (*client)(nil)
type vcInfo struct {
vc stream.VC
remoteEP naming.Endpoint
}
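
// vcMapKey identifies a cached VC. Calls made with different client
// principals (public keys) must not share a VC, so the client's public key
// forms part of the key alongside the endpoint.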
type vcMapKey struct {
endpoint string
clientPublicKey string // clientPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
}
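
// InternalNewClient creates a new rpc.Client using the given stream manager
// for transport and the given namespace for name resolution. Any opts that
// implement stream.VCOpt are retained and passed through when VCs are dialed.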
func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &client{
streamMgr: streamMgr,
ns: ns,
ipNets: ipNetworks(),
vcMap: make(map[vcMapKey]*vcInfo),
}
c.dc = InternalNewDischargeClient(nil, c, 0)
for _, opt := range opts {
// Collect all client opts that are also vc opts.
switch v := opt.(type) {
case stream.VCOpt:
c.vcOpts = append(c.vcOpts, v)
case PreferredProtocols:
c.preferredProtocols = v
}
}
return c, nil
}
func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
c.vcMapMu.Lock()
defer c.vcMapMu.Unlock()
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{Err: err, Options: verror.Print}
}
if c.vcMap == nil {
return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
}
vcKey := vcMapKey{endpoint: ep.String()}
if principal != nil {
vcKey.clientPublicKey = principal.PublicKey().String()
}
if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
if flow, err := vcinfo.vc.Connect(); err == nil {
return flow, nil
}
// If the vc fails to establish a new flow, we assume it's
// broken, remove it from the map, and proceed to establishing
// a new vc.
// TODO(caprita): Should we distinguish errors due to vc being
// closed from other errors? If not, should we call vc.Close()
// before removing the vc from the map?
delete(c.vcMap, vcKey)
}
sm := c.streamMgr
c.vcMapMu.Unlock()
vc, err := sm.Dial(ep, principal, vcOpts...)
c.vcMapMu.Lock()
if err != nil {
return nil, suberr(err)
}
if c.vcMap == nil {
sm.ShutdownEndpoint(ep)
return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
}
if othervc, exists := c.vcMap[vcKey]; exists {
go vc.Close(nil)
vc = othervc.vc
} else {
c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
}
flow, err := vc.Connect()
if err != nil {
return nil, suberr(err)
}
return flow, nil
}
// backoff implements randomized exponential backoff. The randomness deters
// error convoys from forming. On the first retry n should be 0, then 1, and
// so on. It returns false when too little time remains before the deadline
// to back off and still start the call.
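// For example, n=0 sleeps 100-200ms, n=1 200-400ms, n=2 400-800ms, and so
// on, capped at maxBackoff and truncated so that at least a millisecond
// remains before the deadline.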
func backoff(n uint, deadline time.Time) bool {
// This is ((100 to 200) * 2^n) ms.
b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
if b > maxBackoff {
b = maxBackoff
}
r := deadline.Sub(time.Now())
if b > r {
		// We need to leave a little time for the call to start or
		// we'll just time out in startCall before we actually do
		// anything. If we have only a millisecond left, give up.
if r <= time.Millisecond {
return false
}
b = r - time.Millisecond
}
time.Sleep(b)
return true
}
func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
defer vlog.LogCall()()
return c.startCall(ctx, name, method, args, opts)
}
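
// An illustrative (hypothetical) use of the client API; Call below combines
// StartCall and Finish with retry logic:
//
//	call, err := client.StartCall(ctx, "server/suffix", "Method", []interface{}{arg})
//	if err != nil {
//		return err
//	}
//	var result string
//	err = call.Finish(&result)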
func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
defer vlog.LogCall()()
deadline := getDeadline(ctx, opts)
var lastErr error
for retries := uint(0); ; retries++ {
call, err := c.startCall(ctx, name, method, inArgs, opts)
if err != nil {
return err
}
err = call.Finish(outArgs...)
if err == nil {
return nil
}
lastErr = err
		// We only retry if RetryBackoff is returned by the application,
		// because the other actions (RetryConnection and RetryRefetch)
		// require the client to do something before retrying.
if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
vlog.VI(4).Infof("Cannot retry after error: %s", lastErr)
break
}
if !backoff(retries, deadline) {
break
}
vlog.VI(4).Infof("Retrying due to error: %s", lastErr)
}
return lastErr
}
func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
// Context specified deadline.
deadline, hasDeadline := ctx.Deadline()
if !hasDeadline {
// Default deadline.
deadline = time.Now().Add(defaultCallTimeout)
}
if r, ok := getRetryTimeoutOpt(opts); ok {
// Caller specified deadline.
deadline = time.Now().Add(r)
}
return deadline
}
func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
switch {
case noRetry(opts):
return false
case action != verror.RetryBackoff:
return false
case time.Now().After(deadline):
return false
}
return true
}
func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
switch {
case noRetry(opts):
return false
case action != verror.RetryConnection && action != verror.RetryRefetch:
return false
case time.Now().After(deadline):
return false
case requireResolve && getNoNamespaceOpt(opts):
		// If we're skipping resolution and there are no servers for
		// this call, retrying is not going to help; we can't come up
		// with new servers if there is no resolution.
return false
}
return true
}
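
// mkDischargeImpetus assembles the security.DischargeImpetus for a discharge
// request from the server's blessing names (as patterns), the method being
// invoked, and the call arguments (converted to vdl.Values).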
func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) {
var impetus security.DischargeImpetus
if len(serverBlessings) > 0 {
impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
for i, b := range serverBlessings {
impetus.Server[i] = security.BlessingPattern(b)
}
}
impetus.Method = method
if len(args) > 0 {
impetus.Arguments = make([]*vdl.Value, len(args))
for i, a := range args {
vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a))
if err != nil {
return security.DischargeImpetus{}, err
}
impetus.Arguments[i] = vArg
}
}
return impetus, nil
}
// startCall ensures StartCall always returns verror.E.
func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
if !ctx.Initialized() {
return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
}
ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
if err := canCreateServerAuthorizer(ctx, opts); err != nil {
return nil, verror.New(verror.ErrBadArg, ctx, err)
}
deadline := getDeadline(ctx, opts)
var lastErr error
for retries := uint(0); ; retries++ {
call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
if err == nil {
return call, nil
}
lastErr = err
if !shouldRetry(action, requireResolve, deadline, opts) {
span.Annotatef("Cannot retry after error: %s", err)
break
}
if !backoff(retries, deadline) {
break
}
span.Annotatef("Retrying due to error: %s", err)
}
return nil, lastErr
}
type serverStatus struct {
index int
server, suffix string
flow stream.Flow
blessings []string // authorized server blessings
rejectedBlessings []security.RejectedBlessing // rejected server blessings
serverErr *verror.SubErr
}
func suberrName(server, name, method string) string {
	// In the case where the client directly dialed an endpoint, we want to
	// avoid printing the endpoint twice.
if server == name {
return fmt.Sprintf("%s.%s", server, method)
}
return fmt.Sprintf("%s:%s.%s", server, name, method)
}
// tryCreateFlow attempts to establish a Flow to "server" (which must be a
// rooted name), over which a method invocation request could be sent.
//
// The server at the remote end of the flow is authorized using the provided
// authorizer, both during creation of the VC underlying the flow and the
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
status := &serverStatus{index: index, server: server}
var span vtrace.Span
ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
span.Annotatef("address:%v", server)
defer func() {
ch <- status
span.Finish()
}()
suberr := func(err error) *verror.SubErr {
return &verror.SubErr{
Name: suberrName(server, name, method),
Err: err,
Options: verror.Print,
}
}
address, suffix := naming.SplitAddressName(server)
if len(address) == 0 {
status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
return
}
status.suffix = suffix
ep, err := inaming.NewEndpoint(address)
if err != nil {
status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
status.serverErr.Name = suberrName(server, name, method)
vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
return
}
// Authorize the remote end of the flow using the provided authorizer.
if status.flow.LocalPrincipal() == nil {
		// A nil LocalPrincipal means we are operating under
		// SecurityNone.
return
}
seccall := security.NewCall(&security.CallParams{
LocalPrincipal: status.flow.LocalPrincipal(),
LocalBlessings: status.flow.LocalBlessings(),
RemoteBlessings: status.flow.RemoteBlessings(),
LocalEndpoint: status.flow.LocalEndpoint(),
RemoteEndpoint: status.flow.RemoteEndpoint(),
RemoteDischarges: status.flow.RemoteDischarges(),
Method: method,
Suffix: status.suffix,
})
if err := auth.Authorize(ctx, seccall); err != nil {
		// We will test for errServerAuthorizeFailed in failedTryCall and
		// report verror.ErrNotTrusted.
status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
status.flow.Close()
status.flow = nil
return
}
status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
return
}
// tryCall makes a single attempt at a call. It may connect to multiple servers
// (all that serve "name"), but will invoke the method on at most one of them
// (the server running on the most preferred protocol and network amongst all
// the servers that were successfully connected to and authorized).
// If requireResolve is true on return, then the caller shouldn't bother
// retrying unless it can re-resolve the name.
func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
var resolved *naming.MountEntry
var blessingPattern security.BlessingPattern
blessingPattern, name = security.SplitPatternName(name)
if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
		// We always return NoServers as the error so that the caller knows
		// that it's ok to retry the operation since the name may be
		// registered in the near future.
switch {
case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
case verror.ErrorID(err) == verror.ErrNoServers.ID:
// Avoid wrapping errors unnecessarily.
return nil, verror.NoRetry, false, err
default:
return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
}
} else {
if len(resolved.Servers) == 0 {
// This should never happen.
return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
}
// An empty set of protocols means all protocols...
if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
}
}
	// We need to ensure calls to v23 factory methods do not occur during runtime
	// initialization. Currently, the agent, which uses SecurityNone, is the only caller
	// during runtime initialization. We would like to set the principal in the context
	// to nil if we are running in SecurityNone, but this always results in a panic since
	// the agent client would trigger a call to v23.WithPrincipal during runtime
	// initialization. So, we gate the call to v23.GetPrincipal instead, since the agent
	// client will have callEncrypted == false.
	// Potential solutions to this are:
	// (1) Create a separate client for the agent so that this code doesn't have to
	//     account for its use during runtime initialization.
	// (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
	//     on here.
var principal security.Principal
if callEncrypted(opts) {
if principal = v23.GetPrincipal(ctx); principal == nil {
return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
}
}
	// servers is now ordered by the priority heuristic implemented in
	// filterAndOrderServers.
	//
	// Try to connect to all servers in parallel. Provide sufficient
	// buffering for all of the connections to finish instantaneously. This
	// is important because we want to process the responses in priority
	// order; that order is indicated by the order of entries in servers.
	// So, if two responses come in at the same 'instant', we prefer the
	// first in resolved.Servers.
attempts := len(resolved.Servers)
responses := make([]*serverStatus, attempts)
ch := make(chan *serverStatus, attempts)
vcOpts := append(getVCOpts(opts), c.vcOpts...)
authorizer := newServerAuthorizer(blessingPattern, opts...)
for i, server := range resolved.Names() {
		// Create a copy of vcOpts for each call to tryCreateFlow
		// to prevent concurrent tryCreateFlows from stepping on each
		// other while manipulating their copy of the options.
vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
copy(vcOptsCopy, vcOpts)
go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
}
var timeoutChan <-chan time.Time
if deadline, ok := ctx.Deadline(); ok {
timeoutChan = time.After(deadline.Sub(time.Now()))
}
for {
// Block for at least one new response from the server, or the timeout.
select {
case r := <-ch:
responses[r.index] = r
// Read as many more responses as we can without blocking.
LoopNonBlocking:
for {
select {
default:
break LoopNonBlocking
case r := <-ch:
responses[r.index] = r
}
}
case <-timeoutChan:
vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
if verror.ErrorID(err) != verror.ErrTimeout.ID {
return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
}
return nil, verror.NoRetry, false, err
}
dc := c.dc
if shouldNotFetchDischarges(opts) {
dc = nil
}
// Process new responses, in priority order.
numResponses := 0
for _, r := range responses {
if r != nil {
numResponses++
}
if r == nil || r.flow == nil {
continue
}
doneChan := ctx.Done()
r.flow.SetDeadline(doneChan)
fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
if err != nil {
return nil, verror.NoRetry, false, err
}
if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
r.serverErr = &verror.SubErr{
Name: suberrName(r.server, name, method),
Options: verror.Print,
Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
}
vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
r.flow.Close()
r.flow = nil
continue
}
			// This is the 'point of no return'; once the RPC is started (fc.start
			// below) we can't be sure whether it makes it to the server or not, so
			// this code will never call fc.start more than once, ensuring that we
			// provide 'at-most-once' rpc semantics at this level. Retrying the
			// network connections (i.e. creating flows) is fine since we can clean
			// up that state if we abort a call (i.e. close the flow).
//
// We must ensure that all flows other than r.flow are closed.
//
// TODO(cnicolaou): all errors below are marked as NoRetry
// because we want to provide at-most-once rpc semantics so
// we only ever attempt an RPC once. In the future, we'll cache
// responses on the server and then we can retry in-flight
// RPCs.
go cleanupTryCall(r, responses, ch)
if doneChan != nil {
go func() {
select {
case <-doneChan:
vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
fc.flow.Cancel()
case <-fc.flow.Closed():
}
}()
}
deadline, _ := ctx.Deadline()
if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
return nil, verror.NoRetry, false, verr
}
return fc, verror.NoRetry, false, nil
}
if numResponses == len(responses) {
return c.failedTryCall(ctx, name, method, responses, ch)
}
}
}
// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
// goroutines, and have closed the flow from each one except skip. This is a
// blocking function; it should be called in its own goroutine.
func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
numPending := 0
for _, r := range responses {
switch {
case r == nil:
// The response hasn't arrived yet.
numPending++
case r == skip || r.flow == nil:
// Either we should skip this flow, or we've closed the flow for this
// response already; nothing more to do.
default:
// We received the response, but haven't closed the flow yet.
r.flow.Close()
}
}
// Now we just need to wait for the pending responses and close their flows.
for i := 0; i < numPending; i++ {
if r := <-ch; r.flow != nil {
r.flow.Close()
}
}
}
// failedTryCall performs asynchronous cleanup for tryCall, and returns an
// appropriate error from the responses we've already received. If we get
// here, all parallel calls in tryCall either failed or we timed out.
func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
go cleanupTryCall(nil, responses, ch)
c.ns.FlushCacheEntry(name)
suberrs := []verror.SubErr{}
topLevelError := verror.ErrNoServers
topLevelAction := verror.RetryRefetch
onlyErrNetwork := true
for _, r := range responses {
if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
switch verror.ErrorID(r.serverErr.Err) {
case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
topLevelError = verror.ErrNotTrusted
topLevelAction = verror.NoRetry
onlyErrNetwork = false
case stream.ErrAborted.ID, stream.ErrNetwork.ID:
// do nothing
default:
onlyErrNetwork = false
}
suberrs = append(suberrs, *r.serverErr)
}
}
if onlyErrNetwork {
// If we only encountered network errors, then report ErrBadProtocol.
topLevelError = verror.ErrBadProtocol
}
	// TODO(cnicolaou): we get system errors for things like dialing using
	// the 'ws' protocol which can never succeed even if we retry the connection,
	// hence we return RetryRefetch below except for the case where the servers
	// are not trusted, in which case there's no point in retrying at all.
// TODO(cnicolaou): implementing at-most-once rpc semantics in the future
// will require thinking through all of the cases where the RPC can
// be retried by the client whilst it's actually being executed on the
// server.
return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
}
// prepareBlessingsAndDischarges prepares blessings and discharges for
// the call.
//
// This includes: (1) preparing blessings that must be granted to the
// server, (2) preparing blessings that the client authenticates with,
// and, (3) preparing any discharges for third-party caveats on the client's
// blessings.
func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error {
	// A nil LocalPrincipal means we are operating under SecurityNone.
if fc.flow.LocalPrincipal() == nil {
return nil
}
// Fetch blessings from the client's blessing store that are to be
// shared with the server.
if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() {
// TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings
// to the server.
return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings)
}
// Fetch any discharges for third-party caveats on the client's blessings.
if !fc.blessings.IsZero() && fc.dc != nil {
impetus, err := mkDischargeImpetus(fc.server, method, args)
if err != nil {
return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err))
}
fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
}
// Prepare blessings that must be granted to the server (using any
// rpc.Granter implementation in 'opts').
//
// NOTE(ataly, suharshs): Before invoking the granter, we set the parameters
// of the current call. The user can now retrieve the principal via
// v23.GetPrincipal(ctx), or via call.LocalPrincipal(). While in theory the
// two principals can be different, the flow.LocalPrincipal == nil check at
// the beginning of this method ensures that the two are the same and non-nil
// at this point in the code.
ldischargeMap := make(map[string]security.Discharge)
for _, d := range fc.discharges {
ldischargeMap[d.ID()] = d
}
seccall := security.NewCall(&security.CallParams{
LocalPrincipal: fc.flow.LocalPrincipal(),
LocalBlessings: fc.blessings,
RemoteBlessings: fc.flow.RemoteBlessings(),
LocalEndpoint: fc.flow.LocalEndpoint(),
RemoteEndpoint: fc.flow.RemoteEndpoint(),
LocalDischarges: ldischargeMap,
RemoteDischarges: fc.flow.RemoteDischarges(),
Method: method,
Suffix: suffix,
})
if err := fc.prepareGrantedBlessings(ctx, seccall, opts); err != nil {
return err
}
return nil
}
func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, call security.Call, opts []rpc.CallOpt) error {
for _, o := range opts {
switch v := o.(type) {
case rpc.Granter:
if b, err := v.Grant(ctx, call); err != nil {
return verror.New(errBlessingGrant, fc.ctx, err)
} else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
return verror.New(errBlessingAdd, fc.ctx, err)
}
}
}
return nil
}
func (c *client) Close() {
defer vlog.LogCall()()
c.vcMapMu.Lock()
for _, v := range c.vcMap {
c.streamMgr.ShutdownEndpoint(v.remoteEP)
}
c.vcMap = nil
c.vcMapMu.Unlock()
}
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
type flowClient struct {
ctx *context.T // context to annotate with call details
dec *vom.Decoder // to decode responses and results from the server
enc *vom.Encoder // to encode requests and args to the server
server []string // Blessings bound to the server that authorize it to receive the RPC request from the client.
flow stream.Flow // the underlying flow
response rpc.Response // each decoded response message is kept here
discharges []security.Discharge // discharges used for this request
dc vc.DischargeClient // client-global discharge-client
blessings security.Blessings // the local blessings for the current RPC.
grantedBlessings security.Blessings // the blessings granted to the server.
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
finished bool // has Finish() already been called?
}
var _ rpc.ClientCall = (*flowClient)(nil)
var _ rpc.Stream = (*flowClient)(nil)
func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) {
fc := &flowClient{
ctx: ctx,
flow: flow,
server: server,
dc: dc,
}
var err error
typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
if typeenc == nil {
if fc.enc, err = vom.NewEncoder(flow); err != nil {
// In practice, this will never fail because of a networking
// problem since the encoder writes the 'magic byte' which
// will be buffered and not written to the network immediately.
berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
if fc.dec, err = vom.NewDecoder(flow); err != nil {
berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
} else {
if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
return nil, fc.close(berr)
}
}
return fc, nil
}
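
// close closes the underlying flow and, where appropriate, folds the
// context's state (deadline exceeded, canceled) into the error returned to
// the caller.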
func (fc *flowClient) close(err error) error {
subErr := verror.SubErr{Err: err, Options: verror.Print}
subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
if cerr := fc.flow.Close(); cerr != nil && err == nil {
return verror.New(verror.ErrInternal, fc.ctx, subErr)
}
switch verror.ErrorID(err) {
case verror.ErrBadProtocol.ID, errRequestEncoding.ID, errArgEncoding.ID:
switch fc.ctx.Err() {
case context.DeadlineExceeded:
timeout := verror.New(verror.ErrTimeout, fc.ctx)
err := verror.AddSubErrs(timeout, fc.ctx, subErr)
return err
case context.Canceled:
canceled := verror.New(verror.ErrCanceled, fc.ctx)
err := verror.AddSubErrs(canceled, fc.ctx, subErr)
return err
}
case errVomEncoder.ID, errVomDecoder.ID:
badProtocol := verror.New(verror.ErrBadProtocol, fc.ctx)
err = verror.AddSubErrs(badProtocol, fc.ctx, subErr)
case verror.ErrTimeout.ID:
// Canceled trumps timeout.
if fc.ctx.Err() == context.Canceled {
return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
}
}
return err
}
func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
// Encode the Blessings information for the client to authorize the flow.
var blessingsRequest rpc.BlessingsRequest
if fc.flow.LocalPrincipal() != nil {
blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
}
req := rpc.Request{
Suffix: suffix,
Method: method,
NumPosArgs: uint64(len(args)),
Deadline: vtime.Deadline{deadline},
GrantedBlessings: fc.grantedBlessings,
Blessings: blessingsRequest,
Discharges: fc.discharges,
TraceRequest: vtrace.GetRequest(fc.ctx),
Language: string(i18n.GetLangID(fc.ctx)),
}
if err := fc.enc.Encode(req); err != nil {
berr := verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)
return fc.close(berr)
}
for ix, arg := range args {
if err := fc.enc.Encode(arg); err != nil {
berr := verror.New(errArgEncoding, fc.ctx, ix, err)
return fc.close(berr)
}
}
return nil
}
func (fc *flowClient) Send(item interface{}) error {
defer vlog.LogCall()()
if fc.sendClosed {
return verror.New(verror.ErrAborted, fc.ctx)
}
	// An empty request header indicates that what follows is a streaming arg.
if err := fc.enc.Encode(rpc.Request{}); err != nil {
berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err))
return fc.close(berr)
}
if err := fc.enc.Encode(item); err != nil {
berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, -1, err))
return fc.close(berr)
}
return nil
}
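
// decodeNetError maps a network-level error onto the verror ID used to
// report it: timeouts and temporary errors map to ErrTimeout (or ErrCanceled
// if the context was canceled), and everything else to ErrBadProtocol.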
func decodeNetError(ctx *context.T, err error) verror.IDAction {
if neterr, ok := err.(net.Error); ok {
if neterr.Timeout() || neterr.Temporary() {
			// If a read is canceled in the lower levels we see a
			// timeout error - see readLocked in vc/reader.go.
if ctx.Err() == context.Canceled {
return verror.ErrCanceled
}
return verror.ErrTimeout
}
}
return verror.ErrBadProtocol
}
func (fc *flowClient) Recv(itemptr interface{}) error {
defer vlog.LogCall()()
switch {
case fc.response.Error != nil:
// TODO(cnicolaou): this will become a verror.E when we convert the
// server.
return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
case fc.response.EndStreamResults:
return io.EOF
}
// Decode the response header and handle errors and EOF.
if err := fc.dec.Decode(&fc.response); err != nil {
berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
return fc.close(berr)
}
if fc.response.Error != nil {
// TODO(cnicolaou): this will become a verror.E when we convert the
// server.
return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
}
if fc.response.EndStreamResults {
// Return EOF to indicate to the caller that there are no more stream
// results. Any error sent by the server is kept in fc.response.Error, and
// returned to the user in Finish.
return io.EOF
}
// Decode the streaming result.
if err := fc.dec.Decode(itemptr); err != nil {
berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
// TODO(cnicolaou): should we be caching this?
fc.response.Error = berr
return fc.close(berr)
}
return nil
}
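
// An illustrative (hypothetical) streaming exchange over a started call:
//
//	_ = call.Send(item)     // stream an argument to the server
//	err := call.Recv(&out)  // receive a streamed result; io.EOF ends the stream
//	_ = call.CloseSend()    // signal that no further args will be sent
//	err = call.Finish(&res) // collect the final out-args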
func (fc *flowClient) CloseSend() error {
defer vlog.LogCall()()
return fc.closeSend()
}
// closeSend ensures CloseSend always returns verror.E.
func (fc *flowClient) closeSend() error {
fc.sendClosedMu.Lock()
defer fc.sendClosedMu.Unlock()
if fc.sendClosed {
return nil
}
if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil {
// TODO(caprita): Indiscriminately closing the flow below causes
// a race as described in:
// https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
//
// There should be a finer grained way to fix this (for example,
// encoding errors should probably still result in closing the
// flow); on the flip side, there may exist other instances
// where we are closing the flow but should not.
//
// For now, commenting out the line below removes the flakiness
// from our existing unit tests, but this needs to be revisited
// and fixed correctly.
//
// return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err))
}
fc.sendClosed = true
return nil
}
func (fc *flowClient) Finish(resultptrs ...interface{}) error {
defer vlog.LogCall()()
err := fc.finish(resultptrs...)
vtrace.GetSpan(fc.ctx).Finish()
return err
}
// finish ensures Finish always returns a verror.E.
func (fc *flowClient) finish(resultptrs ...interface{}) error {
if fc.finished {
err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
}
fc.finished = true
// Call closeSend implicitly, if the user hasn't already called it. There are
// three cases:
// 1) Server is blocked on Recv waiting for the final request message.
// 2) Server has already finished processing, the final response message and
// out args are queued up on the client, and the flow is closed.
// 3) Between 1 and 2: the server isn't blocked on Recv, but the final
// response and args aren't queued up yet, and the flow isn't closed.
//
// We must call closeSend to handle case (1) and unblock the server; otherwise
// we'll deadlock with both client and server waiting for each other. We must
// ignore the error (if any) to handle case (2). In that case the flow is
// closed, meaning writes will fail and reads will succeed, and closeSend will
// always return an error. But this isn't a "real" error; the client should
// read the rest of the results and succeed.
_ = fc.closeSend()
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
return fc.close(berr)
}
// The response header must indicate the streaming results have ended.
if fc.response.Error == nil && !fc.response.EndStreamResults {
berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRemainingStreamResults, fc.ctx))
return fc.close(berr)
}
}
if fc.response.AckBlessings {
clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
}
// Incorporate any VTrace info that was returned.
vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
if fc.response.Error != nil {
id := verror.ErrorID(fc.response.Error)
if id == verror.ErrNoAccess.ID && fc.dc != nil {
// In case the error was caused by a bad discharge, we do not want to get stuck
// with retrying again and again with this discharge. As there is no direct way
// to detect it, we conservatively flush all discharges we used from the cache.
// TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
fc.dc.Invalidate(fc.discharges...)
}
if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
}
return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
}
if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
return fc.close(berr)
}
for ix, r := range resultptrs {
if err := fc.dec.Decode(r); err != nil {
berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, err))
return fc.close(berr)
}
}
return fc.close(nil)
}
func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
return fc.server, fc.flow.RemoteBlessings()
}
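
// bpatterns converts a slice of blessing name strings into the corresponding
// security.BlessingPattern slice, preserving a nil input.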
func bpatterns(patterns []string) []security.BlessingPattern {
if patterns == nil {
return nil
}
bpatterns := make([]security.BlessingPattern, len(patterns))
for i, p := range patterns {
bpatterns[i] = security.BlessingPattern(p)
}
return bpatterns
}