Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 4 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | "io" |
| 6 | "sync" |
| 7 | "time" |
| 8 | |
| 9 | "veyron/runtimes/google/ipc/version" |
| 10 | inaming "veyron/runtimes/google/naming" |
| 11 | isecurity "veyron/runtimes/google/security" |
| 12 | |
| 13 | "veyron2" |
Matt Rosencrantz | 29147f7 | 2014-06-06 12:46:01 -0700 | [diff] [blame] | 14 | "veyron2/context" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 15 | "veyron2/ipc" |
| 16 | "veyron2/ipc/stream" |
| 17 | "veyron2/naming" |
| 18 | "veyron2/security" |
| 19 | "veyron2/verror" |
| 20 | "veyron2/vlog" |
| 21 | "veyron2/vom" |
| 22 | ) |
| 23 | |
// Errors shared by the client and flowClient implementations in this file.
var (
	// errNoServers is returned by startCall when there were no servers to try.
	errNoServers = verror.NotFoundf("ipc: no servers")
	// errFlowClosed is returned by flowClient.Send once the send side has been closed.
	errFlowClosed = verror.Abortedf("ipc: flow closed")
	// errRemainingStreamResults is returned by flowClient.finish when the response
	// header it reads does not mark the end of the streaming results.
	errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
	// errNonRootedName is returned by connectFlow when the server name has no address part.
	errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
)
| 30 | |
// client is the ipc.Client implementation returned by InternalNewClient. It
// keeps one cached VC per remote endpoint and a client-wide discharge cache.
type client struct {
	streamMgr stream.Manager   // dials VCs in createFlow and shuts down endpoints in Close
	ns        naming.Namespace // resolves names (and flushes cache entries) in startCall
	vcOpts    []stream.VCOpt   // vc opts passed to dial

	// We support concurrent calls to StartCall and Close, so we must protect the
	// vcMap.  Everything else is initialized upon client construction, and safe
	// to use concurrently.
	vcMapMu sync.Mutex
	// TODO(ashankar): Additionally, should vcMap be keyed with other options also?
	// Close sets vcMap to nil.
	vcMap map[string]*vcInfo // map from endpoint.String() to vc info

	// dischargeCache is handed, by pointer, to every flowClient created in startCall.
	dischargeCache dischargeCache
}
| 45 | |
// vcInfo is the value stored in client.vcMap: a dialed VC together with the
// endpoint it was dialed to.
type vcInfo struct {
	vc       stream.VC       // used by createFlow to open new flows
	remoteEP naming.Endpoint // passed to streamMgr.ShutdownEndpoint in Close
	// TODO(toddw): Add type and cancel flows.
}
| 51 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 52 | func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 53 | c := &client{ |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 54 | streamMgr: streamMgr, |
| 55 | ns: ns, |
| 56 | vcMap: make(map[string]*vcInfo), |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 57 | dischargeCache: dischargeCache{CaveatDischargeMap: make(security.CaveatDischargeMap)}, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 58 | } |
| 59 | for _, opt := range opts { |
| 60 | // Collect all client opts that are also vc opts. |
| 61 | if vcOpt, ok := opt.(stream.VCOpt); ok { |
| 62 | c.vcOpts = append(c.vcOpts, vcOpt) |
| 63 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 64 | } |
| 65 | return c, nil |
| 66 | } |
| 67 | |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 68 | func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 69 | c.vcMapMu.Lock() |
| 70 | defer c.vcMapMu.Unlock() |
| 71 | if vcinfo := c.vcMap[ep.String()]; vcinfo != nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 72 | if flow, err := vcinfo.vc.Connect(); err == nil { |
| 73 | return flow, nil |
| 74 | } |
| 75 | // If the vc fails to establish a new flow, we assume it's |
| 76 | // broken, remove it from the map, and proceed to establishing |
| 77 | // a new vc. |
| 78 | // TODO(caprita): Should we distinguish errors due to vc being |
| 79 | // closed from other errors? If not, should we call vc.Close() |
| 80 | // before removing the vc from the map? |
| 81 | delete(c.vcMap, ep.String()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 82 | } |
| 83 | vc, err := c.streamMgr.Dial(ep, c.vcOpts...) |
| 84 | if err != nil { |
| 85 | return nil, err |
| 86 | } |
| 87 | // TODO(toddw): Add connections for the type and cancel flows. |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 88 | c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep} |
| 89 | return vc.Connect() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 90 | } |
| 91 | |
| 92 | // connectFlow parses an endpoint and a suffix out of the server and establishes |
| 93 | // a flow to the endpoint, returning the parsed suffix. |
| 94 | // The server name passed in should be a rooted name, of the form "/ep/suffix" or |
| 95 | // "/ep//suffix", or just "/ep". |
| 96 | func (c *client) connectFlow(server string) (stream.Flow, string, error) { |
| 97 | address, suffix := naming.SplitAddressName(server) |
| 98 | if len(address) == 0 { |
| 99 | return nil, "", errNonRootedName |
| 100 | } |
| 101 | ep, err := inaming.NewEndpoint(address) |
| 102 | if err != nil { |
| 103 | return nil, "", err |
| 104 | } |
| 105 | if err = version.CheckCompatibility(ep); err != nil { |
| 106 | return nil, "", err |
| 107 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 108 | flow, err := c.createFlow(ep) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 109 | if err != nil { |
| 110 | return nil, "", err |
| 111 | } |
| 112 | return flow, suffix, nil |
| 113 | } |
| 114 | |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 115 | func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) { |
| 116 | return c.startCall(ctx, name, method, args, opts...) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 117 | } |
| 118 | |
| 119 | // startCall ensures StartCall always returns verror.E. |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 120 | func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) { |
Matt Rosencrantz | 7e68d5a | 2014-06-11 15:28:51 +0000 | [diff] [blame] | 121 | if ctx == nil { |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 122 | return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method) |
Matt Rosencrantz | 7e68d5a | 2014-06-11 15:28:51 +0000 | [diff] [blame] | 123 | } |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 124 | servers, err := c.ns.Resolve(ctx, name) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 125 | if err != nil { |
| 126 | return nil, verror.NotFoundf("ipc: Resolve(%q) failed: %v", name, err) |
| 127 | } |
| 128 | // Try all servers, and if none of them are authorized for the call then return the error of the last server |
| 129 | // that was tried. |
| 130 | var lastErr verror.E |
| 131 | for _, server := range servers { |
| 132 | flow, suffix, err := c.connectFlow(server) |
| 133 | if err != nil { |
| 134 | lastErr = verror.NotFoundf("ipc: couldn't connect to server %v: %v", server, err) |
| 135 | vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err) |
| 136 | continue // Try the next server. |
| 137 | } |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 138 | timeout := time.Duration(ipc.NoTimeout) |
| 139 | if deadline, hasDeadline := ctx.Deadline(); hasDeadline { |
| 140 | timeout = deadline.Sub(time.Now()) |
| 141 | if err := flow.SetDeadline(deadline); err != nil { |
| 142 | lastErr = verror.Internalf("ipc: flow.SetDeadline failed: %v", err) |
| 143 | continue |
| 144 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 145 | } |
| 146 | |
| 147 | // Validate caveats on the server's identity for the context associated with this call. |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 148 | blessing, err := authorizeServer(flow.LocalID(), flow.RemoteID(), opts) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 149 | if err != nil { |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 150 | lastErr = verror.NotAuthorizedf("ipc: client unwilling to talk to server %q: %v", flow.RemoteID(), err) |
| 151 | flow.Close() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 152 | continue |
| 153 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 154 | |
Asim Shankar | a94e507 | 2014-08-19 18:18:36 -0700 | [diff] [blame] | 155 | discharges := c.prepareDischarges(ctx, flow.LocalID(), flow.RemoteID(), method, args, opts) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 156 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 157 | lastErr = nil |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 158 | fc := newFlowClient(flow, &c.dischargeCache, discharges) |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 159 | |
| 160 | go func() { |
| 161 | <-ctx.Done() |
| 162 | fc.Cancel() |
| 163 | }() |
| 164 | |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 165 | if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 166 | return nil, verr |
| 167 | } |
| 168 | return fc, nil |
| 169 | } |
| 170 | if lastErr != nil { |
David Why Use Two When One Will Do Presotto | f3f39ae | 2014-08-27 11:13:27 -0700 | [diff] [blame] | 171 | // If there was any problem starting the call, flush the cache entry under the |
| 172 | // assumption that it was caused by stale data. |
| 173 | c.ns.FlushCacheEntry(name) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 174 | return nil, lastErr |
| 175 | } |
| 176 | return nil, errNoServers |
| 177 | } |
| 178 | |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 179 | // authorizeServer validates that server has an identity that the client is willing to converse |
| 180 | // with, and if so returns a blessing to be provided to the server. This blessing can be nil, |
| 181 | // which indicates that the client does wish to talk to the server but not provide any blessings. |
| 182 | func authorizeServer(client, server security.PublicID, opts []ipc.CallOpt) (security.PublicID, error) { |
| 183 | if server == nil { |
| 184 | return nil, fmt.Errorf("server identity cannot be nil") |
| 185 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 186 | // TODO(ataly,andreser): Check the third-party discharges the server presents |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 187 | // TODO(ataly): What should the label be for the context? Typically the label is the security.Label |
| 188 | // of the method but we don't have that information here at the client. |
| 189 | authID, err := server.Authorize(isecurity.NewContext(isecurity.ContextArgs{ |
| 190 | LocalID: client, |
| 191 | RemoteID: server, |
| 192 | })) |
| 193 | if err != nil { |
| 194 | return nil, err |
| 195 | } |
| 196 | var granter ipc.Granter |
| 197 | for _, o := range opts { |
| 198 | switch v := o.(type) { |
| 199 | case veyron2.RemoteID: |
Asim Shankar | 6bc6458 | 2014-08-27 12:51:42 -0700 | [diff] [blame] | 200 | if !security.BlessingPattern(v).MatchedBy(authID.Names()...) { |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 201 | return nil, fmt.Errorf("server %q does not match the provided pattern %q", authID, v) |
| 202 | } |
| 203 | case ipc.Granter: |
| 204 | // Later Granters take precedence over earlier ones. |
| 205 | // Or should fail if there are multiple provided? |
| 206 | granter = v |
| 207 | } |
| 208 | } |
| 209 | var blessing security.PublicID |
| 210 | if granter != nil { |
| 211 | if blessing, err = granter.Grant(authID); err != nil { |
| 212 | return nil, fmt.Errorf("failed to grant credentials to server %q: %v", authID, err) |
| 213 | } |
| 214 | } |
| 215 | return blessing, nil |
| 216 | } |
| 217 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 218 | func (c *client) Close() { |
| 219 | c.vcMapMu.Lock() |
| 220 | for _, v := range c.vcMap { |
| 221 | c.streamMgr.ShutdownEndpoint(v.remoteEP) |
| 222 | } |
| 223 | c.vcMap = nil |
| 224 | c.vcMapMu.Unlock() |
| 225 | } |
| 226 | |
// IPCBindOpt makes client implement BindOpt. The method has no body; it
// exists only so that *client satisfies the interface.
func (c *client) IPCBindOpt() {}

// Compile-time check that *client implements ipc.BindOpt.
var _ ipc.BindOpt = (*client)(nil)
| 231 | |
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
type flowClient struct {
	dec      *vom.Decoder // to decode responses and results from the server
	enc      *vom.Encoder // to encode requests and args to the server
	flow     stream.Flow  // the underlying flow
	response ipc.Response // each decoded response message is kept here

	discharges     []security.ThirdPartyDischarge // discharges used for this request
	dischargeCache *dischargeCache                // client-global discharge cache reference type

	// sendClosedMu protects sendClosed; closeSend holds it while it writes the
	// end-of-stream-args request.
	sendClosedMu sync.Mutex
	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
}
| 246 | |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 247 | func newFlowClient(flow stream.Flow, dischargeCache *dischargeCache, discharges []security.ThirdPartyDischarge) *flowClient { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 248 | return &flowClient{ |
| 249 | // TODO(toddw): Support different codecs |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 250 | dec: vom.NewDecoder(flow), |
| 251 | enc: vom.NewEncoder(flow), |
| 252 | flow: flow, |
| 253 | discharges: discharges, |
| 254 | dischargeCache: dischargeCache, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 255 | } |
| 256 | } |
| 257 | |
| 258 | func (fc *flowClient) close(verr verror.E) verror.E { |
| 259 | if err := fc.flow.Close(); err != nil && verr == nil { |
| 260 | verr = verror.Internalf("ipc: flow close failed: %v", err) |
| 261 | } |
| 262 | return verr |
| 263 | } |
| 264 | |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 265 | func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 266 | req := ipc.Request{ |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 267 | Suffix: suffix, |
| 268 | Method: method, |
| 269 | NumPosArgs: uint64(len(args)), |
| 270 | Timeout: int64(timeout), |
| 271 | HasBlessing: blessing != nil, |
| 272 | NumDischarges: uint64(len(fc.discharges)), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 273 | } |
| 274 | if err := fc.enc.Encode(req); err != nil { |
| 275 | return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err)) |
| 276 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 277 | if blessing != nil { |
| 278 | if err := fc.enc.Encode(blessing); err != nil { |
| 279 | return fc.close(verror.BadProtocolf("ipc: blessing encoding failed: %v", err)) |
| 280 | } |
| 281 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 282 | for _, d := range fc.discharges { |
| 283 | if err := fc.enc.Encode(d); err != nil { |
| 284 | return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.CaveatID(), err)) |
| 285 | } |
| 286 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 287 | for ix, arg := range args { |
| 288 | if err := fc.enc.Encode(arg); err != nil { |
| 289 | return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err)) |
| 290 | } |
| 291 | } |
| 292 | return nil |
| 293 | } |
| 294 | |
| 295 | func (fc *flowClient) Send(item interface{}) error { |
| 296 | if fc.sendClosed { |
| 297 | return errFlowClosed |
| 298 | } |
| 299 | |
| 300 | // The empty request header indicates what follows is a streaming arg. |
| 301 | if err := fc.enc.Encode(ipc.Request{}); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 302 | return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 303 | } |
| 304 | if err := fc.enc.Encode(item); err != nil { |
| 305 | return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err)) |
| 306 | } |
| 307 | return nil |
| 308 | } |
| 309 | |
| 310 | func (fc *flowClient) Recv(itemptr interface{}) error { |
| 311 | switch { |
| 312 | case fc.response.Error != nil: |
| 313 | return fc.response.Error |
| 314 | case fc.response.EndStreamResults: |
| 315 | return io.EOF |
| 316 | } |
| 317 | |
| 318 | // Decode the response header and handle errors and EOF. |
| 319 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 320 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 321 | } |
| 322 | if fc.response.Error != nil { |
| 323 | return fc.response.Error |
| 324 | } |
| 325 | if fc.response.EndStreamResults { |
| 326 | // Return EOF to indicate to the caller that there are no more stream |
| 327 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 328 | // returned to the user in Finish. |
| 329 | return io.EOF |
| 330 | } |
| 331 | // Decode the streaming result. |
| 332 | if err := fc.dec.Decode(itemptr); err != nil { |
| 333 | return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err)) |
| 334 | } |
| 335 | return nil |
| 336 | } |
| 337 | |
| 338 | func (fc *flowClient) CloseSend() error { |
Tilak Sharma | 0c76611 | 2014-05-20 17:47:27 -0700 | [diff] [blame] | 339 | return fc.closeSend() |
| 340 | } |
| 341 | |
| 342 | // closeSend ensures CloseSend always returns verror.E. |
| 343 | func (fc *flowClient) closeSend() verror.E { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 344 | fc.sendClosedMu.Lock() |
| 345 | defer fc.sendClosedMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 346 | if fc.sendClosed { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 347 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 348 | } |
| 349 | if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil { |
| 350 | return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err)) |
| 351 | } |
| 352 | fc.sendClosed = true |
| 353 | return nil |
| 354 | } |
| 355 | |
| 356 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
| 357 | return fc.finish(resultptrs...) |
| 358 | } |
| 359 | |
| 360 | // finish ensures Finish always returns verror.E. |
| 361 | func (fc *flowClient) finish(resultptrs ...interface{}) verror.E { |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 362 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 363 | // three cases: |
| 364 | // 1) Server is blocked on Recv waiting for the final request message. |
| 365 | // 2) Server has already finished processing, the final response message and |
| 366 | // out args are queued up on the client, and the flow is closed. |
| 367 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 368 | // response and args aren't queued up yet, and the flow isn't closed. |
| 369 | // |
| 370 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 371 | // we'll deadlock with both client and server waiting for each other. We must |
| 372 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 373 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 374 | // always return an error. But this isn't a "real" error; the client should |
| 375 | // read the rest of the results and succeed. |
| 376 | _ = fc.closeSend() |
| 377 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 378 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 379 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 380 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 381 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 382 | } |
| 383 | // The response header must indicate the streaming results have ended. |
| 384 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 385 | return fc.close(errRemainingStreamResults) |
| 386 | } |
| 387 | } |
| 388 | if fc.response.Error != nil { |
Ankur | 57444f3 | 2014-08-13 11:03:39 -0700 | [diff] [blame] | 389 | if verror.Is(fc.response.Error, verror.NotAuthorized) && fc.dischargeCache != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 390 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 391 | // with retrying again and again with this discharge. As there is no direct way |
| 392 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 393 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Ankur | 57444f3 | 2014-08-13 11:03:39 -0700 | [diff] [blame] | 394 | vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 395 | fc.dischargeCache.Invalidate(fc.discharges...) |
| 396 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 397 | return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error)) |
| 398 | } |
| 399 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
| 400 | return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d", got, want)) |
| 401 | } |
| 402 | for ix, r := range resultptrs { |
| 403 | if err := fc.dec.Decode(r); err != nil { |
| 404 | return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err)) |
| 405 | } |
| 406 | } |
| 407 | return fc.close(nil) |
| 408 | } |
| 409 | |
// Cancel cancels the RPC by cancelling the underlying flow. startCall also
// invokes it from a goroutine once the call's context is done.
func (fc *flowClient) Cancel() {
	fc.flow.Cancel()
}