Jiri Simsa | d7616c9 | 2015-03-24 23:44:30 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 5 | package rpc |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 6 | |
| 7 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 8 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 9 | "io" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 10 | "math/rand" |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 11 | "net" |
Suharsh Sivakumar | 67ef84a | 2015-02-13 13:04:44 -0800 | [diff] [blame] | 12 | "reflect" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 13 | "sync" |
| 14 | "time" |
| 15 | |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 16 | "v.io/x/lib/vlog" |
| 17 | |
Suharsh Sivakumar | 2ad4e10 | 2015-03-17 21:23:37 -0700 | [diff] [blame] | 18 | "v.io/v23" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 19 | "v.io/v23/context" |
| 20 | "v.io/v23/i18n" |
Todd Wang | 5082a55 | 2015-04-02 10:56:11 -0700 | [diff] [blame] | 21 | "v.io/v23/namespace" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 22 | "v.io/v23/naming" |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 23 | "v.io/v23/rpc" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 24 | "v.io/v23/security" |
| 25 | "v.io/v23/vdl" |
Todd Wang | f6a0688 | 2015-02-27 17:38:01 -0800 | [diff] [blame] | 26 | vtime "v.io/v23/vdlroot/time" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 27 | "v.io/v23/verror" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 28 | "v.io/v23/vom" |
| 29 | "v.io/v23/vtrace" |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 30 | |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 31 | "v.io/x/ref/lib/apilog" |
Suharsh Sivakumar | dcc11d7 | 2015-05-11 12:19:20 -0700 | [diff] [blame] | 32 | inaming "v.io/x/ref/runtime/internal/naming" |
| 33 | "v.io/x/ref/runtime/internal/rpc/stream" |
| 34 | "v.io/x/ref/runtime/internal/rpc/stream/vc" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 35 | ) |
| 36 | |
Suharsh Sivakumar | dcc11d7 | 2015-05-11 12:19:20 -0700 | [diff] [blame] | 37 | const pkgPath = "v.io/x/ref/runtime/internal/rpc" |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 38 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 39 | func reg(id, msg string) verror.IDAction { |
| 40 | // Note: the error action is never used and is instead computed |
| 41 | // at a higher level. The errors here are purely for informational |
| 42 | // purposes. |
| 43 | return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg) |
| 44 | } |
| 45 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 46 | var ( |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 47 | // These errors are intended to be used as arguments to higher |
| 48 | // level errors and hence {1}{2} is omitted from their format |
| 49 | // strings to avoid repeating these n-times in the final error |
| 50 | // message visible to the user. |
| 51 | errClientCloseAlreadyCalled = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called") |
| 52 | errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called") |
| 53 | errNonRootedName = reg(".errNonRootedName", "{3} does not appear to contain an address") |
| 54 | errInvalidEndpoint = reg(".errInvalidEndpoint", "failed to parse endpoint") |
| 55 | errIncompatibleEndpoint = reg(".errIncompatibleEndpoint", "incompatible endpoint") |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 56 | errRequestEncoding = reg(".errRequestEncoding", "failed to encode request {3}{:4}") |
| 57 | errDischargeEncoding = reg(".errDischargeEncoding", "failed to encode discharges {:3}") |
| 58 | errBlessingEncoding = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}") |
| 59 | errArgEncoding = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}") |
| 60 | errMismatchedResults = reg(".errMismatchedResults", "got {3} results, but want {4}") |
| 61 | errResultDecoding = reg(".errResultDecoding", "failed to decode result #{3}{:4}") |
| 62 | errResponseDecoding = reg(".errResponseDecoding", "failed to decode response{:3}") |
| 63 | errRemainingStreamResults = reg(".errRemaingStreamResults", "stream closed with remaining stream results") |
| 64 | errNoBlessingsForPeer = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}") |
| 65 | errBlessingGrant = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}") |
| 66 | errBlessingAdd = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}") |
Suharsh Sivakumar | 4c7e0b7 | 2015-04-24 16:57:13 -0700 | [diff] [blame] | 67 | errServerAuthorizeFailed = reg(".errServerAuthorizedFailed", "failed to authorize flow with remote blessings{:3} {:4}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 68 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 69 | errPrepareBlessingsAndDischarges = reg(".prepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 70 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 71 | errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}") |
| 72 | errNoPrincipal = reg(".errNoPrincipal", "principal required for secure connections") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 73 | ) |
| 74 | |
| 75 | type client struct { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 76 | streamMgr stream.Manager |
Todd Wang | 5082a55 | 2015-04-02 10:56:11 -0700 | [diff] [blame] | 77 | ns namespace.T |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 78 | vcOpts []stream.VCOpt // vc opts passed to dial |
| 79 | preferredProtocols []string |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 80 | |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame] | 81 | // We cache the IP networks on the device since it is not that cheap to read |
| 82 | // network interfaces through os syscall. |
| 83 | // TODO(jhahn): Add monitoring the network interface changes. |
| 84 | ipNets []*net.IPNet |
| 85 | |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 86 | vcCache *vc.VCCache |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 87 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 88 | dc vc.DischargeClient |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 89 | } |
| 90 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 91 | var _ rpc.Client = (*client)(nil) |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 92 | |
Todd Wang | 5082a55 | 2015-04-02 10:56:11 -0700 | [diff] [blame] | 93 | func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 94 | c := &client{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 95 | streamMgr: streamMgr, |
| 96 | ns: ns, |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame] | 97 | ipNets: ipNetworks(), |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 98 | vcCache: vc.NewVCCache(), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 99 | } |
Suharsh Sivakumar | 0891858 | 2015-03-03 15:16:36 -0800 | [diff] [blame] | 100 | c.dc = InternalNewDischargeClient(nil, c, 0) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 101 | for _, opt := range opts { |
| 102 | // Collect all client opts that are also vc opts. |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 103 | switch v := opt.(type) { |
| 104 | case stream.VCOpt: |
| 105 | c.vcOpts = append(c.vcOpts, v) |
Suharsh Sivakumar | d7a6519 | 2015-01-27 22:57:15 -0800 | [diff] [blame] | 106 | case PreferredProtocols: |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 107 | c.preferredProtocols = v |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 108 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 109 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 110 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 111 | return c, nil |
| 112 | } |
| 113 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 114 | func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 115 | suberr := func(err error) *verror.SubErr { |
| 116 | return &verror.SubErr{Err: err, Options: verror.Print} |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 117 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 118 | |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 119 | found, err := c.vcCache.ReservedFind(ep, principal) |
| 120 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 121 | return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx)) |
| 122 | } |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 123 | defer c.vcCache.Unreserve(ep, principal) |
| 124 | if found != nil { |
| 125 | // We are serializing the creation of all flows per VC. This is okay |
| 126 | // because if one flow creation is to block, it is likely that all others |
| 127 | // for that VC would block as well. |
| 128 | if flow, err := found.Connect(); err == nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 129 | return flow, nil |
| 130 | } |
| 131 | // If the vc fails to establish a new flow, we assume it's |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 132 | // broken, remove it from the cache, and proceed to establishing |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 133 | // a new vc. |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 134 | // |
| 135 | // TODO(suharshs): The decision to redial 1 time when the dialing the vc |
| 136 | // in the cache fails is a bit inconsistent with the behavior when a newly |
| 137 | // dialed vc.Connect fails. We should revisit this. |
| 138 | // |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 139 | // TODO(caprita): Should we distinguish errors due to vc being |
| 140 | // closed from other errors? If not, should we call vc.Close() |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 141 | // before removing the vc from the cache? |
| 142 | if err := c.vcCache.Delete(found); err != nil { |
| 143 | return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx)) |
| 144 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 145 | } |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 146 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 147 | sm := c.streamMgr |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 148 | v, err := sm.Dial(ep, principal, vcOpts...) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 149 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 150 | return nil, suberr(err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 151 | } |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 152 | |
| 153 | flow, err := v.Connect() |
| 154 | if err != nil { |
| 155 | return nil, suberr(err) |
| 156 | } |
| 157 | |
| 158 | if err := c.vcCache.Insert(v.(*vc.VC)); err != nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 159 | sm.ShutdownEndpoint(ep) |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 160 | return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx)) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 161 | } |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 162 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 163 | return flow, nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 164 | } |
| 165 | |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 166 | // A randomized exponential backoff. The randomness deters error convoys |
Matt Rosencrantz | 254d570 | 2015-04-01 09:47:38 -0700 | [diff] [blame] | 167 | // from forming. The first time you retry n should be 0, then 1 etc. |
| 168 | func backoff(n uint, deadline time.Time) bool { |
| 169 | // This is ((100 to 200) * 2^n) ms. |
| 170 | b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 171 | if b > maxBackoff { |
| 172 | b = maxBackoff |
| 173 | } |
| 174 | r := deadline.Sub(time.Now()) |
| 175 | if b > r { |
| 176 | // We need to leave a little time for the call to start or |
| 177 | // we'll just timeout in startCall before we actually do |
| 178 | // anything. If we just have a millisecond left, give up. |
| 179 | if r <= time.Millisecond { |
| 180 | return false |
| 181 | } |
| 182 | b = r - time.Millisecond |
| 183 | } |
| 184 | time.Sleep(b) |
| 185 | return true |
| 186 | } |
| 187 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 188 | func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 189 | defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,args=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 190 | return c.startCall(ctx, name, method, args, opts) |
| 191 | } |
| 192 | |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 193 | func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 194 | defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 195 | deadline := getDeadline(ctx, opts) |
| 196 | |
| 197 | var lastErr error |
| 198 | for retries := uint(0); ; retries++ { |
| 199 | call, err := c.startCall(ctx, name, method, inArgs, opts) |
| 200 | if err != nil { |
| 201 | return err |
| 202 | } |
| 203 | err = call.Finish(outArgs...) |
| 204 | if err == nil { |
| 205 | return nil |
| 206 | } |
| 207 | lastErr = err |
| 208 | // We only retry if RetryBackoff is returned by the application because other |
| 209 | // RetryConnection and RetryRefetch required actions by the client before |
| 210 | // retrying. |
| 211 | if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) { |
Suharsh Sivakumar | c1641f6 | 2015-04-10 13:09:22 -0700 | [diff] [blame] | 212 | vlog.VI(4).Infof("Cannot retry after error: %s", lastErr) |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 213 | break |
| 214 | } |
| 215 | if !backoff(retries, deadline) { |
| 216 | break |
| 217 | } |
Suharsh Sivakumar | c1641f6 | 2015-04-10 13:09:22 -0700 | [diff] [blame] | 218 | vlog.VI(4).Infof("Retrying due to error: %s", lastErr) |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 219 | } |
| 220 | return lastErr |
| 221 | } |
| 222 | |
| 223 | func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time { |
| 224 | // Context specified deadline. |
| 225 | deadline, hasDeadline := ctx.Deadline() |
| 226 | if !hasDeadline { |
| 227 | // Default deadline. |
| 228 | deadline = time.Now().Add(defaultCallTimeout) |
| 229 | } |
| 230 | if r, ok := getRetryTimeoutOpt(opts); ok { |
| 231 | // Caller specified deadline. |
| 232 | deadline = time.Now().Add(r) |
| 233 | } |
| 234 | return deadline |
| 235 | } |
| 236 | |
| 237 | func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool { |
| 238 | switch { |
| 239 | case noRetry(opts): |
| 240 | return false |
| 241 | case action != verror.RetryBackoff: |
| 242 | return false |
| 243 | case time.Now().After(deadline): |
| 244 | return false |
| 245 | } |
| 246 | return true |
| 247 | } |
| 248 | |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 249 | func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool { |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 250 | switch { |
| 251 | case noRetry(opts): |
| 252 | return false |
| 253 | case action != verror.RetryConnection && action != verror.RetryRefetch: |
| 254 | return false |
| 255 | case time.Now().After(deadline): |
| 256 | return false |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 257 | case requireResolve && getNoNamespaceOpt(opts): |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 258 | // If we're skipping resolution and there are no servers for |
| 259 | // this call retrying is not going to help, we can't come up |
| 260 | // with new servers if there is no resolution. |
| 261 | return false |
| 262 | } |
| 263 | return true |
| 264 | } |
| 265 | |
Todd Wang | b31da59 | 2015-02-20 12:50:39 -0800 | [diff] [blame] | 266 | func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) { |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 267 | var impetus security.DischargeImpetus |
| 268 | if len(serverBlessings) > 0 { |
| 269 | impetus.Server = make([]security.BlessingPattern, len(serverBlessings)) |
| 270 | for i, b := range serverBlessings { |
| 271 | impetus.Server[i] = security.BlessingPattern(b) |
| 272 | } |
| 273 | } |
| 274 | impetus.Method = method |
| 275 | if len(args) > 0 { |
Todd Wang | b31da59 | 2015-02-20 12:50:39 -0800 | [diff] [blame] | 276 | impetus.Arguments = make([]*vdl.Value, len(args)) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 277 | for i, a := range args { |
Todd Wang | b31da59 | 2015-02-20 12:50:39 -0800 | [diff] [blame] | 278 | vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a)) |
| 279 | if err != nil { |
| 280 | return security.DischargeImpetus{}, err |
| 281 | } |
| 282 | impetus.Arguments[i] = vArg |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 283 | } |
| 284 | } |
Todd Wang | b31da59 | 2015-02-20 12:50:39 -0800 | [diff] [blame] | 285 | return impetus, nil |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 286 | } |
| 287 | |
Mike Burrows | 2ec2bb3 | 2015-02-26 15:14:43 -0800 | [diff] [blame] | 288 | // startCall ensures StartCall always returns verror.E. |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 289 | func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) { |
Matt Rosencrantz | 4f8ac60 | 2014-12-29 14:42:48 -0800 | [diff] [blame] | 290 | if !ctx.Initialized() { |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 291 | return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized") |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 292 | } |
Todd Wang | ad49204 | 2015-04-17 15:58:40 -0700 | [diff] [blame] | 293 | ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method)) |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 294 | if err := canCreateServerAuthorizer(ctx, opts); err != nil { |
| 295 | return nil, verror.New(verror.ErrBadArg, ctx, err) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 296 | } |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 297 | |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 298 | deadline := getDeadline(ctx, opts) |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 299 | |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 300 | var lastErr error |
Matt Rosencrantz | 254d570 | 2015-04-01 09:47:38 -0700 | [diff] [blame] | 301 | for retries := uint(0); ; retries++ { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 302 | call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 303 | if err == nil { |
| 304 | return call, nil |
| 305 | } |
| 306 | lastErr = err |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 307 | if !shouldRetry(action, requireResolve, deadline, opts) { |
Matt Rosencrantz | abacd43 | 2014-11-24 10:44:31 -0800 | [diff] [blame] | 308 | span.Annotatef("Cannot retry after error: %s", err) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 309 | break |
| 310 | } |
Matt Rosencrantz | 254d570 | 2015-04-01 09:47:38 -0700 | [diff] [blame] | 311 | if !backoff(retries, deadline) { |
| 312 | break |
| 313 | } |
Suharsh Sivakumar | 076e953 | 2015-04-09 17:36:25 -0700 | [diff] [blame] | 314 | span.Annotatef("Retrying due to error: %s", err) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 315 | } |
| 316 | return nil, lastErr |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 317 | } |
| 318 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 319 | type serverStatus struct { |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 320 | index int |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 321 | server, suffix string |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 322 | flow stream.Flow |
| 323 | blessings []string // authorized server blessings |
| 324 | rejectedBlessings []security.RejectedBlessing // rejected server blessings |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 325 | serverErr *verror.SubErr |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 326 | } |
| 327 | |
Suharsh Sivakumar | 56bc5ee | 2015-04-16 13:46:11 -0700 | [diff] [blame] | 328 | func suberrName(server, name, method string) string { |
| 329 | // In the case the client directly dialed an endpoint we want to avoid printing |
| 330 | // the endpoint twice. |
| 331 | if server == name { |
| 332 | return fmt.Sprintf("%s.%s", server, method) |
| 333 | } |
| 334 | return fmt.Sprintf("%s:%s.%s", server, name, method) |
| 335 | } |
| 336 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 337 | // tryCreateFlow attempts to establish a Flow to "server" (which must be a |
| 338 | // rooted name), over which a method invocation request could be sent. |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 339 | // |
| 340 | // The server at the remote end of the flow is authorized using the provided |
| 341 | // authorizer, both during creation of the VC underlying the flow and the |
| 342 | // flow itself. |
Cosmos Nicolaou | 00a0f80 | 2014-11-16 22:44:55 -0800 | [diff] [blame] | 343 | // TODO(cnicolaou): implement real, configurable load balancing. |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 344 | func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 345 | status := &serverStatus{index: index, server: server} |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 346 | var span vtrace.Span |
Todd Wang | ad49204 | 2015-04-17 15:58:40 -0700 | [diff] [blame] | 347 | ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow") |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 348 | span.Annotatef("address:%v", server) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 349 | defer func() { |
| 350 | ch <- status |
| 351 | span.Finish() |
| 352 | }() |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 353 | |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 354 | suberr := func(err error) *verror.SubErr { |
| 355 | return &verror.SubErr{ |
Suharsh Sivakumar | 56bc5ee | 2015-04-16 13:46:11 -0700 | [diff] [blame] | 356 | Name: suberrName(server, name, method), |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 357 | Err: err, |
| 358 | Options: verror.Print, |
| 359 | } |
| 360 | } |
| 361 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 362 | address, suffix := naming.SplitAddressName(server) |
| 363 | if len(address) == 0 { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 364 | status.serverErr = suberr(verror.New(errNonRootedName, ctx, server)) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 365 | return |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 366 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 367 | status.suffix = suffix |
| 368 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 369 | ep, err := inaming.NewEndpoint(address) |
| 370 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 371 | status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx)) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 372 | return |
| 373 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 374 | if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil { |
Suharsh Sivakumar | 56bc5ee | 2015-04-16 13:46:11 -0700 | [diff] [blame] | 375 | status.serverErr.Name = suberrName(server, name, method) |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 376 | vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 377 | return |
| 378 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 379 | |
| 380 | // Authorize the remote end of the flow using the provided authorizer. |
| 381 | if status.flow.LocalPrincipal() == nil { |
| 382 | // LocalPrincipal is nil which means we are operating under |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 383 | // SecurityNone. |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 384 | return |
| 385 | } |
| 386 | |
Ankur | d864681 | 2015-03-12 10:48:41 -0700 | [diff] [blame] | 387 | seccall := security.NewCall(&security.CallParams{ |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 388 | LocalPrincipal: status.flow.LocalPrincipal(), |
| 389 | LocalBlessings: status.flow.LocalBlessings(), |
| 390 | RemoteBlessings: status.flow.RemoteBlessings(), |
| 391 | LocalEndpoint: status.flow.LocalEndpoint(), |
| 392 | RemoteEndpoint: status.flow.RemoteEndpoint(), |
| 393 | RemoteDischarges: status.flow.RemoteDischarges(), |
| 394 | Method: method, |
Matt Rosencrantz | 250558f | 2015-03-17 11:37:31 -0700 | [diff] [blame] | 395 | Suffix: status.suffix, |
| 396 | }) |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 397 | if err := auth.Authorize(ctx, seccall); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 398 | // We will test for errServerAuthorizeFailed in failedTryCall and report |
| 399 | // verror.ErrNotTrusted |
| 400 | status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err)) |
| 401 | vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 402 | status.flow.Close() |
| 403 | status.flow = nil |
| 404 | return |
| 405 | } |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 406 | status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 407 | return |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 408 | } |
| 409 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 410 | // tryCall makes a single attempt at a call. It may connect to multiple servers |
| 411 | // (all that serve "name"), but will invoke the method on at most one of them |
| 412 | // (the server running on the most preferred protcol and network amongst all |
| 413 | // the servers that were successfully connected to and authorized). |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 414 | // if requireResolve is true on return, then we shouldn't bother retrying unless |
| 415 | // you can re-resolve. |
| 416 | func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) { |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 417 | var resolved *naming.MountEntry |
Asim Shankar | 263c73b | 2015-03-19 18:31:26 -0700 | [diff] [blame] | 418 | var blessingPattern security.BlessingPattern |
| 419 | blessingPattern, name = security.SplitPatternName(name) |
David Why Use Two When One Will Do Presotto | 38788d4 | 2015-03-31 17:13:54 -0700 | [diff] [blame] | 420 | if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil { |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 421 | // We always return NoServers as the error so that the caller knows |
| 422 | // that's ok to retry the operation since the name may be registered |
| 423 | // in the near future. |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 424 | switch { |
| 425 | case verror.ErrorID(err) == naming.ErrNoSuchName.ID: |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 426 | return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name) |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 427 | case verror.ErrorID(err) == verror.ErrNoServers.ID: |
| 428 | // Avoid wrapping errors unnecessarily. |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 429 | return nil, verror.NoRetry, false, err |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 430 | default: |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 431 | return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err) |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 432 | } |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 433 | } else { |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 434 | if len(resolved.Servers) == 0 { |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 435 | // This should never happen. |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 436 | return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name) |
Suharsh Sivakumar | 65e44c2 | 2014-12-10 17:15:19 -0800 | [diff] [blame] | 437 | } |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 438 | // An empty set of protocols means all protocols... |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame] | 439 | if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 440 | return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 441 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 442 | } |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 443 | |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 444 | // We need to ensure calls to v23 factory methods do not occur during runtime |
| 445 | // initialization. Currently, the agent, which uses SecurityNone, is the only caller |
| 446 | // during runtime initialization. We would like to set the principal in the context |
| 447 | // to nil if we are running in SecurityNone, but this always results in a panic since |
Todd Wang | ad49204 | 2015-04-17 15:58:40 -0700 | [diff] [blame] | 448 | // the agent client would trigger the call v23.WithPrincipal during runtime |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 449 | // initialization. So, we gate the call to v23.GetPrincipal instead since the agent |
| 450 | // client will have callEncrypted == false. |
| 451 | // Potential solutions to this are: |
| 452 | // (1) Create a separate client for the agent so that this code doesn't have to |
| 453 | // account for its use during runtime initialization. |
| 454 | // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate |
| 455 | // on here. |
| 456 | var principal security.Principal |
| 457 | if callEncrypted(opts) { |
Suharsh Sivakumar | 0ed10c2 | 2015-04-06 12:55:55 -0700 | [diff] [blame] | 458 | if principal = v23.GetPrincipal(ctx); principal == nil { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 459 | return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx) |
Suharsh Sivakumar | 0ed10c2 | 2015-04-06 12:55:55 -0700 | [diff] [blame] | 460 | } |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 461 | } |
| 462 | |
Asim Shankar | b547ea9 | 2015-02-17 18:49:45 -0800 | [diff] [blame] | 463 | // servers is now ordered by the priority heurestic implemented in |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 464 | // filterAndOrderServers. |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 465 | // |
| 466 | // Try to connect to all servers in parallel. Provide sufficient |
| 467 | // buffering for all of the connections to finish instantaneously. This |
| 468 | // is important because we want to process the responses in priority |
| 469 | // order; that order is indicated by the order of entries in servers. |
| 470 | // So, if two respones come in at the same 'instant', we prefer the |
| 471 | // first in the resolved.Servers) |
| 472 | attempts := len(resolved.Servers) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 473 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 474 | responses := make([]*serverStatus, attempts) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 475 | ch := make(chan *serverStatus, attempts) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 476 | vcOpts := append(getVCOpts(opts), c.vcOpts...) |
Asim Shankar | 263c73b | 2015-03-19 18:31:26 -0700 | [diff] [blame] | 477 | authorizer := newServerAuthorizer(blessingPattern, opts...) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 478 | for i, server := range resolved.Names() { |
Asim Shankar | 263c73b | 2015-03-19 18:31:26 -0700 | [diff] [blame] | 479 | // Create a copy of vcOpts for each call to tryCreateFlow |
| 480 | // to avoid concurrent tryCreateFlows from stepping on each |
| 481 | // other while manipulating their copy of the options. |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 482 | vcOptsCopy := make([]stream.VCOpt, len(vcOpts)) |
| 483 | copy(vcOptsCopy, vcOpts) |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 484 | go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 485 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 486 | |
Todd Wang | f6a0688 | 2015-02-27 17:38:01 -0800 | [diff] [blame] | 487 | var timeoutChan <-chan time.Time |
| 488 | if deadline, ok := ctx.Deadline(); ok { |
| 489 | timeoutChan = time.After(deadline.Sub(time.Now())) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 490 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 491 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 492 | for { |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 493 | // Block for at least one new response from the server, or the timeout. |
| 494 | select { |
| 495 | case r := <-ch: |
| 496 | responses[r.index] = r |
| 497 | // Read as many more responses as we can without blocking. |
| 498 | LoopNonBlocking: |
| 499 | for { |
| 500 | select { |
| 501 | default: |
| 502 | break LoopNonBlocking |
| 503 | case r := <-ch: |
| 504 | responses[r.index] = r |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 505 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 506 | } |
| 507 | case <-timeoutChan: |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 508 | vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name) |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 509 | _, _, _, err := c.failedTryCall(ctx, name, method, responses, ch) |
Todd Wang | 8fa3876 | 2015-03-25 14:04:59 -0700 | [diff] [blame] | 510 | if verror.ErrorID(err) != verror.ErrTimeout.ID { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 511 | return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err) |
Cosmos Nicolaou | 38209d4 | 2014-12-09 16:50:38 -0800 | [diff] [blame] | 512 | } |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 513 | return nil, verror.NoRetry, false, err |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 514 | } |
| 515 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 516 | dc := c.dc |
| 517 | if shouldNotFetchDischarges(opts) { |
| 518 | dc = nil |
| 519 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 520 | // Process new responses, in priority order. |
| 521 | numResponses := 0 |
| 522 | for _, r := range responses { |
| 523 | if r != nil { |
| 524 | numResponses++ |
| 525 | } |
| 526 | if r == nil || r.flow == nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 527 | continue |
| 528 | } |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 529 | |
Matt Rosencrantz | 1094d06 | 2015-01-30 06:43:12 -0800 | [diff] [blame] | 530 | doneChan := ctx.Done() |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 531 | r.flow.SetDeadline(doneChan) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 532 | fc, err := newFlowClient(ctx, r.flow, r.blessings, dc) |
| 533 | if err != nil { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 534 | return nil, verror.NoRetry, false, err |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 535 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 536 | |
Ankur | dda1649 | 2015-04-07 12:35:42 -0700 | [diff] [blame] | 537 | if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 538 | r.serverErr = &verror.SubErr{ |
Suharsh Sivakumar | 56bc5ee | 2015-04-16 13:46:11 -0700 | [diff] [blame] | 539 | Name: suberrName(r.server, name, method), |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 540 | Options: verror.Print, |
| 541 | Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)), |
| 542 | } |
| 543 | vlog.VI(2).Infof("rpc: err: %s", r.serverErr) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 544 | r.flow.Close() |
| 545 | r.flow = nil |
| 546 | continue |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 547 | } |
| 548 | |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 549 | // This is the 'point of no return'; once the RPC is started (fc.start |
| 550 | // below) we can't be sure if it makes it to the server or not so, this |
| 551 | // code will never call fc.start more than once to ensure that we provide |
| 552 | // 'at-most-once' rpc semantics at this level. Retrying the network |
| 553 | // connections (i.e. creating flows) is fine since we can cleanup that |
| 554 | // state if we abort a call (i.e. close the flow). |
| 555 | // |
| 556 | // We must ensure that all flows other than r.flow are closed. |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 557 | // |
| 558 | // TODO(cnicolaou): all errors below are marked as NoRetry |
| 559 | // because we want to provide at-most-once rpc semantics so |
| 560 | // we only ever attempt an RPC once. In the future, we'll cache |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 561 | // responses on the server and then we can retry in-flight |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 562 | // RPCs. |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 563 | go cleanupTryCall(r, responses, ch) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 564 | |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 565 | if doneChan != nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 566 | go func() { |
| 567 | select { |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 568 | case <-doneChan: |
Cosmos Nicolaou | 6bed419 | 2015-05-07 21:31:15 -0700 | [diff] [blame] | 569 | vtrace.GetSpan(fc.ctx).Annotate("Canceled") |
Matt Rosencrantz | 9346b41 | 2014-12-18 15:59:19 -0800 | [diff] [blame] | 570 | fc.flow.Cancel() |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 571 | case <-fc.flow.Closed(): |
| 572 | } |
| 573 | }() |
| 574 | } |
| 575 | |
Todd Wang | f6a0688 | 2015-02-27 17:38:01 -0800 | [diff] [blame] | 576 | deadline, _ := ctx.Deadline() |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 577 | if verr := fc.start(r.suffix, method, args, deadline); verr != nil { |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 578 | return nil, verror.NoRetry, false, verr |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 579 | } |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 580 | return fc, verror.NoRetry, false, nil |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 581 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 582 | if numResponses == len(responses) { |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 583 | return c.failedTryCall(ctx, name, method, responses, ch) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 584 | } |
| 585 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 586 | } |
| 587 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 588 | // cleanupTryCall ensures we've waited for every response from the tryCreateFlow |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 589 | // goroutines, and have closed the flow from each one except skip. This is a |
| 590 | // blocking function; it should be called in its own goroutine. |
| 591 | func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) { |
| 592 | numPending := 0 |
| 593 | for _, r := range responses { |
| 594 | switch { |
| 595 | case r == nil: |
| 596 | // The response hasn't arrived yet. |
| 597 | numPending++ |
| 598 | case r == skip || r.flow == nil: |
| 599 | // Either we should skip this flow, or we've closed the flow for this |
| 600 | // response already; nothing more to do. |
| 601 | default: |
| 602 | // We received the response, but haven't closed the flow yet. |
| 603 | r.flow.Close() |
| 604 | } |
| 605 | } |
| 606 | // Now we just need to wait for the pending responses and close their flows. |
| 607 | for i := 0; i < numPending; i++ { |
| 608 | if r := <-ch; r.flow != nil { |
| 609 | r.flow.Close() |
| 610 | } |
| 611 | } |
| 612 | } |
| 613 | |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 614 | // failedTryCall performs ©asynchronous cleanup for tryCall, and returns an |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 615 | // appropriate error from the responses we've already received. All parallel |
| 616 | // calls in tryCall failed or we timed out if we get here. |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 617 | func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) { |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 618 | go cleanupTryCall(nil, responses, ch) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 619 | c.ns.FlushCacheEntry(name) |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 620 | suberrs := []verror.SubErr{} |
| 621 | topLevelError := verror.ErrNoServers |
| 622 | topLevelAction := verror.RetryRefetch |
Cosmos Nicolaou | 472c681 | 2015-04-21 14:02:14 -0700 | [diff] [blame] | 623 | onlyErrNetwork := true |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 624 | for _, r := range responses { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 625 | if r != nil && r.serverErr != nil && r.serverErr.Err != nil { |
| 626 | switch verror.ErrorID(r.serverErr.Err) { |
| 627 | case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID: |
| 628 | topLevelError = verror.ErrNotTrusted |
| 629 | topLevelAction = verror.NoRetry |
Cosmos Nicolaou | 472c681 | 2015-04-21 14:02:14 -0700 | [diff] [blame] | 630 | onlyErrNetwork = false |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 631 | case stream.ErrAborted.ID, stream.ErrNetwork.ID: |
Cosmos Nicolaou | 472c681 | 2015-04-21 14:02:14 -0700 | [diff] [blame] | 632 | // do nothing |
| 633 | default: |
| 634 | onlyErrNetwork = false |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 635 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 636 | suberrs = append(suberrs, *r.serverErr) |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 637 | } |
| 638 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 639 | |
Cosmos Nicolaou | 472c681 | 2015-04-21 14:02:14 -0700 | [diff] [blame] | 640 | if onlyErrNetwork { |
| 641 | // If we only encountered network errors, then report ErrBadProtocol. |
| 642 | topLevelError = verror.ErrBadProtocol |
| 643 | } |
| 644 | |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 645 | // TODO(cnicolaou): we get system errors for things like dialing using |
| 646 | // the 'ws' protocol which can never succeed even if we retry the connection, |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 647 | // hence we return RetryRefetch below except for the case where the servers |
| 648 | // are not trusted, in case there's no point in retrying at all. |
| 649 | // TODO(cnicolaou): implementing at-most-once rpc semantics in the future |
| 650 | // will require thinking through all of the cases where the RPC can |
| 651 | // be retried by the client whilst it's actually being executed on the |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 652 | // server. |
Matt Rosencrantz | ac6cd9b | 2015-04-20 15:49:37 -0700 | [diff] [blame] | 653 | return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 654 | } |
| 655 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 656 | // prepareBlessingsAndDischarges prepares blessings and discharges for |
| 657 | // the call. |
| 658 | // |
| 659 | // This includes: (1) preparing blessings that must be granted to the |
| 660 | // server, (2) preparing blessings that the client authenticates with, |
| 661 | // and, (3) preparing any discharges for third-party caveats on the client's |
| 662 | // blessings. |
Ankur | dda1649 | 2015-04-07 12:35:42 -0700 | [diff] [blame] | 663 | func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error { |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 664 | // LocalPrincipal is nil which means we are operating under |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 665 | // SecurityNone. |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 666 | if fc.flow.LocalPrincipal() == nil { |
| 667 | return nil |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 668 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 669 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 670 | // Fetch blessings from the client's blessing store that are to be |
| 671 | // shared with the server. |
| 672 | if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() { |
| 673 | // TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings |
| 674 | // to the server. |
| 675 | return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings) |
| 676 | } |
| 677 | |
| 678 | // Fetch any discharges for third-party caveats on the client's blessings. |
| 679 | if !fc.blessings.IsZero() && fc.dc != nil { |
| 680 | impetus, err := mkDischargeImpetus(fc.server, method, args) |
| 681 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 682 | return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err)) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 683 | } |
| 684 | fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus) |
| 685 | } |
Ankur | dda1649 | 2015-04-07 12:35:42 -0700 | [diff] [blame] | 686 | |
| 687 | // Prepare blessings that must be granted to the server (using any |
| 688 | // rpc.Granter implementation in 'opts'). |
| 689 | // |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 690 | // NOTE(ataly, suharshs): Before invoking the granter, we set the parameters |
| 691 | // of the current call. The user can now retrieve the principal via |
| 692 | // v23.GetPrincipal(ctx), or via call.LocalPrincipal(). While in theory the |
| 693 | // two principals can be different, the flow.LocalPrincipal == nil check at |
| 694 | // the beginning of this method ensures that the two are the same and non-nil |
| 695 | // at this point in the code. |
Ankur | dda1649 | 2015-04-07 12:35:42 -0700 | [diff] [blame] | 696 | ldischargeMap := make(map[string]security.Discharge) |
| 697 | for _, d := range fc.discharges { |
| 698 | ldischargeMap[d.ID()] = d |
| 699 | } |
| 700 | seccall := security.NewCall(&security.CallParams{ |
| 701 | LocalPrincipal: fc.flow.LocalPrincipal(), |
| 702 | LocalBlessings: fc.blessings, |
| 703 | RemoteBlessings: fc.flow.RemoteBlessings(), |
| 704 | LocalEndpoint: fc.flow.LocalEndpoint(), |
| 705 | RemoteEndpoint: fc.flow.RemoteEndpoint(), |
| 706 | LocalDischarges: ldischargeMap, |
| 707 | RemoteDischarges: fc.flow.RemoteDischarges(), |
| 708 | Method: method, |
| 709 | Suffix: suffix, |
| 710 | }) |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 711 | if err := fc.prepareGrantedBlessings(ctx, seccall, opts); err != nil { |
Ankur | dda1649 | 2015-04-07 12:35:42 -0700 | [diff] [blame] | 712 | return err |
| 713 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 714 | return nil |
| 715 | } |
| 716 | |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 717 | func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, call security.Call, opts []rpc.CallOpt) error { |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 718 | for _, o := range opts { |
| 719 | switch v := o.(type) { |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 720 | case rpc.Granter: |
Todd Wang | 4264e4b | 2015-04-16 22:43:40 -0700 | [diff] [blame] | 721 | if b, err := v.Grant(ctx, call); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 722 | return verror.New(errBlessingGrant, fc.ctx, err) |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 723 | } else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 724 | return verror.New(errBlessingAdd, fc.ctx, err) |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 725 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 726 | } |
| 727 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 728 | return nil |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 729 | } |
| 730 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 731 | func (c *client) Close() { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 732 | defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Suharsh Sivakumar | 04e0e28 | 2015-05-02 23:24:04 -0700 | [diff] [blame] | 733 | for _, v := range c.vcCache.Close() { |
| 734 | c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 735 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 736 | } |
| 737 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 738 | // flowClient implements the RPC client-side protocol for a single RPC, over a |
| 739 | // flow that's already connected to the server. |
| 740 | type flowClient struct { |
Todd Wang | 3425a90 | 2015-01-21 18:43:59 -0800 | [diff] [blame] | 741 | ctx *context.T // context to annotate with call details |
| 742 | dec *vom.Decoder // to decode responses and results from the server |
| 743 | enc *vom.Encoder // to encode requests and args to the server |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 744 | server []string // Blessings bound to the server that authorize it to receive the RPC request from the client. |
Todd Wang | 3425a90 | 2015-01-21 18:43:59 -0800 | [diff] [blame] | 745 | flow stream.Flow // the underlying flow |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 746 | response rpc.Response // each decoded response message is kept here |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 747 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 748 | discharges []security.Discharge // discharges used for this request |
| 749 | dc vc.DischargeClient // client-global discharge-client |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 750 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 751 | blessings security.Blessings // the local blessings for the current RPC. |
| 752 | grantedBlessings security.Blessings // the blessings granted to the server. |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 753 | |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 754 | sendClosedMu sync.Mutex |
| 755 | sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 756 | finished bool // has Finish() already been called? |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 757 | } |
| 758 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 759 | var _ rpc.ClientCall = (*flowClient)(nil) |
| 760 | var _ rpc.Stream = (*flowClient)(nil) |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 761 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 762 | func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) { |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 763 | fc := &flowClient{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 764 | ctx: ctx, |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 765 | flow: flow, |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 766 | server: server, |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 767 | dc: dc, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 768 | } |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 769 | typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{}) |
| 770 | if typeenc == nil { |
Jungho Ahn | 5d1fe97 | 2015-04-27 17:51:32 -0700 | [diff] [blame] | 771 | fc.enc = vom.NewEncoder(flow) |
| 772 | fc.dec = vom.NewDecoder(flow) |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 773 | } else { |
Jungho Ahn | 5d1fe97 | 2015-04-27 17:51:32 -0700 | [diff] [blame] | 774 | fc.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)) |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 775 | typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{}) |
Jungho Ahn | 5d1fe97 | 2015-04-27 17:51:32 -0700 | [diff] [blame] | 776 | fc.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)) |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 777 | } |
| 778 | return fc, nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 779 | } |
| 780 | |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 781 | // close determines the appropriate error to return, in particular, |
| 782 | // if a timeout or cancelation has occured then any error |
| 783 | // is turned into a timeout or cancelation as appropriate. |
| 784 | // Cancelation takes precedence over timeout. This is needed because |
| 785 | // a timeout can lead to any other number of errors due to the underlying |
| 786 | // network connection being shutdown abruptly. |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 787 | func (fc *flowClient) close(err error) error { |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 788 | subErr := verror.SubErr{Err: err, Options: verror.Print} |
| 789 | subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String() |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 790 | if cerr := fc.flow.Close(); cerr != nil && err == nil { |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 791 | return verror.New(verror.ErrInternal, fc.ctx, subErr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 792 | } |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 793 | if err == nil { |
| 794 | return nil |
| 795 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 796 | switch verror.ErrorID(err) { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 797 | case verror.ErrCanceled.ID: |
| 798 | return err |
| 799 | case verror.ErrTimeout.ID: |
| 800 | // Canceled trumps timeout. |
| 801 | if fc.ctx.Err() == context.Canceled { |
| 802 | return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr) |
| 803 | } |
| 804 | return err |
| 805 | default: |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 806 | switch fc.ctx.Err() { |
| 807 | case context.DeadlineExceeded: |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 808 | timeout := verror.New(verror.ErrTimeout, fc.ctx) |
| 809 | err := verror.AddSubErrs(timeout, fc.ctx, subErr) |
| 810 | return err |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 811 | case context.Canceled: |
Cosmos Nicolaou | b291d6c | 2015-03-27 09:01:46 -0700 | [diff] [blame] | 812 | canceled := verror.New(verror.ErrCanceled, fc.ctx) |
| 813 | err := verror.AddSubErrs(canceled, fc.ctx, subErr) |
| 814 | return err |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 815 | } |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 816 | } |
| 817 | switch verror.ErrorID(err) { |
Jungho Ahn | 5d1fe97 | 2015-04-27 17:51:32 -0700 | [diff] [blame] | 818 | case errRequestEncoding.ID, errArgEncoding.ID, errResponseDecoding.ID: |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 819 | return verror.New(verror.ErrBadProtocol, fc.ctx, err) |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 820 | } |
| 821 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 822 | } |
| 823 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 824 | func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error { |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 825 | // Encode the Blessings information for the client to authorize the flow. |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 826 | var blessingsRequest rpc.BlessingsRequest |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 827 | if fc.flow.LocalPrincipal() != nil { |
Jungho Ahn | 44d8daf | 2015-01-16 10:39:15 -0800 | [diff] [blame] | 828 | blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings) |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 829 | } |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 830 | req := rpc.Request{ |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 831 | Suffix: suffix, |
| 832 | Method: method, |
| 833 | NumPosArgs: uint64(len(args)), |
Todd Wang | f6a0688 | 2015-02-27 17:38:01 -0800 | [diff] [blame] | 834 | Deadline: vtime.Deadline{deadline}, |
Asim Shankar | b07ec69 | 2015-02-27 23:40:44 -0800 | [diff] [blame] | 835 | GrantedBlessings: fc.grantedBlessings, |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 836 | Blessings: blessingsRequest, |
Asim Shankar | 0864282 | 2015-03-02 21:21:09 -0800 | [diff] [blame] | 837 | Discharges: fc.discharges, |
Matt Rosencrantz | 2803fe9 | 2015-03-09 15:26:32 -0700 | [diff] [blame] | 838 | TraceRequest: vtrace.GetRequest(fc.ctx), |
Matt Rosencrantz | 88be118 | 2015-04-27 13:45:43 -0700 | [diff] [blame] | 839 | Language: string(i18n.GetLangID(fc.ctx)), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 840 | } |
| 841 | if err := fc.enc.Encode(req); err != nil { |
Jungho Ahn | 5d1fe97 | 2015-04-27 17:51:32 -0700 | [diff] [blame] | 842 | berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 843 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 844 | } |
| 845 | for ix, arg := range args { |
| 846 | if err := fc.enc.Encode(arg); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 847 | berr := verror.New(errArgEncoding, fc.ctx, ix, err) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 848 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 849 | } |
| 850 | } |
| 851 | return nil |
| 852 | } |
| 853 | |
| 854 | func (fc *flowClient) Send(item interface{}) error { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 855 | defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 856 | if fc.sendClosed { |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 857 | return verror.New(verror.ErrAborted, fc.ctx) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 858 | } |
| 859 | |
| 860 | // The empty request header indicates what follows is a streaming arg. |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 861 | if err := fc.enc.Encode(rpc.Request{}); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 862 | berr := verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 863 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 864 | } |
| 865 | if err := fc.enc.Encode(item); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 866 | berr := verror.New(errArgEncoding, fc.ctx, -1, err) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 867 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 868 | } |
| 869 | return nil |
| 870 | } |
| 871 | |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 872 | // decodeNetError tests for a net.Error from the lower stream code and |
| 873 | // translates it into an appropriate error to be returned by the higher level |
| 874 | // RPC api calls. It also tests for the net.Error being a stream.NetError |
| 875 | // and if so, uses the error it stores rather than the stream.NetError itself |
| 876 | // as its retrun value. This allows for the stack trace of the original |
| 877 | // error to be chained to that of any verror created with it as a first parameter. |
| 878 | func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 879 | if neterr, ok := err.(net.Error); ok { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 880 | if streamNeterr, ok := err.(*stream.NetError); ok { |
| 881 | err = streamNeterr.Err() // return the error stored in the stream.NetError |
| 882 | } |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 883 | if neterr.Timeout() || neterr.Temporary() { |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 884 | // If a read is canceled in the lower levels we see |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 885 | // a timeout error - see readLocked in vc/reader.go |
| 886 | if ctx.Err() == context.Canceled { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 887 | return verror.ErrCanceled, err |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 888 | } |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 889 | return verror.ErrTimeout, err |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 890 | } |
| 891 | } |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 892 | if id := verror.ErrorID(err); id != verror.ErrUnknown.ID { |
| 893 | return verror.IDAction{id, verror.Action(err)}, err |
| 894 | } |
| 895 | return verror.ErrBadProtocol, err |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 896 | } |
| 897 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 898 | func (fc *flowClient) Recv(itemptr interface{}) error { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 899 | defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 900 | switch { |
| 901 | case fc.response.Error != nil: |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 902 | return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 903 | case fc.response.EndStreamResults: |
| 904 | return io.EOF |
| 905 | } |
| 906 | |
| 907 | // Decode the response header and handle errors and EOF. |
| 908 | if err := fc.dec.Decode(&fc.response); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 909 | id, verr := decodeNetError(fc.ctx, err) |
| 910 | berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 911 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 912 | } |
| 913 | if fc.response.Error != nil { |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 914 | return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 915 | } |
| 916 | if fc.response.EndStreamResults { |
| 917 | // Return EOF to indicate to the caller that there are no more stream |
| 918 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 919 | // returned to the user in Finish. |
| 920 | return io.EOF |
| 921 | } |
| 922 | // Decode the streaming result. |
| 923 | if err := fc.dec.Decode(itemptr); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 924 | id, verr := decodeNetError(fc.ctx, err) |
| 925 | berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 926 | // TODO(cnicolaou): should we be caching this? |
| 927 | fc.response.Error = berr |
| 928 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 929 | } |
| 930 | return nil |
| 931 | } |
| 932 | |
| 933 | func (fc *flowClient) CloseSend() error { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 934 | defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Tilak Sharma | 0c76611 | 2014-05-20 17:47:27 -0700 | [diff] [blame] | 935 | return fc.closeSend() |
| 936 | } |
| 937 | |
Mike Burrows | 2ec2bb3 | 2015-02-26 15:14:43 -0800 | [diff] [blame] | 938 | // closeSend ensures CloseSend always returns verror.E. |
Mike Burrows | dc6b360 | 2015-02-05 15:52:12 -0800 | [diff] [blame] | 939 | func (fc *flowClient) closeSend() error { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 940 | fc.sendClosedMu.Lock() |
| 941 | defer fc.sendClosedMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 942 | if fc.sendClosed { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 943 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 944 | } |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 945 | if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 946 | // TODO(caprita): Indiscriminately closing the flow below causes |
| 947 | // a race as described in: |
| 948 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 949 | // |
| 950 | // There should be a finer grained way to fix this (for example, |
| 951 | // encoding errors should probably still result in closing the |
| 952 | // flow); on the flip side, there may exist other instances |
| 953 | // where we are closing the flow but should not. |
| 954 | // |
| 955 | // For now, commenting out the line below removes the flakiness |
| 956 | // from our existing unit tests, but this needs to be revisited |
| 957 | // and fixed correctly. |
| 958 | // |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 959 | // return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 960 | } |
| 961 | fc.sendClosed = true |
| 962 | return nil |
| 963 | } |
| 964 | |
| 965 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 966 | defer apilog.LogCallf(nil, "resultptrs...=%v", resultptrs)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 967 | err := fc.finish(resultptrs...) |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 968 | vtrace.GetSpan(fc.ctx).Finish() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 969 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 970 | } |
| 971 | |
Mike Burrows | 2ec2bb3 | 2015-02-26 15:14:43 -0800 | [diff] [blame] | 972 | // finish ensures Finish always returns a verror.E. |
Cosmos Nicolaou | d1ca686 | 2015-01-30 11:43:39 -0800 | [diff] [blame] | 973 | func (fc *flowClient) finish(resultptrs ...interface{}) error { |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 974 | if fc.finished { |
Todd Wang | ff73e1f | 2015-02-10 21:45:52 -0800 | [diff] [blame] | 975 | err := verror.New(errClientFinishAlreadyCalled, fc.ctx) |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 976 | return fc.close(verror.New(verror.ErrBadState, fc.ctx, err)) |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 977 | } |
| 978 | fc.finished = true |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 979 | |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 980 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 981 | // three cases: |
| 982 | // 1) Server is blocked on Recv waiting for the final request message. |
| 983 | // 2) Server has already finished processing, the final response message and |
| 984 | // out args are queued up on the client, and the flow is closed. |
| 985 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 986 | // response and args aren't queued up yet, and the flow isn't closed. |
| 987 | // |
| 988 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 989 | // we'll deadlock with both client and server waiting for each other. We must |
| 990 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 991 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 992 | // always return an error. But this isn't a "real" error; the client should |
| 993 | // read the rest of the results and succeed. |
| 994 | _ = fc.closeSend() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 995 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 996 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 997 | if err := fc.dec.Decode(&fc.response); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 998 | id, verr := decodeNetError(fc.ctx, err) |
| 999 | berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 1000 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1001 | } |
| 1002 | // The response header must indicate the streaming results have ended. |
| 1003 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 1004 | berr := verror.New(errRemainingStreamResults, fc.ctx) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 1005 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1006 | } |
| 1007 | } |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 1008 | if fc.response.AckBlessings { |
| 1009 | clientAckBlessings(fc.flow.VCDataCache(), fc.blessings) |
| 1010 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 1011 | // Incorporate any VTrace info that was returned. |
Matt Rosencrantz | 2803fe9 | 2015-03-09 15:26:32 -0700 | [diff] [blame] | 1012 | vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1013 | if fc.response.Error != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 1014 | id := verror.ErrorID(fc.response.Error) |
| 1015 | if id == verror.ErrNoAccess.ID && fc.dc != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 1016 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 1017 | // with retrying again and again with this discharge. As there is no direct way |
| 1018 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 1019 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Asim Shankar | 77befba | 2015-01-09 12:49:04 -0800 | [diff] [blame] | 1020 | vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 1021 | fc.dc.Invalidate(fc.discharges...) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 1022 | } |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 1023 | if id == errBadNumInputArgs.ID || id == errBadInputArg.ID { |
| 1024 | return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)) |
| 1025 | } |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 1026 | return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1027 | } |
| 1028 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 1029 | berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 1030 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1031 | } |
| 1032 | for ix, r := range resultptrs { |
| 1033 | if err := fc.dec.Decode(r); err != nil { |
Cosmos Nicolaou | fd54ed9 | 2015-04-27 14:12:27 -0700 | [diff] [blame] | 1034 | id, verr := decodeNetError(fc.ctx, err) |
| 1035 | berr := verror.New(id, fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, verr)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 1036 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1037 | } |
| 1038 | } |
| 1039 | return fc.close(nil) |
| 1040 | } |
| 1041 | |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 1042 | func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) { |
Cosmos Nicolaou | f3c1909 | 2015-05-27 17:53:37 -0700 | [diff] [blame] | 1043 | defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 1044 | return fc.server, fc.flow.RemoteBlessings() |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 1045 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1046 | |
| 1047 | func bpatterns(patterns []string) []security.BlessingPattern { |
| 1048 | if patterns == nil { |
| 1049 | return nil |
| 1050 | } |
| 1051 | bpatterns := make([]security.BlessingPattern, len(patterns)) |
| 1052 | for i, p := range patterns { |
| 1053 | bpatterns[i] = security.BlessingPattern(p) |
| 1054 | } |
| 1055 | return bpatterns |
| 1056 | } |