blob: 5782fe8ffa9cdffae68fa6703eaa614556fbc119 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -07006 "math"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07007 "math/rand"
8 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
13 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Asim Shankar5c576c42014-10-01 12:19:12 -070014 isecurity "veyron.io/veyron/veyron/runtimes/google/security"
Jiri Simsa519c5072014-09-17 21:37:57 -070015 "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070016
Jiri Simsa519c5072014-09-17 21:37:57 -070017 "veyron.io/veyron/veyron2/context"
18 "veyron.io/veyron/veyron2/ipc"
19 "veyron.io/veyron/veyron2/ipc/stream"
20 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070021 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070022 "veyron.io/veyron/veyron2/security"
23 "veyron.io/veyron/veyron2/verror"
24 "veyron.io/veyron/veyron2/vlog"
25 "veyron.io/veyron/veyron2/vom"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070026)
27
28var (
Tilak Sharma492e8e92014-09-18 10:58:14 -070029 errNoServers = verror.NoExistf("ipc: no servers")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070030 errFlowClosed = verror.Abortedf("ipc: flow closed")
31 errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
32 errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
33)
34
35type client struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -070036 streamMgr stream.Manager
37 ns naming.Namespace
38 vcOpts []stream.VCOpt // vc opts passed to dial
Jiri Simsa5293dcb2014-05-10 09:56:38 -070039
40 // We support concurrent calls to StartCall and Close, so we must protect the
41 // vcMap. Everything else is initialized upon client construction, and safe
42 // to use concurrently.
43 vcMapMu sync.Mutex
Asim Shankar8f05c222014-10-06 22:08:19 -070044 // TODO(ashankar): The key should be a function of the blessings shared with the server?
45 vcMap map[string]*vcInfo // map key is endpoint.String
Andres Erbsenb7f95f32014-07-07 12:07:56 -070046
47 dischargeCache dischargeCache
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048}
49
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070050var _ ipc.Client = (*client)(nil)
51var _ ipc.BindOpt = (*client)(nil)
52
Jiri Simsa5293dcb2014-05-10 09:56:38 -070053type vcInfo struct {
54 vc stream.VC
55 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -070056}
57
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070058func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070059 c := &client{
Andres Erbsenb7f95f32014-07-07 12:07:56 -070060 streamMgr: streamMgr,
61 ns: ns,
62 vcMap: make(map[string]*vcInfo),
Ankurf044a8d2014-09-05 17:05:24 -070063 dischargeCache: dischargeCache{cache: make(map[string]security.Discharge)},
Jiri Simsa5293dcb2014-05-10 09:56:38 -070064 }
65 for _, opt := range opts {
66 // Collect all client opts that are also vc opts.
67 if vcOpt, ok := opt.(stream.VCOpt); ok {
68 c.vcOpts = append(c.vcOpts, vcOpt)
69 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070070 }
71 return c, nil
72}
73
Bogdan Caprita783f7792014-05-15 09:29:17 -070074func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070075 c.vcMapMu.Lock()
76 defer c.vcMapMu.Unlock()
77 if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -070078 if flow, err := vcinfo.vc.Connect(); err == nil {
79 return flow, nil
80 }
81 // If the vc fails to establish a new flow, we assume it's
82 // broken, remove it from the map, and proceed to establishing
83 // a new vc.
84 // TODO(caprita): Should we distinguish errors due to vc being
85 // closed from other errors? If not, should we call vc.Close()
86 // before removing the vc from the map?
87 delete(c.vcMap, ep.String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070088 }
89 vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
90 if err != nil {
91 return nil, err
92 }
Bogdan Caprita783f7792014-05-15 09:29:17 -070093 c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
94 return vc.Connect()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095}
96
97// connectFlow parses an endpoint and a suffix out of the server and establishes
98// a flow to the endpoint, returning the parsed suffix.
99// The server name passed in should be a rooted name, of the form "/ep/suffix" or
100// "/ep//suffix", or just "/ep".
101func (c *client) connectFlow(server string) (stream.Flow, string, error) {
102 address, suffix := naming.SplitAddressName(server)
103 if len(address) == 0 {
104 return nil, "", errNonRootedName
105 }
106 ep, err := inaming.NewEndpoint(address)
107 if err != nil {
108 return nil, "", err
109 }
110 if err = version.CheckCompatibility(ep); err != nil {
111 return nil, "", err
112 }
Bogdan Caprita783f7792014-05-15 09:29:17 -0700113 flow, err := c.createFlow(ep)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700114 if err != nil {
115 return nil, "", err
116 }
117 return flow, suffix, nil
118}
119
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700120// A randomized exponential backoff. The randomness deters error convoys from forming.
121func backoff(n int, deadline time.Time) bool {
122 b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
123 if b > maxBackoff {
124 b = maxBackoff
125 }
126 r := deadline.Sub(time.Now())
127 if b > r {
128 // We need to leave a little time for the call to start or
129 // we'll just timeout in startCall before we actually do
130 // anything. If we just have a millisecond left, give up.
131 if r <= time.Millisecond {
132 return false
133 }
134 b = r - time.Millisecond
135 }
136 time.Sleep(b)
137 return true
138}
139
// retriable reports whether err looks like a transient failure that is worth
// retrying. The decision is made purely by substring matching on the error
// text. A nil error is not retriable.
//
// TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack.
func retriable(err error) bool {
	// Guard against a nil error so that err.Error() cannot panic.
	if err == nil {
		return false
	}
	e := err.Error()
	switch {
	// Authentication errors are permanent. This check comes first so that it
	// wins over any retriable marker that appears in the same message.
	case strings.Contains(e, "authorized"):
		return false
	// Resolution errors, kernel level errors and refused connections are
	// retriable.
	case strings.Contains(e, "ipc: Resolve"),
		strings.Contains(e, "errno"),
		strings.Contains(e, "connection refused"):
		return true
	}
	return false
}
162
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700163func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
164 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700165 if r, ok := o.(options.RetryTimeout); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700166 return time.Duration(r), true
167 }
168 }
169 return 0, false
170}
171
Asim Shankarddc0c222014-07-29 15:47:00 -0700172func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700173 defer vlog.LogCall()()
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700174 // Context specified deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700175 deadline, hasDeadline := ctx.Deadline()
176 if !hasDeadline {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700177 // Default deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700178 deadline = time.Now().Add(defaultCallTimeout)
179 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700180 if r, ok := getRetryTimeoutOpt(opts); ok {
181 // Caller specified deadline.
182 deadline = time.Now().Add(time.Duration(r))
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700183 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700184 var lastErr verror.E
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700185 for retries := 0; ; retries++ {
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700186 if retries != 0 {
187 if !backoff(retries, deadline) {
188 break
189 }
190 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700191 call, err := c.startCall(ctx, name, method, args, opts...)
192 if err == nil {
193 return call, nil
194 }
195 lastErr = err
Nicolas LaCasse27f70412014-10-03 16:51:55 -0700196 if time.Now().After(deadline) || !retriable(err) {
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700197 break
198 }
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700199 }
200 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700201}
202
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700203func getNoResolveOpt(opts []ipc.CallOpt) bool {
204 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700205 if r, ok := o.(options.NoResolve); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700206 return bool(r)
207 }
208 }
209 return false
210}
211
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700212// startCall ensures StartCall always returns verror.E.
Asim Shankarddc0c222014-07-29 15:47:00 -0700213func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000214 if ctx == nil {
Asim Shankarddc0c222014-07-29 15:47:00 -0700215 return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
Matt Rosencrantz7e68d5a2014-06-11 15:28:51 +0000216 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700217 ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700218 // Resolve name unless told not to.
219 var servers []string
220 if getNoResolveOpt(opts) {
221 servers = []string{name}
222 } else {
223 var err error
224 if servers, err = c.ns.Resolve(ctx, name); err != nil {
225 return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
226 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700227 }
228 // Try all servers, and if none of them are authorized for the call then return the error of the last server
229 // that was tried.
230 var lastErr verror.E
231 for _, server := range servers {
232 flow, suffix, err := c.connectFlow(server)
233 if err != nil {
Tilak Sharma492e8e92014-09-18 10:58:14 -0700234 lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700235 vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
236 continue // Try the next server.
237 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700238 flow.SetDeadline(ctx.Done())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700239
240 // Validate caveats on the server's identity for the context associated with this call.
Asim Shankar8f05c222014-10-06 22:08:19 -0700241 serverB, grantedB, err := c.authorizeServer(flow, name, suffix, method, opts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700242 if err != nil {
Asim Shankar8f05c222014-10-06 22:08:19 -0700243 lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
Asim Shankarb54d7642014-06-05 13:08:04 -0700244 flow.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700245 continue
246 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700247 // Fetch any discharges for third-party caveats on the client's blessings.
248 var discharges []security.Discharge
249 if self := flow.LocalBlessings(); self != nil {
250 if tpcavs := self.ThirdPartyCaveats(); len(tpcavs) > 0 {
251 discharges = c.prepareDischarges(ctx, tpcavs, mkDischargeImpetus(serverB, method, args), opts)
252 }
253 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700254
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700255 lastErr = nil
Asim Shankar8f05c222014-10-06 22:08:19 -0700256 fc := newFlowClient(ctx, serverB, flow, &c.dischargeCache, discharges)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700257
Matt Rosencrantza9036db2014-09-26 12:52:58 -0700258 if doneChan := ctx.Done(); doneChan != nil {
259 go func() {
Matt Rosencrantz86897932014-10-02 09:34:34 -0700260 select {
261 case <-ctx.Done():
262 fc.Cancel()
263 case <-fc.flow.Closed():
264 }
Matt Rosencrantza9036db2014-09-26 12:52:58 -0700265 }()
266 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700267
Matt Rosencrantz86897932014-10-02 09:34:34 -0700268 timeout := time.Duration(ipc.NoTimeout)
269 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
270 timeout = deadline.Sub(time.Now())
271 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700272 if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700273 return nil, verr
274 }
275 return fc, nil
276 }
277 if lastErr != nil {
David Why Use Two When One Will Do Presottof3f39ae2014-08-27 11:13:27 -0700278 // If there was any problem starting the call, flush the cache entry under the
279 // assumption that it was caused by stale data.
280 c.ns.FlushCacheEntry(name)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700281 return nil, lastErr
282 }
283 return nil, errNoServers
284}
285
Asim Shankar8f05c222014-10-06 22:08:19 -0700286// authorizeServer validates that the server (remote end of flow) has the credentials to serve
287// the RPC name.method for the client (local end of the flow). It returns the blessings at the
288// server that are authorized for this purpose and any blessings that are to be granted to
289// the server (via ipc.Granter implementations in opts.)
290func (c *client) authorizeServer(flow stream.Flow, name, suffix, method string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
291 if flow.RemoteID() == nil && flow.RemoteBlessings() == nil {
292 return nil, nil, fmt.Errorf("server has not presented any blessings")
Asim Shankarb54d7642014-06-05 13:08:04 -0700293 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700294 authctx := isecurity.NewContext(isecurity.ContextArgs{
295 LocalID: flow.LocalID(),
296 RemoteID: flow.RemoteID(),
297 Debug: "ClientAuthorizingServer",
298 LocalPrincipal: flow.LocalPrincipal(),
299 LocalBlessings: flow.LocalBlessings(),
300 RemoteBlessings: flow.RemoteBlessings(),
301 /* TODO(ashankar,ataly): Uncomment this! This is disabled till the hack to skip third-party caveat
302 validation on a server's blessings are disabled. Commenting out the next three lines affects more
303 than third-party caveats, so yeah, have to remove this soon!
304 Method: method,
305 Name: name,
306 Suffix: suffix, */
307 LocalEndpoint: flow.LocalEndpoint(),
308 RemoteEndpoint: flow.RemoteEndpoint(),
309 })
310 if serverID := flow.RemoteID(); flow.RemoteBlessings() == nil && serverID != nil {
311 serverID, err = serverID.Authorize(authctx)
312 if err != nil {
313 return nil, nil, err
314 }
315 serverBlessings = serverID.Names()
Asim Shankarb54d7642014-06-05 13:08:04 -0700316 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700317 if server := flow.RemoteBlessings(); server != nil {
318 serverBlessings = server.ForContext(authctx)
319 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700320 for _, o := range opts {
321 switch v := o.(type) {
Asim Shankarcc044212014-10-15 23:25:26 -0700322 case options.RemoteID:
Asim Shankar8f05c222014-10-06 22:08:19 -0700323 if !security.BlessingPattern(v).MatchedBy(serverBlessings...) {
324 return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, v)
Asim Shankarb54d7642014-06-05 13:08:04 -0700325 }
326 case ipc.Granter:
Asim Shankar8f05c222014-10-06 22:08:19 -0700327 if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
328 return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err)
329 } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
330 return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err)
331 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700332 }
333 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700334 return serverBlessings, grantedBlessings, nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700335}
336
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700337func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700338 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700339 c.vcMapMu.Lock()
340 for _, v := range c.vcMap {
341 c.streamMgr.ShutdownEndpoint(v.remoteEP)
342 }
343 c.vcMap = nil
344 c.vcMapMu.Unlock()
345}
346
347// IPCBindOpt makes client implement BindOpt.
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700348func (c *client) IPCBindOpt() {
349 //nologcall
350}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700351
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700352// flowClient implements the RPC client-side protocol for a single RPC, over a
353// flow that's already connected to the server.
354type flowClient struct {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700355 ctx context.T // context to annotate with call details
Asim Shankar1707e432014-05-29 19:42:41 -0700356 dec *vom.Decoder // to decode responses and results from the server
357 enc *vom.Encoder // to encode requests and args to the server
Asim Shankar8f05c222014-10-06 22:08:19 -0700358 server []string // Blessings bound to the server that authorize it to receive the IPC request from the client.
Asim Shankar1707e432014-05-29 19:42:41 -0700359 flow stream.Flow // the underlying flow
360 response ipc.Response // each decoded response message is kept here
361
Ankurf044a8d2014-09-05 17:05:24 -0700362 discharges []security.Discharge // discharges used for this request
363 dischargeCache *dischargeCache // client-global discharge cache reference type
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700364
Asim Shankar1707e432014-05-29 19:42:41 -0700365 sendClosedMu sync.Mutex
366 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700367
368 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700369}
370
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700371var _ ipc.Call = (*flowClient)(nil)
372var _ ipc.Stream = (*flowClient)(nil)
373
Asim Shankar8f05c222014-10-06 22:08:19 -0700374func newFlowClient(ctx context.T, server []string, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700375 return &flowClient{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700376 ctx: ctx,
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700377 dec: vom.NewDecoder(flow),
378 enc: vom.NewEncoder(flow),
Asim Shankar8f05c222014-10-06 22:08:19 -0700379 server: server,
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700380 flow: flow,
381 discharges: discharges,
382 dischargeCache: dischargeCache,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700383 }
384}
385
386func (fc *flowClient) close(verr verror.E) verror.E {
387 if err := fc.flow.Close(); err != nil && verr == nil {
388 verr = verror.Internalf("ipc: flow close failed: %v", err)
389 }
390 return verr
391}
392
Asim Shankar8f05c222014-10-06 22:08:19 -0700393func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700394 req := ipc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700395 Suffix: suffix,
396 Method: method,
397 NumPosArgs: uint64(len(args)),
398 Timeout: int64(timeout),
399 GrantedBlessings: security.MarshalBlessings(blessings),
400 NumDischarges: uint64(len(fc.discharges)),
401 TraceRequest: vtrace.Request(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700402 }
403 if err := fc.enc.Encode(req); err != nil {
404 return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
405 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700406 for _, d := range fc.discharges {
407 if err := fc.enc.Encode(d); err != nil {
Ankurf044a8d2014-09-05 17:05:24 -0700408 return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err))
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700409 }
410 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700411 for ix, arg := range args {
412 if err := fc.enc.Encode(arg); err != nil {
413 return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
414 }
415 }
416 return nil
417}
418
419func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700420 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700421 if fc.sendClosed {
422 return errFlowClosed
423 }
424
425 // The empty request header indicates what follows is a streaming arg.
426 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700427 return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700428 }
429 if err := fc.enc.Encode(item); err != nil {
430 return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
431 }
432 return nil
433}
434
435func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700436 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700437 switch {
438 case fc.response.Error != nil:
439 return fc.response.Error
440 case fc.response.EndStreamResults:
441 return io.EOF
442 }
443
444 // Decode the response header and handle errors and EOF.
445 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700446 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700447 }
448 if fc.response.Error != nil {
449 return fc.response.Error
450 }
451 if fc.response.EndStreamResults {
452 // Return EOF to indicate to the caller that there are no more stream
453 // results. Any error sent by the server is kept in fc.response.Error, and
454 // returned to the user in Finish.
455 return io.EOF
456 }
457 // Decode the streaming result.
458 if err := fc.dec.Decode(itemptr); err != nil {
459 return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err))
460 }
461 return nil
462}
463
464func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700465 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700466 return fc.closeSend()
467}
468
469// closeSend ensures CloseSend always returns verror.E.
470func (fc *flowClient) closeSend() verror.E {
Asim Shankar1707e432014-05-29 19:42:41 -0700471 fc.sendClosedMu.Lock()
472 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700473 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700474 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700475 }
476 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700477 // TODO(caprita): Indiscriminately closing the flow below causes
478 // a race as described in:
479 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
480 //
481 // There should be a finer grained way to fix this (for example,
482 // encoding errors should probably still result in closing the
483 // flow); on the flip side, there may exist other instances
484 // where we are closing the flow but should not.
485 //
486 // For now, commenting out the line below removes the flakiness
487 // from our existing unit tests, but this needs to be revisited
488 // and fixed correctly.
489 //
490 // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700491 }
492 fc.sendClosed = true
493 return nil
494}
495
496func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700497 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700498 err := fc.finish(resultptrs...)
499 vtrace.FromContext(fc.ctx).Annotate("Finished")
500 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700501}
502
503// finish ensures Finish always returns verror.E.
504func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700505 if fc.finished {
506 return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed"))
507 }
508 fc.finished = true
Todd Wangce3033b2014-05-23 17:04:44 -0700509 // Call closeSend implicitly, if the user hasn't already called it. There are
510 // three cases:
511 // 1) Server is blocked on Recv waiting for the final request message.
512 // 2) Server has already finished processing, the final response message and
513 // out args are queued up on the client, and the flow is closed.
514 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
515 // response and args aren't queued up yet, and the flow isn't closed.
516 //
517 // We must call closeSend to handle case (1) and unblock the server; otherwise
518 // we'll deadlock with both client and server waiting for each other. We must
519 // ignore the error (if any) to handle case (2). In that case the flow is
520 // closed, meaning writes will fail and reads will succeed, and closeSend will
521 // always return an error. But this isn't a "real" error; the client should
522 // read the rest of the results and succeed.
523 _ = fc.closeSend()
524
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700525 // Decode the response header, if it hasn't already been decoded by Recv.
526 if fc.response.Error == nil && !fc.response.EndStreamResults {
527 if err := fc.dec.Decode(&fc.response); err != nil {
Benjamin Prosnitz3db40a12014-06-09 10:10:59 -0700528 return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700529 }
530 // The response header must indicate the streaming results have ended.
531 if fc.response.Error == nil && !fc.response.EndStreamResults {
532 return fc.close(errRemainingStreamResults)
533 }
534 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700535
536 // Incorporate any VTrace info that was returned.
537 vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
538
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700539 if fc.response.Error != nil {
Tilak Sharma492e8e92014-09-18 10:58:14 -0700540 if verror.Is(fc.response.Error, verror.NoAccess) && fc.dischargeCache != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700541 // In case the error was caused by a bad discharge, we do not want to get stuck
542 // with retrying again and again with this discharge. As there is no direct way
543 // to detect it, we conservatively flush all discharges we used from the cache.
544 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Ankur57444f32014-08-13 11:03:39 -0700545 vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700546 fc.dischargeCache.Invalidate(fc.discharges...)
547 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700548 return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
549 }
550 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Cosmos Nicolaou9c9918d2014-09-23 08:45:56 -0700551 return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700552 }
553 for ix, r := range resultptrs {
554 if err := fc.dec.Decode(r); err != nil {
555 return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err))
556 }
557 }
558 return fc.close(nil)
559}
560
561func (fc *flowClient) Cancel() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700562 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700563 vtrace.FromContext(fc.ctx).Annotate("Cancelled")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700564 fc.flow.Cancel()
565}
Asim Shankar2d731a92014-09-29 17:46:38 -0700566
567func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -0700568 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -0700569}