blob: 5bc53cea7652b6b9b51da97059f0e02abe824b47 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
6 "sync"
7 "time"
8
9 "veyron/runtimes/google/ipc/version"
10 inaming "veyron/runtimes/google/naming"
11 isecurity "veyron/runtimes/google/security"
12
13 "veyron2"
Matt Rosencrantz29147f72014-06-06 12:46:01 -070014 "veyron2/context"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015 "veyron2/ipc"
16 "veyron2/ipc/stream"
17 "veyron2/naming"
18 "veyron2/security"
19 "veyron2/verror"
20 "veyron2/vlog"
21 "veyron2/vom"
22)
23
24var (
25 errNoServers = verror.NotFoundf("ipc: no servers")
26 errFlowClosed = verror.Abortedf("ipc: flow closed")
27 errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
28 errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
29)
30
31type client struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -070032 streamMgr stream.Manager
33 ns naming.Namespace
34 vcOpts []stream.VCOpt // vc opts passed to dial
Jiri Simsa5293dcb2014-05-10 09:56:38 -070035
36 // We support concurrent calls to StartCall and Close, so we must protect the
37 // vcMap. Everything else is initialized upon client construction, and safe
38 // to use concurrently.
39 vcMapMu sync.Mutex
40 // TODO(ashankar): Additionally, should vcMap be keyed with other options also?
41 vcMap map[string]*vcInfo // map from endpoint.String() to vc info
Andres Erbsenb7f95f32014-07-07 12:07:56 -070042
43 dischargeCache dischargeCache
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044}
45
46type vcInfo struct {
47 vc stream.VC
48 remoteEP naming.Endpoint
49 // TODO(toddw): Add type and cancel flows.
50}
51
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070052func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070053 c := &client{
Andres Erbsenb7f95f32014-07-07 12:07:56 -070054 streamMgr: streamMgr,
55 ns: ns,
56 vcMap: make(map[string]*vcInfo),
Andres Erbsenb7f95f32014-07-07 12:07:56 -070057 dischargeCache: dischargeCache{CaveatDischargeMap: make(security.CaveatDischargeMap)},
Jiri Simsa5293dcb2014-05-10 09:56:38 -070058 }
59 for _, opt := range opts {
60 // Collect all client opts that are also vc opts.
61 if vcOpt, ok := opt.(stream.VCOpt); ok {
62 c.vcOpts = append(c.vcOpts, vcOpt)
63 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070064 }
65 return c, nil
66}
67
Bogdan Caprita783f7792014-05-15 09:29:17 -070068func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070069 c.vcMapMu.Lock()
70 defer c.vcMapMu.Unlock()
71 if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -070072 if flow, err := vcinfo.vc.Connect(); err == nil {
73 return flow, nil
74 }
75 // If the vc fails to establish a new flow, we assume it's
76 // broken, remove it from the map, and proceed to establishing
77 // a new vc.
78 // TODO(caprita): Should we distinguish errors due to vc being
79 // closed from other errors? If not, should we call vc.Close()
80 // before removing the vc from the map?
81 delete(c.vcMap, ep.String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070082 }
83 vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
84 if err != nil {
85 return nil, err
86 }
87 // TODO(toddw): Add connections for the type and cancel flows.
Bogdan Caprita783f7792014-05-15 09:29:17 -070088 c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
89 return vc.Connect()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070090}
91
92// connectFlow parses an endpoint and a suffix out of the server and establishes
93// a flow to the endpoint, returning the parsed suffix.
94// The server name passed in should be a rooted name, of the form "/ep/suffix" or
95// "/ep//suffix", or just "/ep".
96func (c *client) connectFlow(server string) (stream.Flow, string, error) {
97 address, suffix := naming.SplitAddressName(server)
98 if len(address) == 0 {
99 return nil, "", errNonRootedName
100 }
101 ep, err := inaming.NewEndpoint(address)
102 if err != nil {
103 return nil, "", err
104 }
105 if err = version.CheckCompatibility(ep); err != nil {
106 return nil, "", err
107 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700108 flow, err := c.createFlow(ep)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700109 if err != nil {
110 return nil, "", err
111 }
112 return flow, suffix, nil
113}
114
Asim Shankarddc0c222014-07-29 15:47:00 -0700115func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
116 return c.startCall(ctx, name, method, args, opts...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700117}
118
119// startCall ensures StartCall always returns verror.E.
Asim Shankarddc0c222014-07-29 15:47:00 -0700120func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000121 if ctx == nil {
Asim Shankarddc0c222014-07-29 15:47:00 -0700122 return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000123 }
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700124 servers, err := c.ns.Resolve(ctx, name)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700125 if err != nil {
126 return nil, verror.NotFoundf("ipc: Resolve(%q) failed: %v", name, err)
127 }
128 // Try all servers, and if none of them are authorized for the call then return the error of the last server
129 // that was tried.
130 var lastErr verror.E
131 for _, server := range servers {
132 flow, suffix, err := c.connectFlow(server)
133 if err != nil {
134 lastErr = verror.NotFoundf("ipc: couldn't connect to server %v: %v", server, err)
135 vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
136 continue // Try the next server.
137 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700138 timeout := time.Duration(ipc.NoTimeout)
139 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
140 timeout = deadline.Sub(time.Now())
141 if err := flow.SetDeadline(deadline); err != nil {
142 lastErr = verror.Internalf("ipc: flow.SetDeadline failed: %v", err)
143 continue
144 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700145 }
146
147 // Validate caveats on the server's identity for the context associated with this call.
Asim Shankarb54d7642014-06-05 13:08:04 -0700148 blessing, err := authorizeServer(flow.LocalID(), flow.RemoteID(), opts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700149 if err != nil {
Asim Shankarb54d7642014-06-05 13:08:04 -0700150 lastErr = verror.NotAuthorizedf("ipc: client unwilling to talk to server %q: %v", flow.RemoteID(), err)
151 flow.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700152 continue
153 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700154
Asim Shankara94e5072014-08-19 18:18:36 -0700155 discharges := c.prepareDischarges(ctx, flow.LocalID(), flow.RemoteID(), method, args, opts)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700156
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700157 lastErr = nil
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700158 fc := newFlowClient(flow, &c.dischargeCache, discharges)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700159
160 go func() {
161 <-ctx.Done()
162 fc.Cancel()
163 }()
164
Asim Shankarb54d7642014-06-05 13:08:04 -0700165 if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700166 return nil, verr
167 }
168 return fc, nil
169 }
170 if lastErr != nil {
David Why Use Two When One Will Do Presottof3f39ae2014-08-27 11:13:27 -0700171 // If there was any problem starting the call, flush the cache entry under the
172 // assumption that it was caused by stale data.
173 c.ns.FlushCacheEntry(name)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700174 return nil, lastErr
175 }
176 return nil, errNoServers
177}
178
Asim Shankarb54d7642014-06-05 13:08:04 -0700179// authorizeServer validates that server has an identity that the client is willing to converse
180// with, and if so returns a blessing to be provided to the server. This blessing can be nil,
181// which indicates that the client does wish to talk to the server but not provide any blessings.
182func authorizeServer(client, server security.PublicID, opts []ipc.CallOpt) (security.PublicID, error) {
183 if server == nil {
184 return nil, fmt.Errorf("server identity cannot be nil")
185 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700186 // TODO(ataly,andreser): Check the third-party discharges the server presents
Asim Shankarb54d7642014-06-05 13:08:04 -0700187 // TODO(ataly): What should the label be for the context? Typically the label is the security.Label
188 // of the method but we don't have that information here at the client.
189 authID, err := server.Authorize(isecurity.NewContext(isecurity.ContextArgs{
190 LocalID: client,
191 RemoteID: server,
192 }))
193 if err != nil {
194 return nil, err
195 }
196 var granter ipc.Granter
197 for _, o := range opts {
198 switch v := o.(type) {
199 case veyron2.RemoteID:
Asim Shankar6bc64582014-08-27 12:51:42 -0700200 if !security.BlessingPattern(v).MatchedBy(authID.Names()...) {
Asim Shankarb54d7642014-06-05 13:08:04 -0700201 return nil, fmt.Errorf("server %q does not match the provided pattern %q", authID, v)
202 }
203 case ipc.Granter:
204 // Later Granters take precedence over earlier ones.
205 // Or should fail if there are multiple provided?
206 granter = v
207 }
208 }
209 var blessing security.PublicID
210 if granter != nil {
211 if blessing, err = granter.Grant(authID); err != nil {
212 return nil, fmt.Errorf("failed to grant credentials to server %q: %v", authID, err)
213 }
214 }
215 return blessing, nil
216}
217
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700218func (c *client) Close() {
219 c.vcMapMu.Lock()
220 for _, v := range c.vcMap {
221 c.streamMgr.ShutdownEndpoint(v.remoteEP)
222 }
223 c.vcMap = nil
224 c.vcMapMu.Unlock()
225}
226
227// IPCBindOpt makes client implement BindOpt.
228func (c *client) IPCBindOpt() {}
229
230var _ ipc.BindOpt = (*client)(nil)
231
232// flowClient implements the RPC client-side protocol for a single RPC, over a
233// flow that's already connected to the server.
234type flowClient struct {
Asim Shankar1707e432014-05-29 19:42:41 -0700235 dec *vom.Decoder // to decode responses and results from the server
236 enc *vom.Encoder // to encode requests and args to the server
237 flow stream.Flow // the underlying flow
238 response ipc.Response // each decoded response message is kept here
239
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700240 discharges []security.ThirdPartyDischarge // discharges used for this request
241 dischargeCache *dischargeCache // client-global discharge cache reference type
242
Asim Shankar1707e432014-05-29 19:42:41 -0700243 sendClosedMu sync.Mutex
244 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700245}
246
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700247func newFlowClient(flow stream.Flow, dischargeCache *dischargeCache, discharges []security.ThirdPartyDischarge) *flowClient {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700248 return &flowClient{
249 // TODO(toddw): Support different codecs
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700250 dec: vom.NewDecoder(flow),
251 enc: vom.NewEncoder(flow),
252 flow: flow,
253 discharges: discharges,
254 dischargeCache: dischargeCache,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700255 }
256}
257
258func (fc *flowClient) close(verr verror.E) verror.E {
259 if err := fc.flow.Close(); err != nil && verr == nil {
260 verr = verror.Internalf("ipc: flow close failed: %v", err)
261 }
262 return verr
263}
264
Asim Shankarb54d7642014-06-05 13:08:04 -0700265func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700266 req := ipc.Request{
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700267 Suffix: suffix,
268 Method: method,
269 NumPosArgs: uint64(len(args)),
270 Timeout: int64(timeout),
271 HasBlessing: blessing != nil,
272 NumDischarges: uint64(len(fc.discharges)),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700273 }
274 if err := fc.enc.Encode(req); err != nil {
275 return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
276 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700277 if blessing != nil {
278 if err := fc.enc.Encode(blessing); err != nil {
279 return fc.close(verror.BadProtocolf("ipc: blessing encoding failed: %v", err))
280 }
281 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700282 for _, d := range fc.discharges {
283 if err := fc.enc.Encode(d); err != nil {
284 return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.CaveatID(), err))
285 }
286 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700287 for ix, arg := range args {
288 if err := fc.enc.Encode(arg); err != nil {
289 return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
290 }
291 }
292 return nil
293}
294
295func (fc *flowClient) Send(item interface{}) error {
296 if fc.sendClosed {
297 return errFlowClosed
298 }
299
300 // The empty request header indicates what follows is a streaming arg.
301 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700302 return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700303 }
304 if err := fc.enc.Encode(item); err != nil {
305 return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
306 }
307 return nil
308}
309
310func (fc *flowClient) Recv(itemptr interface{}) error {
311 switch {
312 case fc.response.Error != nil:
313 return fc.response.Error
314 case fc.response.EndStreamResults:
315 return io.EOF
316 }
317
318 // Decode the response header and handle errors and EOF.
319 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700320 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700321 }
322 if fc.response.Error != nil {
323 return fc.response.Error
324 }
325 if fc.response.EndStreamResults {
326 // Return EOF to indicate to the caller that there are no more stream
327 // results. Any error sent by the server is kept in fc.response.Error, and
328 // returned to the user in Finish.
329 return io.EOF
330 }
331 // Decode the streaming result.
332 if err := fc.dec.Decode(itemptr); err != nil {
333 return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err))
334 }
335 return nil
336}
337
338func (fc *flowClient) CloseSend() error {
Tilak Sharma0c766112014-05-20 17:47:27 -0700339 return fc.closeSend()
340}
341
342// closeSend ensures CloseSend always returns verror.E.
343func (fc *flowClient) closeSend() verror.E {
Asim Shankar1707e432014-05-29 19:42:41 -0700344 fc.sendClosedMu.Lock()
345 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700346 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700347 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700348 }
349 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
350 return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
351 }
352 fc.sendClosed = true
353 return nil
354}
355
356func (fc *flowClient) Finish(resultptrs ...interface{}) error {
357 return fc.finish(resultptrs...)
358}
359
360// finish ensures Finish always returns verror.E.
361func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
Todd Wangce3033b2014-05-23 17:04:44 -0700362 // Call closeSend implicitly, if the user hasn't already called it. There are
363 // three cases:
364 // 1) Server is blocked on Recv waiting for the final request message.
365 // 2) Server has already finished processing, the final response message and
366 // out args are queued up on the client, and the flow is closed.
367 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
368 // response and args aren't queued up yet, and the flow isn't closed.
369 //
370 // We must call closeSend to handle case (1) and unblock the server; otherwise
371 // we'll deadlock with both client and server waiting for each other. We must
372 // ignore the error (if any) to handle case (2). In that case the flow is
373 // closed, meaning writes will fail and reads will succeed, and closeSend will
374 // always return an error. But this isn't a "real" error; the client should
375 // read the rest of the results and succeed.
376 _ = fc.closeSend()
377
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700378 // Decode the response header, if it hasn't already been decoded by Recv.
379 if fc.response.Error == nil && !fc.response.EndStreamResults {
380 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700381 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700382 }
383 // The response header must indicate the streaming results have ended.
384 if fc.response.Error == nil && !fc.response.EndStreamResults {
385 return fc.close(errRemainingStreamResults)
386 }
387 }
388 if fc.response.Error != nil {
Ankur57444f32014-08-13 11:03:39 -0700389 if verror.Is(fc.response.Error, verror.NotAuthorized) && fc.dischargeCache != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700390 // In case the error was caused by a bad discharge, we do not want to get stuck
391 // with retrying again and again with this discharge. As there is no direct way
392 // to detect it, we conservatively flush all discharges we used from the cache.
393 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Ankur57444f32014-08-13 11:03:39 -0700394 vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700395 fc.dischargeCache.Invalidate(fc.discharges...)
396 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700397 return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
398 }
399 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
400 return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d", got, want))
401 }
402 for ix, r := range resultptrs {
403 if err := fc.dec.Decode(r); err != nil {
404 return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err))
405 }
406 }
407 return fc.close(nil)
408}
409
410func (fc *flowClient) Cancel() {
411 fc.flow.Cancel()
412}