blob: 0e9c17166b55b0203eb8db3ebd6ac99e360456e5 [file] [log] [blame]
package ipc
import (
"fmt"
"io"
"math"
"math/rand"
"strings"
"sync"
"time"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
isecurity "veyron.io/veyron/veyron/runtimes/google/security"
"veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
)
var (
	// errNoServers is returned when name resolution yields no server that
	// the client could start the call on.
	errNoServers = verror.NoExistf("ipc: no servers")
	// errFlowClosed is returned by Send after the send side was closed.
	errFlowClosed = verror.Abortedf("ipc: flow closed")
	// errRemainingStreamResults is returned when Finish is called before
	// all streaming results have been received via Recv.
	errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
	// errNonRootedName is returned when the target name has no leading "/".
	errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
)
// client is the ipc.Client implementation. It caches one VC per server
// endpoint (vcMap) and opens a new flow on that VC for each call.
type client struct {
	streamMgr stream.Manager   // used to dial VCs to server endpoints
	ns        naming.Namespace // used to resolve object names to servers
	vcOpts    []stream.VCOpt   // vc opts passed to dial
	// We support concurrent calls to StartCall and Close, so we must protect the
	// vcMap. Everything else is initialized upon client construction, and safe
	// to use concurrently.
	vcMapMu sync.Mutex
	// TODO(ashankar): The key should be a function of the blessings shared with the server?
	vcMap map[string]*vcInfo // map key is endpoint.String
	// dischargeCache caches discharges for third-party caveats; it is
	// shared (by reference) with every flowClient created by this client.
	dischargeCache dischargeCache
}
// vcInfo records a cached VC together with the endpoint it was dialed to,
// so Close can shut the endpoint down later.
type vcInfo struct {
	vc       stream.VC
	remoteEP naming.Endpoint
}
// InternalNewClient creates a new ipc.Client on top of the given stream
// manager and namespace. Any ClientOpt that also implements stream.VCOpt
// is retained and forwarded to the stream manager when dialing VCs.
func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
	cl := &client{
		streamMgr:      streamMgr,
		ns:             ns,
		vcMap:          make(map[string]*vcInfo),
		dischargeCache: dischargeCache{cache: make(map[string]security.Discharge)},
	}
	// Keep the subset of client options that apply to VC dialing.
	for _, o := range opts {
		if vcOpt, ok := o.(stream.VCOpt); ok {
			cl.vcOpts = append(cl.vcOpts, vcOpt)
		}
	}
	return cl, nil
}
// createFlow returns a new flow to ep, reusing a cached VC when one exists
// and still accepts flows, and dialing (and caching) a fresh VC otherwise.
func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
	c.vcMapMu.Lock()
	defer c.vcMapMu.Unlock()
	key := ep.String()
	if info := c.vcMap[key]; info != nil {
		flow, err := info.vc.Connect()
		if err == nil {
			return flow, nil
		}
		// If the vc fails to establish a new flow, we assume it's
		// broken, remove it from the map, and proceed to establishing
		// a new vc.
		// TODO(caprita): Should we distinguish errors due to vc being
		// closed from other errors? If not, should we call vc.Close()
		// before removing the vc from the map?
		delete(c.vcMap, key)
	}
	vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
	if err != nil {
		return nil, err
	}
	c.vcMap[key] = &vcInfo{vc: vc, remoteEP: ep}
	return vc.Connect()
}
// connectFlow parses an endpoint and a suffix out of the server and establishes
// a flow to the endpoint, returning the parsed suffix.
// The server name passed in should be a rooted name, of the form "/ep/suffix" or
// "/ep//suffix", or just "/ep".
func (c *client) connectFlow(server string) (stream.Flow, string, error) {
	address, suffix := naming.SplitAddressName(server)
	if address == "" {
		return nil, "", errNonRootedName
	}
	ep, err := inaming.NewEndpoint(address)
	if err != nil {
		return nil, "", err
	}
	// Refuse to talk to servers speaking an incompatible IPC version.
	if err := version.CheckCompatibility(ep); err != nil {
		return nil, "", err
	}
	flow, err := c.createFlow(ep)
	if err != nil {
		return nil, "", err
	}
	return flow, suffix, nil
}
// backoff sleeps for a randomized exponential interval derived from the
// retry count n (the randomness deters error convoys from forming) and
// reports whether a retry is still worthwhile given the deadline.
func backoff(n int, deadline time.Time) bool {
	wait := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
	if wait > maxBackoff {
		wait = maxBackoff
	}
	remaining := deadline.Sub(time.Now())
	if wait > remaining {
		// We need to leave a little time for the call to start or
		// we'll just timeout in startCall before we actually do
		// anything. If we just have a millisecond left, give up.
		if remaining <= time.Millisecond {
			return false
		}
		wait = remaining - time.Millisecond
	}
	time.Sleep(wait)
	return true
}
// TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack.
func retriable(err error) bool {
e := err.Error()
// Authentication errors are permanent.
if strings.Contains(e, "authorized") {
return false
}
// Resolution errors are retriable.
if strings.Contains(e, "ipc: Resolve") {
return true
}
// Kernel level errors are retriable.
if strings.Contains(e, "errno") {
return true
}
// Connection refused is retriable.
if strings.Contains(e, "connection refused") {
return true
}
return false
}
// getRetryTimeoutOpt scans opts for a veyron2.RetryTimeoutOpt and returns
// its value as a duration, along with whether one was present.
func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
	for _, opt := range opts {
		if r, ok := opt.(veyron2.RetryTimeoutOpt); ok {
			return time.Duration(r), true
		}
	}
	return 0, false
}
// StartCall implements ipc.Client. It repeatedly attempts startCall with
// randomized exponential backoff between attempts, giving up when the
// deadline passes, the error is not retriable, or backoff determines no
// useful time remains; the error from the last attempt is then returned.
// The deadline comes from the context, overridden by a RetryTimeoutOpt if
// one is supplied, with defaultCallTimeout as the fallback.
func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
	defer vlog.LogCall()()
	// Context specified deadline.
	deadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		// Default deadline.
		deadline = time.Now().Add(defaultCallTimeout)
	}
	if r, ok := getRetryTimeoutOpt(opts); ok {
		// Caller specified deadline; r is already a time.Duration.
		deadline = time.Now().Add(r)
	}
	var lastErr verror.E
	for retries := 0; ; retries++ {
		// Back off before every attempt but the first; stop retrying
		// when there is no useful time left before the deadline.
		if retries != 0 && !backoff(retries, deadline) {
			break
		}
		call, err := c.startCall(ctx, name, method, args, opts...)
		if err == nil {
			return call, nil
		}
		lastErr = err
		if time.Now().After(deadline) || !retriable(err) {
			break
		}
	}
	return nil, lastErr
}
// getNoResolveOpt reports whether opts contains a veyron2.NoResolveOpt
// set to true, indicating the name should be used without resolution.
func getNoResolveOpt(opts []ipc.CallOpt) bool {
	for _, opt := range opts {
		if nr, ok := opt.(veyron2.NoResolveOpt); ok {
			return bool(nr)
		}
	}
	return false
}
// startCall ensures StartCall always returns verror.E.
// It resolves name to a set of servers (unless NoResolveOpt is set), then
// tries each server in turn: connect a flow, authorize the server, fetch
// third-party-caveat discharges, and send the request. The first server
// the request is successfully started on wins; if every server fails, the
// error from the last one tried is returned.
func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
	if ctx == nil {
		return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
	}
	// Each call gets its own vtrace span for annotation/tracing.
	ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
	// Resolve name unless told not to.
	var servers []string
	if getNoResolveOpt(opts) {
		servers = []string{name}
	} else {
		var err error
		if servers, err = c.ns.Resolve(ctx, name); err != nil {
			return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
		}
	}
	// Try all servers, and if none of them are authorized for the call then return the error of the last server
	// that was tried.
	var lastErr verror.E
	for _, server := range servers {
		flow, suffix, err := c.connectFlow(server)
		if err != nil {
			lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
			vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
			continue // Try the next server.
		}
		flow.SetDeadline(ctx.Done())
		// Validate caveats on the server's identity for the context associated with this call.
		serverB, grantedB, err := c.authorizeServer(flow, name, suffix, method, opts)
		if err != nil {
			lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
			flow.Close()
			continue
		}
		// Fetch any discharges for third-party caveats on the client's blessings.
		var discharges []security.Discharge
		if self := flow.LocalBlessings(); self != nil {
			if tpcavs := self.ThirdPartyCaveats(); len(tpcavs) > 0 {
				discharges = c.prepareDischarges(ctx, tpcavs, mkDischargeImpetus(serverB, method, args), opts)
			}
		}
		lastErr = nil
		fc := newFlowClient(ctx, serverB, flow, &c.dischargeCache, discharges)
		// Watch for context cancellation and cancel the flow when it
		// fires; the goroutine exits once the flow itself is closed.
		if doneChan := ctx.Done(); doneChan != nil {
			go func() {
				select {
				case <-ctx.Done():
					fc.Cancel()
				case <-fc.flow.Closed():
				}
			}()
		}
		// Propagate the remaining time budget to the server.
		timeout := time.Duration(ipc.NoTimeout)
		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
			timeout = deadline.Sub(time.Now())
		}
		if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
			return nil, verr
		}
		return fc, nil
	}
	if lastErr != nil {
		// If there was any problem starting the call, flush the cache entry under the
		// assumption that it was caused by stale data.
		c.ns.FlushCacheEntry(name)
		return nil, lastErr
	}
	return nil, errNoServers
}
// authorizeServer validates that the server (remote end of flow) has the credentials to serve
// the RPC name.method for the client (local end of the flow). It returns the blessings at the
// server that are authorized for this purpose and any blessings that are to be granted to
// the server (via ipc.Granter implementations in opts.)
func (c *client) authorizeServer(flow stream.Flow, name, suffix, method string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
	// The server must identify itself via either the identity (RemoteID)
	// or blessings (RemoteBlessings) mechanism.
	if flow.RemoteID() == nil && flow.RemoteBlessings() == nil {
		return nil, nil, fmt.Errorf("server has not presented any blessings")
	}
	authctx := isecurity.NewContext(isecurity.ContextArgs{
		LocalID:         flow.LocalID(),
		RemoteID:        flow.RemoteID(),
		Debug:           "ClientAuthorizingServer",
		LocalPrincipal:  flow.LocalPrincipal(),
		LocalBlessings:  flow.LocalBlessings(),
		RemoteBlessings: flow.RemoteBlessings(),
		/* TODO(ashankar,ataly): Uncomment this! This is disabled till the hack to skip third-party caveat
		validation on a server's blessings are disabled. Commenting out the next three lines affects more
		than third-party caveats, so yeah, have to remove this soon!
		Method: method,
		Name: name,
		Suffix: suffix, */
		LocalEndpoint:  flow.LocalEndpoint(),
		RemoteEndpoint: flow.RemoteEndpoint(),
	})
	// Legacy path: only the server identity is present; authorize it and
	// use the names it carries.
	if serverID := flow.RemoteID(); flow.RemoteBlessings() == nil && serverID != nil {
		serverID, err = serverID.Authorize(authctx)
		if err != nil {
			return nil, nil, err
		}
		serverBlessings = serverID.Names()
	}
	// Blessings path: use the blessing names valid for this call context.
	if server := flow.RemoteBlessings(); server != nil {
		serverBlessings = server.ForContext(authctx)
	}
	for _, o := range opts {
		switch v := o.(type) {
		case veyron2.RemoteID:
			// Caller restricted which servers it is willing to talk to.
			if !security.BlessingPattern(v).MatchedBy(serverBlessings...) {
				return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, v)
			}
		case ipc.Granter:
			// Caller wants to grant blessings to the server; accumulate
			// them into the grantedBlessings result.
			if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
				return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err)
			} else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
				return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err)
			}
		}
	}
	return serverBlessings, grantedBlessings, nil
}
// Close shuts down the endpoint of every cached VC and drops the cache.
// It is safe to call concurrently with StartCall.
func (c *client) Close() {
	defer vlog.LogCall()()
	c.vcMapMu.Lock()
	defer c.vcMapMu.Unlock()
	for _, info := range c.vcMap {
		c.streamMgr.ShutdownEndpoint(info.remoteEP)
	}
	c.vcMap = nil
}
// IPCBindOpt makes client implement BindOpt.
func (c *client) IPCBindOpt() {
	//nologcall
}

