blob: 1b72b7a9dd6c335e2ad8be4d73c0882d4592e8d9 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -07006 "math"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07007 "math/rand"
Ryan Brown2726b402014-11-04 17:13:27 -08008 "regexp"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07009 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070010 "sync"
11 "time"
12
Ankure49a86a2014-11-11 18:52:43 -080013 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
Jiri Simsa519c5072014-09-17 21:37:57 -070014 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
15 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
16 "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017
Jiri Simsa519c5072014-09-17 21:37:57 -070018 "veyron.io/veyron/veyron2/context"
19 "veyron.io/veyron/veyron2/ipc"
20 "veyron.io/veyron/veyron2/ipc/stream"
21 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070022 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070023 "veyron.io/veyron/veyron2/security"
Ankure49a86a2014-11-11 18:52:43 -080024 "veyron.io/veyron/veyron2/vdl/vdlutil"
Jiri Simsa519c5072014-09-17 21:37:57 -070025 "veyron.io/veyron/veyron2/verror"
26 "veyron.io/veyron/veyron2/vlog"
27 "veyron.io/veyron/veyron2/vom"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070028)
29
30var (
Tilak Sharma492e8e92014-09-18 10:58:14 -070031 errNoServers = verror.NoExistf("ipc: no servers")
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080032 errNoAccess = verror.NoAccessf("ipc: client unwilling to access to server")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070033 errFlowClosed = verror.Abortedf("ipc: flow closed")
34 errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
35 errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
36)
37
Ryan Brown2726b402014-11-04 17:13:27 -080038var serverPatternRegexp = regexp.MustCompile("^\\[([^\\]]+)\\](.*)")
39
40// TODO(ribrdb): Flip this to true once everything is updated.
41const enableSecureServerAuth = false
42
Jiri Simsa5293dcb2014-05-10 09:56:38 -070043type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080044 streamMgr stream.Manager
45 ns naming.Namespace
46 vcOpts []stream.VCOpt // vc opts passed to dial
47 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048
49 // We support concurrent calls to StartCall and Close, so we must protect the
50 // vcMap. Everything else is initialized upon client construction, and safe
51 // to use concurrently.
52 vcMapMu sync.Mutex
Asim Shankar8f05c222014-10-06 22:08:19 -070053 // TODO(ashankar): The key should be a function of the blessings shared with the server?
54 vcMap map[string]*vcInfo // map key is endpoint.String
Andres Erbsenb7f95f32014-07-07 12:07:56 -070055
Ankure49a86a2014-11-11 18:52:43 -080056 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -070057}
58
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070059var _ ipc.Client = (*client)(nil)
60var _ ipc.BindOpt = (*client)(nil)
61
Jiri Simsa5293dcb2014-05-10 09:56:38 -070062type vcInfo struct {
63 vc stream.VC
64 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -070065}
66
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070067func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080068
Jiri Simsa5293dcb2014-05-10 09:56:38 -070069 c := &client{
Ankure49a86a2014-11-11 18:52:43 -080070 streamMgr: streamMgr,
71 ns: ns,
72 vcMap: make(map[string]*vcInfo),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070073 }
74 for _, opt := range opts {
Ankure49a86a2014-11-11 18:52:43 -080075 if dc, ok := opt.(vc.DischargeClient); ok {
76 c.dc = dc
77 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070078 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080079 switch v := opt.(type) {
80 case stream.VCOpt:
81 c.vcOpts = append(c.vcOpts, v)
82 case options.PreferredProtocols:
83 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -070084 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080086
Jiri Simsa5293dcb2014-05-10 09:56:38 -070087 return c, nil
88}
89
Bogdan Caprita783f7792014-05-15 09:29:17 -070090func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070091 c.vcMapMu.Lock()
92 defer c.vcMapMu.Unlock()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080093 if c.vcMap == nil {
94 return nil, fmt.Errorf("client has been closed")
95 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070096 if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -070097 if flow, err := vcinfo.vc.Connect(); err == nil {
98 return flow, nil
99 }
100 // If the vc fails to establish a new flow, we assume it's
101 // broken, remove it from the map, and proceed to establishing
102 // a new vc.
103 // TODO(caprita): Should we distinguish errors due to vc being
104 // closed from other errors? If not, should we call vc.Close()
105 // before removing the vc from the map?
106 delete(c.vcMap, ep.String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700107 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800108 sm := c.streamMgr
Robin Thellendee439642014-10-20 14:39:17 -0700109 c.vcMapMu.Unlock()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800110 vc, err := sm.Dial(ep, c.vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -0700111 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700112 if err != nil {
113 return nil, err
114 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800115 if c.vcMap == nil {
116 sm.ShutdownEndpoint(ep)
117 return nil, fmt.Errorf("client has been closed")
118 }
Robin Thellendee439642014-10-20 14:39:17 -0700119 if othervc, exists := c.vcMap[ep.String()]; exists {
120 vc = othervc.vc
121 // TODO(ashankar,toddw): Figure out how to close up the VC that
122 // is discarded. vc.Close?
123 } else {
124 c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
125 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700126 return vc.Connect()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700127}
128
129// connectFlow parses an endpoint and a suffix out of the server and establishes
130// a flow to the endpoint, returning the parsed suffix.
131// The server name passed in should be a rooted name, of the form "/ep/suffix" or
132// "/ep//suffix", or just "/ep".
133func (c *client) connectFlow(server string) (stream.Flow, string, error) {
134 address, suffix := naming.SplitAddressName(server)
135 if len(address) == 0 {
136 return nil, "", errNonRootedName
137 }
138 ep, err := inaming.NewEndpoint(address)
139 if err != nil {
140 return nil, "", err
141 }
142 if err = version.CheckCompatibility(ep); err != nil {
143 return nil, "", err
144 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700145 flow, err := c.createFlow(ep)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700146 if err != nil {
147 return nil, "", err
148 }
149 return flow, suffix, nil
150}
151
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700152// A randomized exponential backoff. The randomness deters error convoys from forming.
153func backoff(n int, deadline time.Time) bool {
154 b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
155 if b > maxBackoff {
156 b = maxBackoff
157 }
158 r := deadline.Sub(time.Now())
159 if b > r {
160 // We need to leave a little time for the call to start or
161 // we'll just timeout in startCall before we actually do
162 // anything. If we just have a millisecond left, give up.
163 if r <= time.Millisecond {
164 return false
165 }
166 b = r - time.Millisecond
167 }
168 time.Sleep(b)
169 return true
170}
171
// retriable classifies err by inspecting its message text and reports
// whether the failed call is worth retrying.
// TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack.
func retriable(err error) bool {
	msg := err.Error()
	switch {
	case strings.Contains(msg, "authorized"):
		// Authentication errors are permanent, whatever else the message says.
		return false
	case strings.Contains(msg, "ipc: Resolve"), // resolution errors
		strings.Contains(msg, "errno"), // kernel level errors
		strings.Contains(msg, "connection refused"):
		return true
	default:
		return false
	}
}
194
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700195func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
196 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700197 if r, ok := o.(options.RetryTimeout); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700198 return time.Duration(r), true
199 }
200 }
201 return 0, false
202}
203
Asim Shankarddc0c222014-07-29 15:47:00 -0700204func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700205 defer vlog.LogCall()()
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700206 return c.startCall(ctx, name, method, args, opts)
207}
208
209func getNoResolveOpt(opts []ipc.CallOpt) bool {
210 for _, o := range opts {
211 if r, ok := o.(options.NoResolve); ok {
212 return bool(r)
213 }
214 }
215 return false
216}
217
Ankure49a86a2014-11-11 18:52:43 -0800218func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) security.DischargeImpetus {
219 var impetus security.DischargeImpetus
220 if len(serverBlessings) > 0 {
221 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
222 for i, b := range serverBlessings {
223 impetus.Server[i] = security.BlessingPattern(b)
224 }
225 }
226 impetus.Method = method
227 if len(args) > 0 {
228 impetus.Arguments = make([]vdlutil.Any, len(args))
229 for i, a := range args {
230 impetus.Arguments[i] = vdlutil.Any(a)
231 }
232 }
233 return impetus
234}
235
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700236// startCall ensures StartCall always returns verror.E.
237func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
238 if ctx == nil {
239 return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
240 }
241
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700242 // Context specified deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700243 deadline, hasDeadline := ctx.Deadline()
244 if !hasDeadline {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700245 // Default deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700246 deadline = time.Now().Add(defaultCallTimeout)
247 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700248 if r, ok := getRetryTimeoutOpt(opts); ok {
249 // Caller specified deadline.
250 deadline = time.Now().Add(time.Duration(r))
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700251 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700252 var lastErr verror.E
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700253 for retries := 0; ; retries++ {
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700254 if retries != 0 {
255 if !backoff(retries, deadline) {
256 break
257 }
258 }
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700259 call, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700260 if err == nil {
261 return call, nil
262 }
263 lastErr = err
Nicolas LaCasse27f70412014-10-03 16:51:55 -0700264 if time.Now().After(deadline) || !retriable(err) {
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700265 break
266 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700267 }
268 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700269}
270
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800271type serverStatus struct {
272 index int
273 suffix string
274 flow stream.Flow
275 processed bool
276 err verror.E
277}
278
279func (c *client) tryServer(index int, server string, ch chan<- *serverStatus, done <-chan struct{}) {
280 select {
281 case <-done:
282 return
283 default:
284 }
285 status := &serverStatus{index: index}
286 flow, suffix, err := c.connectFlow(server)
287 if err != nil {
288 vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
289 status.err = verror.NoExistf("ipc: %q: %s", server, err)
290 ch <- status
291 return
292 }
293 status.suffix = suffix
294 status.flow = flow
295 select {
296 case <-done:
297 flow.Close()
298 default:
299 ch <- status
300 }
301}
302
303// tryCall makes a single attempt at a call, against possibly multiple servers.
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700304func (c *client) tryCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800305 ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>\"%s\".%s", name, method))
Ryan Brownbc2c87c2014-11-17 18:55:25 +0000306 _, serverPattern, name := splitObjectName(name)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700307 // Resolve name unless told not to.
308 var servers []string
309 if getNoResolveOpt(opts) {
310 servers = []string{name}
311 } else {
Ryan Brownbc2c87c2014-11-17 18:55:25 +0000312 if resolved, err := c.ns.Resolve(ctx, name); err != nil {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700313 return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800314 } else {
315 // An empty set of protocols means all protocols...
316 ordered, err := filterAndOrderServers(resolved, c.preferredProtocols)
317 if len(ordered) == 0 {
318 return nil, verror.NoExistf("ipc: %q: %s", name, err)
319 }
320 servers = ordered
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700321 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700322 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800323 // servers is now orderd by the priority heurestic implemented in
324 // filterAndOrderServers.
325 attempts := len(servers)
326 if attempts == 0 {
327 return nil, errNoServers
328 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800329
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800330 // Try to connect to all servers in parallel.
331 responses := make([]*serverStatus, attempts)
332
333 // Provide sufficient buffering for all of the connections to finish
334 // instantaneously. This is important because we want to process
335 // the responses in priority order; that order is indicated by the
336 // order of entries in servers. So, if two respones come in at the
337 // same 'instant', we prefer the first in the slice.
338 ch := make(chan *serverStatus, attempts)
339
340 // Read as many responses as we can before we would block.
341 gatherResponses := func() {
342 for {
343 select {
344 default:
345 return
346 case s := <-ch:
347 responses[s.index] = s
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700348 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700349 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700350 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800351
352 delay := time.Duration(ipc.NoTimeout)
353 if dl, set := ctx.Deadline(); set {
354 delay = dl.Sub(time.Now())
355 }
356 timeoutChan := time.After(delay)
357
358 // We'll close this channel when an RPC has been started and we've
359 // irrevocably selected a server.
360 done := make(chan struct{})
361 // Try all of the servers in parallel.
362 for i, server := range servers {
363 go c.tryServer(i, server, ch, done)
364 }
365
366 select {
367 case <-timeoutChan:
368 // All calls failed if we get here.
369 close(done)
David Why Use Two When One Will Do Presottof3f39ae2014-08-27 11:13:27 -0700370 c.ns.FlushCacheEntry(name)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800371 return nil, verror.NoExistf("ipc: couldn't connect to server %v", name)
372 case s := <-ch:
373 responses[s.index] = s
374 gatherResponses()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700375 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800376
377 accessErrs := []error{}
378 connErrs := []error{}
379 for {
380
381 for _, r := range responses {
382 if r == nil || r.err != nil {
383 if r != nil && r.err != nil && !r.processed {
384 connErrs = append(connErrs, r.err)
385 r.processed = true
386 }
387 continue
388 }
389
390 flow := r.flow
391 suffix := r.suffix
392 flow.SetDeadline(ctx.Done())
393
394 var (
395 serverB []string
396 grantedB security.Blessings
397 )
398
399 // LocalPrincipal is nil means that the client wanted to avoid
400 // authentication, and thus wanted to skip authorization as well.
401 if flow.LocalPrincipal() != nil {
402 // Validate caveats on the server's identity for the context associated with this call.
403 var err error
404 if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
405 vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
406 if !r.processed {
407 accessErrs = append(accessErrs, err)
408 r.err = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
409 r.processed = true
410 }
411 flow.Close()
412 continue
413 }
414 }
415
416 // This is the 'point of no return', so we tell the tryServer
417 // goroutines to not bother sending us any more flows.
418 // Once the RPC is started (fc.start below) we can't be sure
419 // if it makes it to the server or not so, this code will
420 // never call fc.start more than once to ensure that we
421 // provide 'at-most-once' rpc semantics at this level. Retrying
422 // the network connections (i.e. creating flows) is fine since
423 // we can cleanup that state if we abort a call (i.e. close the
424 // flow).
425 close(done)
426
427 fc := newFlowClient(ctx, serverB, flow, c.dc)
428
429 if doneChan := ctx.Done(); doneChan != nil {
430 go func() {
431 select {
432 case <-ctx.Done():
433 fc.Cancel()
434 case <-fc.flow.Closed():
435 }
436 }()
437 }
438
439 timeout := time.Duration(ipc.NoTimeout)
440 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
441 timeout = deadline.Sub(time.Now())
442 }
443 if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
444 return nil, verr
445 }
446 return fc, nil
447 }
448
449 // Quit if we've seen an error from all parallel connection attempts
450 handled := 0
451 for _, r := range responses {
452 if r != nil && r.err != nil {
453 handled++
454 }
455 }
456 if handled == len(responses) {
457 break
458 }
459
460 select {
461 case <-timeoutChan:
462 // All remaining calls failed if we get here.
463 vlog.VI(2).Infof("ipc: couldn't connect to server %v", name)
464 goto quit
465 case s := <-ch:
466 responses[s.index] = s
467 gatherResponses()
468 }
469 }
470quit:
471 close(done)
472 c.ns.FlushCacheEntry(name)
473 // TODO(cnicolaou): introduce a third error code here for mixed
474 // conn/access errors.
475 return nil, verror.NoExistf("ipc: client failed to invoke %q.%q: on %v", name, method, servers, append(connErrs, accessErrs...))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700476}
477
Asim Shankar8f05c222014-10-06 22:08:19 -0700478// authorizeServer validates that the server (remote end of flow) has the credentials to serve
479// the RPC name.method for the client (local end of the flow). It returns the blessings at the
480// server that are authorized for this purpose and any blessings that are to be granted to
481// the server (via ipc.Granter implementations in opts.)
Ankure49a86a2014-11-11 18:52:43 -0800482func (c *client) authorizeServer(flow stream.Flow, name, method string, serverPattern security.BlessingPattern, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
Asim Shankar220a0152014-10-30 21:21:09 -0700483 if flow.RemoteBlessings() == nil {
Asim Shankar8f05c222014-10-06 22:08:19 -0700484 return nil, nil, fmt.Errorf("server has not presented any blessings")
Asim Shankarb54d7642014-06-05 13:08:04 -0700485 }
Ankure49a86a2014-11-11 18:52:43 -0800486 ctxt := security.NewContext(&security.ContextParams{
487 LocalPrincipal: flow.LocalPrincipal(),
488 LocalBlessings: flow.LocalBlessings(),
489 RemoteBlessings: flow.RemoteBlessings(),
490 LocalEndpoint: flow.LocalEndpoint(),
491 RemoteEndpoint: flow.RemoteEndpoint(),
492 RemoteDischarges: flow.RemoteDischarges(),
493 Method: method,
494 Name: name})
Ryan Brown2726b402014-11-04 17:13:27 -0800495 serverBlessings = flow.RemoteBlessings().ForContext(ctxt)
496 if serverPattern != "" {
497 if !serverPattern.MatchedBy(serverBlessings...) {
498 return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, serverPattern)
499 }
500 } else if enableSecureServerAuth {
501 if err := (defaultAuthorizer{}).Authorize(ctxt); err != nil {
502 return nil, nil, fmt.Errorf("default authorization precludes talking to server %v", serverBlessings)
503 }
504 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700505 for _, o := range opts {
506 switch v := o.(type) {
Asim Shankarb54d7642014-06-05 13:08:04 -0700507 case ipc.Granter:
Asim Shankar8f05c222014-10-06 22:08:19 -0700508 if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
509 return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err)
510 } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
511 return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err)
512 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700513 }
514 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700515 return serverBlessings, grantedBlessings, nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700516}
517
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700518func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700519 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700520 c.vcMapMu.Lock()
521 for _, v := range c.vcMap {
522 c.streamMgr.ShutdownEndpoint(v.remoteEP)
523 }
524 c.vcMap = nil
525 c.vcMapMu.Unlock()
526}
527
528// IPCBindOpt makes client implement BindOpt.
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700529func (c *client) IPCBindOpt() {
530 //nologcall
531}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700532
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700533// flowClient implements the RPC client-side protocol for a single RPC, over a
534// flow that's already connected to the server.
535type flowClient struct {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700536 ctx context.T // context to annotate with call details
Asim Shankar1707e432014-05-29 19:42:41 -0700537 dec *vom.Decoder // to decode responses and results from the server
538 enc *vom.Encoder // to encode requests and args to the server
Asim Shankar8f05c222014-10-06 22:08:19 -0700539 server []string // Blessings bound to the server that authorize it to receive the IPC request from the client.
Asim Shankar1707e432014-05-29 19:42:41 -0700540 flow stream.Flow // the underlying flow
541 response ipc.Response // each decoded response message is kept here
542
Ankure49a86a2014-11-11 18:52:43 -0800543 discharges []security.Discharge // discharges used for this request
544 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700545
Asim Shankar1707e432014-05-29 19:42:41 -0700546 sendClosedMu sync.Mutex
547 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700548
549 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700550}
551
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700552var _ ipc.Call = (*flowClient)(nil)
553var _ ipc.Stream = (*flowClient)(nil)
554
Ankure49a86a2014-11-11 18:52:43 -0800555func newFlowClient(ctx context.T, server []string, flow stream.Flow, dc vc.DischargeClient) *flowClient {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700556 return &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800557 ctx: ctx,
558 dec: vom.NewDecoder(flow),
559 enc: vom.NewEncoder(flow),
560 server: server,
561 flow: flow,
562 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700563 }
564}
565
566func (fc *flowClient) close(verr verror.E) verror.E {
567 if err := fc.flow.Close(); err != nil && verr == nil {
568 verr = verror.Internalf("ipc: flow close failed: %v", err)
569 }
570 return verr
571}
572
Asim Shankar8f05c222014-10-06 22:08:19 -0700573func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E {
Ankure49a86a2014-11-11 18:52:43 -0800574 // Fetch any discharges for third-party caveats on the client's blessings
575 // if this client owns a discharge-client.
576 if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
577 fc.discharges = fc.dc.PrepareDischarges(self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
578 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700579 req := ipc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700580 Suffix: suffix,
581 Method: method,
582 NumPosArgs: uint64(len(args)),
583 Timeout: int64(timeout),
584 GrantedBlessings: security.MarshalBlessings(blessings),
585 NumDischarges: uint64(len(fc.discharges)),
586 TraceRequest: vtrace.Request(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700587 }
588 if err := fc.enc.Encode(req); err != nil {
589 return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
590 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700591 for _, d := range fc.discharges {
592 if err := fc.enc.Encode(d); err != nil {
Ankurf044a8d2014-09-05 17:05:24 -0700593 return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err))
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700594 }
595 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700596 for ix, arg := range args {
597 if err := fc.enc.Encode(arg); err != nil {
598 return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
599 }
600 }
601 return nil
602}
603
604func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700605 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700606 if fc.sendClosed {
607 return errFlowClosed
608 }
609
610 // The empty request header indicates what follows is a streaming arg.
611 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700612 return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700613 }
614 if err := fc.enc.Encode(item); err != nil {
615 return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
616 }
617 return nil
618}
619
620func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700621 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700622 switch {
623 case fc.response.Error != nil:
624 return fc.response.Error
625 case fc.response.EndStreamResults:
626 return io.EOF
627 }
628
629 // Decode the response header and handle errors and EOF.
630 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700631 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700632 }
633 if fc.response.Error != nil {
634 return fc.response.Error
635 }
636 if fc.response.EndStreamResults {
637 // Return EOF to indicate to the caller that there are no more stream
638 // results. Any error sent by the server is kept in fc.response.Error, and
639 // returned to the user in Finish.
640 return io.EOF
641 }
642 // Decode the streaming result.
643 if err := fc.dec.Decode(itemptr); err != nil {
644 return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err))
645 }
646 return nil
647}
648
649func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700650 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700651 return fc.closeSend()
652}
653
654// closeSend ensures CloseSend always returns verror.E.
655func (fc *flowClient) closeSend() verror.E {
Asim Shankar1707e432014-05-29 19:42:41 -0700656 fc.sendClosedMu.Lock()
657 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700658 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700659 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700660 }
661 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700662 // TODO(caprita): Indiscriminately closing the flow below causes
663 // a race as described in:
664 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
665 //
666 // There should be a finer grained way to fix this (for example,
667 // encoding errors should probably still result in closing the
668 // flow); on the flip side, there may exist other instances
669 // where we are closing the flow but should not.
670 //
671 // For now, commenting out the line below removes the flakiness
672 // from our existing unit tests, but this needs to be revisited
673 // and fixed correctly.
674 //
675 // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700676 }
677 fc.sendClosed = true
678 return nil
679}
680
681func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700682 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700683 err := fc.finish(resultptrs...)
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700684 vtrace.FromContext(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700685 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700686}
687
688// finish ensures Finish always returns verror.E.
689func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700690 if fc.finished {
691 return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed"))
692 }
693 fc.finished = true
Todd Wangce3033b2014-05-23 17:04:44 -0700694 // Call closeSend implicitly, if the user hasn't already called it. There are
695 // three cases:
696 // 1) Server is blocked on Recv waiting for the final request message.
697 // 2) Server has already finished processing, the final response message and
698 // out args are queued up on the client, and the flow is closed.
699 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
700 // response and args aren't queued up yet, and the flow isn't closed.
701 //
702 // We must call closeSend to handle case (1) and unblock the server; otherwise
703 // we'll deadlock with both client and server waiting for each other. We must
704 // ignore the error (if any) to handle case (2). In that case the flow is
705 // closed, meaning writes will fail and reads will succeed, and closeSend will
706 // always return an error. But this isn't a "real" error; the client should
707 // read the rest of the results and succeed.
708 _ = fc.closeSend()
709
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700710 // Decode the response header, if it hasn't already been decoded by Recv.
711 if fc.response.Error == nil && !fc.response.EndStreamResults {
712 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700713 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700714 }
715 // The response header must indicate the streaming results have ended.
716 if fc.response.Error == nil && !fc.response.EndStreamResults {
717 return fc.close(errRemainingStreamResults)
718 }
719 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700720
721 // Incorporate any VTrace info that was returned.
722 vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
723
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700724 if fc.response.Error != nil {
Ankure49a86a2014-11-11 18:52:43 -0800725 if verror.Is(fc.response.Error, verror.NoAccess) && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700726 // In case the error was caused by a bad discharge, we do not want to get stuck
727 // with retrying again and again with this discharge. As there is no direct way
728 // to detect it, we conservatively flush all discharges we used from the cache.
729 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Ankur57444f32014-08-13 11:03:39 -0700730 vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Ankure49a86a2014-11-11 18:52:43 -0800731 fc.dc.Invalidate(fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700732 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700733 return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
734 }
735 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Cosmos Nicolaou9c9918d2014-09-23 08:45:56 -0700736 return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700737 }
738 for ix, r := range resultptrs {
739 if err := fc.dec.Decode(r); err != nil {
740 return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err))
741 }
742 }
743 return fc.close(nil)
744}
745
746func (fc *flowClient) Cancel() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700747 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700748 vtrace.FromContext(fc.ctx).Annotate("Cancelled")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700749 fc.flow.Cancel()
750}
Asim Shankar2d731a92014-09-29 17:46:38 -0700751
752func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -0700753 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -0700754}
Asim Shankar220a0152014-10-30 21:21:09 -0700755
Ryan Brown2726b402014-11-04 17:13:27 -0800756func splitObjectName(name string) (mtPattern, serverPattern security.BlessingPattern, objectName string) {
757 objectName = name
758 match := serverPatternRegexp.FindSubmatch([]byte(name))
759 if match != nil {
760 objectName = string(match[2])
761 if naming.Rooted(objectName) {
762 mtPattern = security.BlessingPattern(match[1])
763 } else {
764 serverPattern = security.BlessingPattern(match[1])
765 return
766 }
767 }
768 if !naming.Rooted(objectName) {
769 return
770 }
771
772 address, relative := naming.SplitAddressName(objectName)
773 match = serverPatternRegexp.FindSubmatch([]byte(relative))
774 if match != nil {
775 serverPattern = security.BlessingPattern(match[1])
776 objectName = naming.JoinAddressName(address, string(match[2]))
777 }
778 return
779}