blob: b2bd08781a518b2ea2666d1e796da62ed0f0fab4 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -07006 "math"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07007 "math/rand"
8 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
13 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Asim Shankar5c576c42014-10-01 12:19:12 -070014 isecurity "veyron.io/veyron/veyron/runtimes/google/security"
Jiri Simsa519c5072014-09-17 21:37:57 -070015 "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070016
Jiri Simsa519c5072014-09-17 21:37:57 -070017 "veyron.io/veyron/veyron2/context"
18 "veyron.io/veyron/veyron2/ipc"
19 "veyron.io/veyron/veyron2/ipc/stream"
20 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070021 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070022 "veyron.io/veyron/veyron2/security"
23 "veyron.io/veyron/veyron2/verror"
24 "veyron.io/veyron/veyron2/vlog"
25 "veyron.io/veyron/veyron2/vom"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070026)
27
28var (
Tilak Sharma492e8e92014-09-18 10:58:14 -070029 errNoServers = verror.NoExistf("ipc: no servers")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070030 errFlowClosed = verror.Abortedf("ipc: flow closed")
31 errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
32 errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
33)
34
35type client struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -070036 streamMgr stream.Manager
37 ns naming.Namespace
38 vcOpts []stream.VCOpt // vc opts passed to dial
Jiri Simsa5293dcb2014-05-10 09:56:38 -070039
40 // We support concurrent calls to StartCall and Close, so we must protect the
41 // vcMap. Everything else is initialized upon client construction, and safe
42 // to use concurrently.
43 vcMapMu sync.Mutex
Asim Shankar8f05c222014-10-06 22:08:19 -070044 // TODO(ashankar): The key should be a function of the blessings shared with the server?
45 vcMap map[string]*vcInfo // map key is endpoint.String
Andres Erbsenb7f95f32014-07-07 12:07:56 -070046
47 dischargeCache dischargeCache
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048}
49
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070050var _ ipc.Client = (*client)(nil)
51var _ ipc.BindOpt = (*client)(nil)
52
Jiri Simsa5293dcb2014-05-10 09:56:38 -070053type vcInfo struct {
54 vc stream.VC
55 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -070056}
57
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070058func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070059 c := &client{
Andres Erbsenb7f95f32014-07-07 12:07:56 -070060 streamMgr: streamMgr,
61 ns: ns,
62 vcMap: make(map[string]*vcInfo),
Ankurf044a8d2014-09-05 17:05:24 -070063 dischargeCache: dischargeCache{cache: make(map[string]security.Discharge)},
Jiri Simsa5293dcb2014-05-10 09:56:38 -070064 }
65 for _, opt := range opts {
66 // Collect all client opts that are also vc opts.
67 if vcOpt, ok := opt.(stream.VCOpt); ok {
68 c.vcOpts = append(c.vcOpts, vcOpt)
69 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070070 }
71 return c, nil
72}
73
Bogdan Caprita783f7792014-05-15 09:29:17 -070074func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070075 c.vcMapMu.Lock()
76 defer c.vcMapMu.Unlock()
77 if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -070078 if flow, err := vcinfo.vc.Connect(); err == nil {
79 return flow, nil
80 }
81 // If the vc fails to establish a new flow, we assume it's
82 // broken, remove it from the map, and proceed to establishing
83 // a new vc.
84 // TODO(caprita): Should we distinguish errors due to vc being
85 // closed from other errors? If not, should we call vc.Close()
86 // before removing the vc from the map?
87 delete(c.vcMap, ep.String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070088 }
Robin Thellendee439642014-10-20 14:39:17 -070089 c.vcMapMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070090 vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -070091 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092 if err != nil {
93 return nil, err
94 }
Robin Thellendee439642014-10-20 14:39:17 -070095 if othervc, exists := c.vcMap[ep.String()]; exists {
96 vc = othervc.vc
97 // TODO(ashankar,toddw): Figure out how to close up the VC that
98 // is discarded. vc.Close?
99 } else {
100 c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
101 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700102 return vc.Connect()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103}
104
105// connectFlow parses an endpoint and a suffix out of the server and establishes
106// a flow to the endpoint, returning the parsed suffix.
107// The server name passed in should be a rooted name, of the form "/ep/suffix" or
108// "/ep//suffix", or just "/ep".
109func (c *client) connectFlow(server string) (stream.Flow, string, error) {
110 address, suffix := naming.SplitAddressName(server)
111 if len(address) == 0 {
112 return nil, "", errNonRootedName
113 }
114 ep, err := inaming.NewEndpoint(address)
115 if err != nil {
116 return nil, "", err
117 }
118 if err = version.CheckCompatibility(ep); err != nil {
119 return nil, "", err
120 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700121 flow, err := c.createFlow(ep)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700122 if err != nil {
123 return nil, "", err
124 }
125 return flow, suffix, nil
126}
127
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700128// A randomized exponential backoff. The randomness deters error convoys from forming.
129func backoff(n int, deadline time.Time) bool {
130 b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
131 if b > maxBackoff {
132 b = maxBackoff
133 }
134 r := deadline.Sub(time.Now())
135 if b > r {
136 // We need to leave a little time for the call to start or
137 // we'll just timeout in startCall before we actually do
138 // anything. If we just have a millisecond left, give up.
139 if r <= time.Millisecond {
140 return false
141 }
142 b = r - time.Millisecond
143 }
144 time.Sleep(b)
145 return true
146}
147
// retriable reports whether a failed call attempt should be retried, judging
// purely by substrings of the error text.
//
// TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack.
func retriable(err error) bool {
	msg := err.Error()
	switch {
	case strings.Contains(msg, "authorized"):
		// Authentication errors are permanent; this check wins over all
		// the others.
		return false
	case strings.Contains(msg, "ipc: Resolve"), // Resolution errors.
		strings.Contains(msg, "errno"), // Kernel level errors.
		strings.Contains(msg, "connection refused"): // Connection refused.
		return true
	}
	return false
}
170
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700171func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
172 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700173 if r, ok := o.(options.RetryTimeout); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700174 return time.Duration(r), true
175 }
176 }
177 return 0, false
178}
179
Asim Shankarddc0c222014-07-29 15:47:00 -0700180func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700181 defer vlog.LogCall()()
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700182 // Context specified deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700183 deadline, hasDeadline := ctx.Deadline()
184 if !hasDeadline {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700185 // Default deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700186 deadline = time.Now().Add(defaultCallTimeout)
187 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700188 if r, ok := getRetryTimeoutOpt(opts); ok {
189 // Caller specified deadline.
190 deadline = time.Now().Add(time.Duration(r))
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700191 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700192 var lastErr verror.E
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700193 for retries := 0; ; retries++ {
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700194 if retries != 0 {
195 if !backoff(retries, deadline) {
196 break
197 }
198 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700199 call, err := c.startCall(ctx, name, method, args, opts...)
200 if err == nil {
201 return call, nil
202 }
203 lastErr = err
Nicolas LaCasse27f70412014-10-03 16:51:55 -0700204 if time.Now().After(deadline) || !retriable(err) {
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700205 break
206 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700207 }
208 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700209}
210
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700211func getNoResolveOpt(opts []ipc.CallOpt) bool {
212 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700213 if r, ok := o.(options.NoResolve); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700214 return bool(r)
215 }
216 }
217 return false
218}
219
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700220// startCall ensures StartCall always returns verror.E.
Asim Shankarddc0c222014-07-29 15:47:00 -0700221func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000222 if ctx == nil {
Asim Shankarddc0c222014-07-29 15:47:00 -0700223 return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000224 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700225 ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700226 // Resolve name unless told not to.
227 var servers []string
228 if getNoResolveOpt(opts) {
229 servers = []string{name}
230 } else {
231 var err error
232 if servers, err = c.ns.Resolve(ctx, name); err != nil {
233 return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
234 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700235 }
236 // Try all servers, and if none of them are authorized for the call then return the error of the last server
237 // that was tried.
238 var lastErr verror.E
239 for _, server := range servers {
240 flow, suffix, err := c.connectFlow(server)
241 if err != nil {
Tilak Sharma492e8e92014-09-18 10:58:14 -0700242 lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700243 vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
244 continue // Try the next server.
245 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700246 flow.SetDeadline(ctx.Done())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700247
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700248 var serverB []string
249 var grantedB security.Blessings
Asim Shankar8f05c222014-10-06 22:08:19 -0700250 var discharges []security.Discharge
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700251
252 // LocalPrincipal is nil means that the client wanted to avoid authentication,
253 // and thus wanted to skip authorization as well.
Suharsh Sivakumar223f5362014-10-27 14:21:49 -0700254 // TODO(suharshs,ataly,ashankar): Remove flow.LocalID() after the old security model is dead.
255 if flow.LocalPrincipal() != nil || flow.LocalID() != nil {
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700256 // Validate caveats on the server's identity for the context associated with this call.
257 if serverB, grantedB, err = c.authorizeServer(flow, name, suffix, method, opts); err != nil {
258 lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
259 flow.Close()
260 continue
261 }
262 // Fetch any discharges for third-party caveats on the client's blessings.
263 if self := flow.LocalBlessings(); self != nil {
264 if tpcavs := self.ThirdPartyCaveats(); len(tpcavs) > 0 {
265 discharges = c.prepareDischarges(ctx, tpcavs, mkDischargeImpetus(serverB, method, args), opts)
266 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700267 }
268 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700269
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700270 lastErr = nil
Asim Shankar8f05c222014-10-06 22:08:19 -0700271 fc := newFlowClient(ctx, serverB, flow, &c.dischargeCache, discharges)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700272
Matt Rosencrantza9036db2014-09-26 12:52:58 -0700273 if doneChan := ctx.Done(); doneChan != nil {
274 go func() {
Matt Rosencrantz86897932014-10-02 09:34:34 -0700275 select {
276 case <-ctx.Done():
277 fc.Cancel()
278 case <-fc.flow.Closed():
279 }
Matt Rosencrantza9036db2014-09-26 12:52:58 -0700280 }()
281 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700282
Matt Rosencrantz86897932014-10-02 09:34:34 -0700283 timeout := time.Duration(ipc.NoTimeout)
284 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
285 timeout = deadline.Sub(time.Now())
286 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700287 if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700288 return nil, verr
289 }
290 return fc, nil
291 }
292 if lastErr != nil {
David Why Use Two When One Will Do Presottof3f39ae2014-08-27 11:13:27 -0700293 // If there was any problem starting the call, flush the cache entry under the
294 // assumption that it was caused by stale data.
295 c.ns.FlushCacheEntry(name)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700296 return nil, lastErr
297 }
298 return nil, errNoServers
299}
300
Asim Shankar8f05c222014-10-06 22:08:19 -0700301// authorizeServer validates that the server (remote end of flow) has the credentials to serve
302// the RPC name.method for the client (local end of the flow). It returns the blessings at the
303// server that are authorized for this purpose and any blessings that are to be granted to
304// the server (via ipc.Granter implementations in opts.)
305func (c *client) authorizeServer(flow stream.Flow, name, suffix, method string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
306 if flow.RemoteID() == nil && flow.RemoteBlessings() == nil {
307 return nil, nil, fmt.Errorf("server has not presented any blessings")
Asim Shankarb54d7642014-06-05 13:08:04 -0700308 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700309 authctx := isecurity.NewContext(isecurity.ContextArgs{
310 LocalID: flow.LocalID(),
311 RemoteID: flow.RemoteID(),
312 Debug: "ClientAuthorizingServer",
313 LocalPrincipal: flow.LocalPrincipal(),
314 LocalBlessings: flow.LocalBlessings(),
315 RemoteBlessings: flow.RemoteBlessings(),
316 /* TODO(ashankar,ataly): Uncomment this! This is disabled till the hack to skip third-party caveat
317 validation on a server's blessings are disabled. Commenting out the next three lines affects more
318 than third-party caveats, so yeah, have to remove this soon!
319 Method: method,
320 Name: name,
321 Suffix: suffix, */
322 LocalEndpoint: flow.LocalEndpoint(),
323 RemoteEndpoint: flow.RemoteEndpoint(),
324 })
325 if serverID := flow.RemoteID(); flow.RemoteBlessings() == nil && serverID != nil {
326 serverID, err = serverID.Authorize(authctx)
327 if err != nil {
328 return nil, nil, err
329 }
330 serverBlessings = serverID.Names()
Asim Shankarb54d7642014-06-05 13:08:04 -0700331 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700332 if server := flow.RemoteBlessings(); server != nil {
333 serverBlessings = server.ForContext(authctx)
334 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700335 for _, o := range opts {
336 switch v := o.(type) {
Asim Shankarcc044212014-10-15 23:25:26 -0700337 case options.RemoteID:
Asim Shankar8f05c222014-10-06 22:08:19 -0700338 if !security.BlessingPattern(v).MatchedBy(serverBlessings...) {
339 return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, v)
Asim Shankarb54d7642014-06-05 13:08:04 -0700340 }
341 case ipc.Granter:
Asim Shankar8f05c222014-10-06 22:08:19 -0700342 if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
343 return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err)
344 } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
345 return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err)
346 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700347 }
348 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700349 return serverBlessings, grantedBlessings, nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700350}
351
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700352func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700353 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700354 c.vcMapMu.Lock()
355 for _, v := range c.vcMap {
356 c.streamMgr.ShutdownEndpoint(v.remoteEP)
357 }
358 c.vcMap = nil
359 c.vcMapMu.Unlock()
360}
361
362// IPCBindOpt makes client implement BindOpt.
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700363func (c *client) IPCBindOpt() {
364 //nologcall
365}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700366
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700367// flowClient implements the RPC client-side protocol for a single RPC, over a
368// flow that's already connected to the server.
369type flowClient struct {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700370 ctx context.T // context to annotate with call details
Asim Shankar1707e432014-05-29 19:42:41 -0700371 dec *vom.Decoder // to decode responses and results from the server
372 enc *vom.Encoder // to encode requests and args to the server
Asim Shankar8f05c222014-10-06 22:08:19 -0700373 server []string // Blessings bound to the server that authorize it to receive the IPC request from the client.
Asim Shankar1707e432014-05-29 19:42:41 -0700374 flow stream.Flow // the underlying flow
375 response ipc.Response // each decoded response message is kept here
376
Ankurf044a8d2014-09-05 17:05:24 -0700377 discharges []security.Discharge // discharges used for this request
378 dischargeCache *dischargeCache // client-global discharge cache reference type
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700379
Asim Shankar1707e432014-05-29 19:42:41 -0700380 sendClosedMu sync.Mutex
381 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700382
383 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700384}
385
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700386var _ ipc.Call = (*flowClient)(nil)
387var _ ipc.Stream = (*flowClient)(nil)
388
Asim Shankar8f05c222014-10-06 22:08:19 -0700389func newFlowClient(ctx context.T, server []string, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700390 return &flowClient{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700391 ctx: ctx,
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700392 dec: vom.NewDecoder(flow),
393 enc: vom.NewEncoder(flow),
Asim Shankar8f05c222014-10-06 22:08:19 -0700394 server: server,
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700395 flow: flow,
396 discharges: discharges,
397 dischargeCache: dischargeCache,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700398 }
399}
400
401func (fc *flowClient) close(verr verror.E) verror.E {
402 if err := fc.flow.Close(); err != nil && verr == nil {
403 verr = verror.Internalf("ipc: flow close failed: %v", err)
404 }
405 return verr
406}
407
Asim Shankar8f05c222014-10-06 22:08:19 -0700408func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700409 req := ipc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700410 Suffix: suffix,
411 Method: method,
412 NumPosArgs: uint64(len(args)),
413 Timeout: int64(timeout),
414 GrantedBlessings: security.MarshalBlessings(blessings),
415 NumDischarges: uint64(len(fc.discharges)),
416 TraceRequest: vtrace.Request(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700417 }
418 if err := fc.enc.Encode(req); err != nil {
419 return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
420 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700421 for _, d := range fc.discharges {
422 if err := fc.enc.Encode(d); err != nil {
Ankurf044a8d2014-09-05 17:05:24 -0700423 return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err))
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700424 }
425 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700426 for ix, arg := range args {
427 if err := fc.enc.Encode(arg); err != nil {
428 return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
429 }
430 }
431 return nil
432}
433
434func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700435 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700436 if fc.sendClosed {
437 return errFlowClosed
438 }
439
440 // The empty request header indicates what follows is a streaming arg.
441 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700442 return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700443 }
444 if err := fc.enc.Encode(item); err != nil {
445 return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
446 }
447 return nil
448}
449
450func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700451 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700452 switch {
453 case fc.response.Error != nil:
454 return fc.response.Error
455 case fc.response.EndStreamResults:
456 return io.EOF
457 }
458
459 // Decode the response header and handle errors and EOF.
460 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700461 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700462 }
463 if fc.response.Error != nil {
464 return fc.response.Error
465 }
466 if fc.response.EndStreamResults {
467 // Return EOF to indicate to the caller that there are no more stream
468 // results. Any error sent by the server is kept in fc.response.Error, and
469 // returned to the user in Finish.
470 return io.EOF
471 }
472 // Decode the streaming result.
473 if err := fc.dec.Decode(itemptr); err != nil {
474 return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err))
475 }
476 return nil
477}
478
479func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700480 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700481 return fc.closeSend()
482}
483
484// closeSend ensures CloseSend always returns verror.E.
485func (fc *flowClient) closeSend() verror.E {
Asim Shankar1707e432014-05-29 19:42:41 -0700486 fc.sendClosedMu.Lock()
487 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700488 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700489 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700490 }
491 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700492 // TODO(caprita): Indiscriminately closing the flow below causes
493 // a race as described in:
494 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
495 //
496 // There should be a finer grained way to fix this (for example,
497 // encoding errors should probably still result in closing the
498 // flow); on the flip side, there may exist other instances
499 // where we are closing the flow but should not.
500 //
501 // For now, commenting out the line below removes the flakiness
502 // from our existing unit tests, but this needs to be revisited
503 // and fixed correctly.
504 //
505 // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700506 }
507 fc.sendClosed = true
508 return nil
509}
510
511func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700512 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700513 err := fc.finish(resultptrs...)
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700514 vtrace.FromContext(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700515 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700516}
517
518// finish ensures Finish always returns verror.E.
519func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700520 if fc.finished {
521 return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed"))
522 }
523 fc.finished = true
Todd Wangce3033b2014-05-23 17:04:44 -0700524 // Call closeSend implicitly, if the user hasn't already called it. There are
525 // three cases:
526 // 1) Server is blocked on Recv waiting for the final request message.
527 // 2) Server has already finished processing, the final response message and
528 // out args are queued up on the client, and the flow is closed.
529 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
530 // response and args aren't queued up yet, and the flow isn't closed.
531 //
532 // We must call closeSend to handle case (1) and unblock the server; otherwise
533 // we'll deadlock with both client and server waiting for each other. We must
534 // ignore the error (if any) to handle case (2). In that case the flow is
535 // closed, meaning writes will fail and reads will succeed, and closeSend will
536 // always return an error. But this isn't a "real" error; the client should
537 // read the rest of the results and succeed.
538 _ = fc.closeSend()
539
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700540 // Decode the response header, if it hasn't already been decoded by Recv.
541 if fc.response.Error == nil && !fc.response.EndStreamResults {
542 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700543 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700544 }
545 // The response header must indicate the streaming results have ended.
546 if fc.response.Error == nil && !fc.response.EndStreamResults {
547 return fc.close(errRemainingStreamResults)
548 }
549 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700550
551 // Incorporate any VTrace info that was returned.
552 vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
553
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700554 if fc.response.Error != nil {
Tilak Sharma492e8e92014-09-18 10:58:14 -0700555 if verror.Is(fc.response.Error, verror.NoAccess) && fc.dischargeCache != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700556 // In case the error was caused by a bad discharge, we do not want to get stuck
557 // with retrying again and again with this discharge. As there is no direct way
558 // to detect it, we conservatively flush all discharges we used from the cache.
559 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Ankur57444f32014-08-13 11:03:39 -0700560 vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700561 fc.dischargeCache.Invalidate(fc.discharges...)
562 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700563 return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
564 }
565 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Cosmos Nicolaou9c9918d2014-09-23 08:45:56 -0700566 return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700567 }
568 for ix, r := range resultptrs {
569 if err := fc.dec.Decode(r); err != nil {
570 return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err))
571 }
572 }
573 return fc.close(nil)
574}
575
576func (fc *flowClient) Cancel() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700577 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700578 vtrace.FromContext(fc.ctx).Annotate("Cancelled")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700579 fc.flow.Cancel()
580}
Asim Shankar2d731a92014-09-29 17:46:38 -0700581
582func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -0700583 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -0700584}