// Compile-time check that client satisfies ipc.BindOpt.
var _ ipc.BindOpt = (*client)(nil)
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server. It satisfies ipc.Call.
type flowClient struct {
	ctx        context.T            // context to annotate with call details
	dec        *vom.Decoder         // to decode responses and results from the server
	enc        *vom.Encoder         // to encode requests and args to the server
	server     []string             // Blessings bound to the server that authorize it to receive the IPC request from the client.
	flow       stream.Flow          // the underlying flow
	response   ipc.Response         // each decoded response message is kept here
	discharges []security.Discharge // discharges used for this request
	dischargeCache *dischargeCache  // client-global discharge cache reference type
	sendClosedMu sync.Mutex
	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
	finished     bool // has Finish() already been called?
}
// newFlowClient wraps an established flow in a flowClient ready to run a
// single RPC. server holds the server's authorized blessing names, and
// discharges are sent to the server along with the request.
func newFlowClient(ctx context.T, server []string, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient {
	fc := &flowClient{
		ctx:            ctx,
		server:         server,
		flow:           flow,
		discharges:     discharges,
		dischargeCache: dischargeCache,
	}
	// Keep the decoder-before-encoder construction order of the original.
	fc.dec = vom.NewDecoder(flow)
	fc.enc = vom.NewEncoder(flow)
	return fc
}
// close closes the underlying flow and returns verr. If the close itself
// fails while verr is nil, the close failure is reported instead.
func (fc *flowClient) close(verr verror.E) verror.E {
	err := fc.flow.Close()
	if err != nil && verr == nil {
		verr = verror.Internalf("ipc: flow close failed: %v", err)
	}
	return verr
}
// start encodes and sends the request header, then any discharges, then
// the positional args, in that wire order. Any encoding failure closes
// the flow and yields a BadProtocol error.
func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E {
	header := ipc.Request{
		Suffix:           suffix,
		Method:           method,
		NumPosArgs:       uint64(len(args)),
		Timeout:          int64(timeout),
		GrantedBlessings: security.MarshalBlessings(blessings),
		NumDischarges:    uint64(len(fc.discharges)),
		TraceRequest:     vtrace.Request(fc.ctx),
	}
	if err := fc.enc.Encode(header); err != nil {
		return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
	}
	// Discharges follow the header, as announced by NumDischarges.
	for _, d := range fc.discharges {
		if err := fc.enc.Encode(d); err != nil {
			return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err))
		}
	}
	// Positional arguments come last, as announced by NumPosArgs.
	for ix, arg := range args {
		if err := fc.enc.Encode(arg); err != nil {
			return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
		}
	}
	return nil
}
// Send implements ipc.Stream. It sends one streaming argument to the
// server, preceded by an empty request header marking it as such.
func (fc *flowClient) Send(item interface{}) error {
	defer vlog.LogCall()()
	// sendClosed is GUARDED_BY(sendClosedMu) and written under the lock in
	// closeSend; read it under the same lock to avoid a data race.
	fc.sendClosedMu.Lock()
	closed := fc.sendClosed
	fc.sendClosedMu.Unlock()
	if closed {
		return errFlowClosed
	}
	// The empty request header indicates what follows is a streaming arg.
	if err := fc.enc.Encode(ipc.Request{}); err != nil {
		return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
	}
	if err := fc.enc.Encode(item); err != nil {
		return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
	}
	return nil
}
// Recv implements ipc.Stream. It decodes the next streaming result into
// itemptr, returning io.EOF once the server signals end of stream. A
// server error, if any, is remembered in fc.response for Finish.
func (fc *flowClient) Recv(itemptr interface{}) error {
	defer vlog.LogCall()()
	if fc.response.Error != nil {
		return fc.response.Error
	}
	if fc.response.EndStreamResults {
		return io.EOF
	}
	// Decode the response header and handle errors and EOF.
	if err := fc.dec.Decode(&fc.response); err != nil {
		return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
	}
	switch {
	case fc.response.Error != nil:
		return fc.response.Error
	case fc.response.EndStreamResults:
		// Return EOF to indicate to the caller that there are no more stream
		// results. Any error sent by the server is kept in fc.response.Error, and
		// returned to the user in Finish.
		return io.EOF
	}
	// Decode the streaming result.
	if err := fc.dec.Decode(itemptr); err != nil {
		return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err))
	}
	return nil
}
// CloseSend implements ipc.Stream: it closes the send side of the stream,
// delegating to closeSend.
func (fc *flowClient) CloseSend() error {
	defer vlog.LogCall()()
	return fc.closeSend()
}
// closeSend ensures CloseSend always returns verror.E.
// It sends the end-of-stream-args marker to the server at most once, and
// marks the send side closed so subsequent Sends return errFlowClosed.
func (fc *flowClient) closeSend() verror.E {
	fc.sendClosedMu.Lock()
	defer fc.sendClosedMu.Unlock()
	if fc.sendClosed {
		return nil
	}
	if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
		// TODO(caprita): Indiscriminately closing the flow below causes
		// a race as described in:
		// https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
		//
		// There should be a finer grained way to fix this (for example,
		// encoding errors should probably still result in closing the
		// flow); on the flip side, there may exist other instances
		// where we are closing the flow but should not.
		//
		// For now, commenting out the line below removes the flakiness
		// from our existing unit tests, but this needs to be revisited
		// and fixed correctly.
		//
		// return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
	}
	fc.sendClosed = true
	return nil
}
// Finish implements ipc.Call: it blocks until the server is done, decodes
// the out-args into resultptrs, and annotates the call's trace span.
func (fc *flowClient) Finish(resultptrs ...interface{}) error {
	defer vlog.LogCall()()
	verr := fc.finish(resultptrs...)
	vtrace.FromContext(fc.ctx).Annotate("Finished")
	return verr
}
// finish ensures Finish always returns verror.E.
// It implicitly closes the send side, drains the response header, merges
// any returned vtrace data, and decodes the out-args into resultptrs.
func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
	if fc.finished {
		return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed"))
	}
	fc.finished = true
	// Call closeSend implicitly, if the user hasn't already called it. There are
	// three cases:
	// 1) Server is blocked on Recv waiting for the final request message.
	// 2) Server has already finished processing, the final response message and
	//    out args are queued up on the client, and the flow is closed.
	// 3) Between 1 and 2: the server isn't blocked on Recv, but the final
	//    response and args aren't queued up yet, and the flow isn't closed.
	//
	// We must call closeSend to handle case (1) and unblock the server; otherwise
	// we'll deadlock with both client and server waiting for each other. We must
	// ignore the error (if any) to handle case (2). In that case the flow is
	// closed, meaning writes will fail and reads will succeed, and closeSend will
	// always return an error. But this isn't a "real" error; the client should
	// read the rest of the results and succeed.
	_ = fc.closeSend()
	// Decode the response header, if it hasn't already been decoded by Recv.
	if fc.response.Error == nil && !fc.response.EndStreamResults {
		if err := fc.dec.Decode(&fc.response); err != nil {
			return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
		}
		// The response header must indicate the streaming results have ended.
		if fc.response.Error == nil && !fc.response.EndStreamResults {
			return fc.close(errRemainingStreamResults)
		}
	}
	// Incorporate any VTrace info that was returned.
	vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
	if fc.response.Error != nil {
		if verror.Is(fc.response.Error, verror.NoAccess) && fc.dischargeCache != nil {
			// In case the error was caused by a bad discharge, we do not want to get stuck
			// with retrying again and again with this discharge. As there is no direct way
			// to detect it, we conservatively flush all discharges we used from the cache.
			// TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
			// (Fixed typo in the log message below: "Discarging" -> "Discarding".)
			vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
			fc.dischargeCache.Invalidate(fc.discharges...)
		}
		return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
	}
	if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
		return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs))
	}
	for ix, r := range resultptrs {
		if err := fc.dec.Decode(r); err != nil {
			return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err))
		}
	}
	return fc.close(nil)
}
// Cancel implements ipc.Call: it aborts the RPC by cancelling the
// underlying flow and annotates the call's trace span.
func (fc *flowClient) Cancel() {
	defer vlog.LogCall()()
	vtrace.FromContext(fc.ctx).Annotate("Cancelled")
	fc.flow.Cancel()
}
// RemoteBlessings returns the blessing names of the server that were
// authorized for this call, along with the blessings the server presented.
func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
	return fc.server, fc.flow.RemoteBlessings()
}