blob: 8de92b7f72468bea83a9c7173f0b2e10a7305b9d [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07005package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006
7import (
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "io"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -070010 "math/rand"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080011 "net"
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -080012 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070013 "sync"
14 "time"
15
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -070016 "v.io/x/lib/vlog"
17
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -070018 "v.io/v23"
Jiri Simsa6ac95222015-02-23 16:11:49 -080019 "v.io/v23/context"
20 "v.io/v23/i18n"
Todd Wang5082a552015-04-02 10:56:11 -070021 "v.io/v23/namespace"
Jiri Simsa6ac95222015-02-23 16:11:49 -080022 "v.io/v23/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070023 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080024 "v.io/v23/security"
25 "v.io/v23/vdl"
Todd Wangf6a06882015-02-27 17:38:01 -080026 vtime "v.io/v23/vdlroot/time"
Jiri Simsa6ac95222015-02-23 16:11:49 -080027 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080028 "v.io/v23/vom"
29 "v.io/v23/vtrace"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080030
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080031 inaming "v.io/x/ref/profiles/internal/naming"
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -070032 "v.io/x/ref/profiles/internal/rpc/stream"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070033 "v.io/x/ref/profiles/internal/rpc/stream/vc"
34 "v.io/x/ref/profiles/internal/rpc/version"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070035)
36
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070037const pkgPath = "v.io/x/ref/profiles/internal/rpc"
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080038
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070039func reg(id, msg string) verror.IDAction {
40 // Note: the error action is never used and is instead computed
41 // at a higher level. The errors here are purely for informational
42 // purposes.
43 return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
44}
45
Jiri Simsa5293dcb2014-05-10 09:56:38 -070046var (
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070047 // These errors are intended to be used as arguments to higher
48 // level errors and hence {1}{2} is omitted from their format
49 // strings to avoid repeating these n-times in the final error
50 // message visible to the user.
51 errClientCloseAlreadyCalled = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called")
52 errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called")
53 errNonRootedName = reg(".errNonRootedName", "{3} does not appear to contain an address")
54 errInvalidEndpoint = reg(".errInvalidEndpoint", "failed to parse endpoint")
55 errIncompatibleEndpoint = reg(".errIncompatibleEndpoint", "incompatible endpoint")
56 errVomEncoder = reg(".errVomEncoder", "failed to create vom encoder{:3}")
57 errVomDecoder = reg(".errVomDecoder", "failed to create vom decoder{:3}")
58 errRequestEncoding = reg(".errRequestEncoding", "failed to encode request {3}{:4}")
59 errDischargeEncoding = reg(".errDischargeEncoding", "failed to encode discharges {:3}")
60 errBlessingEncoding = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}")
61 errArgEncoding = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}")
62 errMismatchedResults = reg(".errMismatchedResults", "got {3} results, but want {4}")
63 errResultDecoding = reg(".errResultDecoding", "failed to decode result #{3}{:4}")
64 errResponseDecoding = reg(".errResponseDecoding", "failed to decode response{:3}")
65 errRemainingStreamResults = reg(".errRemaingStreamResults", "stream closed with remaining stream results")
66 errNoBlessingsForPeer = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}")
67 errBlessingGrant = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}")
68 errBlessingAdd = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}")
69 errServerAuthorizeFailed = reg(".errServerAuthorizedFailed", "failed to authorized flow with remote blessings{:3} {:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080070
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070071 errPrepareBlessingsAndDischarges = reg(".prepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080072
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070073 errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}")
74 errNoPrincipal = reg(".errNoPrincipal", "principal required for secure connections")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070075)
76
77type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080078 streamMgr stream.Manager
Todd Wang5082a552015-04-02 10:56:11 -070079 ns namespace.T
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080080 vcOpts []stream.VCOpt // vc opts passed to dial
81 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070082
Jungho Ahn25545d32015-01-26 15:14:14 -080083 // We cache the IP networks on the device since it is not that cheap to read
84 // network interfaces through os syscall.
85 // TODO(jhahn): Add monitoring the network interface changes.
86 ipNets []*net.IPNet
87
Jiri Simsa5293dcb2014-05-10 09:56:38 -070088 // We support concurrent calls to StartCall and Close, so we must protect the
89 // vcMap. Everything else is initialized upon client construction, and safe
90 // to use concurrently.
91 vcMapMu sync.Mutex
Suharsh Sivakumarae774a52015-01-09 14:26:32 -080092 vcMap map[vcMapKey]*vcInfo
Andres Erbsenb7f95f32014-07-07 12:07:56 -070093
Ankure49a86a2014-11-11 18:52:43 -080094 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095}
96
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070097var _ rpc.Client = (*client)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070098
Jiri Simsa5293dcb2014-05-10 09:56:38 -070099type vcInfo struct {
100 vc stream.VC
101 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700102}
103
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800104type vcMapKey struct {
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700105 endpoint string
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700106 clientPublicKey string // clientPublicKey = "" means we are running unencrypted (i.e. SecurityNone)
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800107}
108
Todd Wang5082a552015-04-02 10:56:11 -0700109func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700110 c := &client{
Ankure49a86a2014-11-11 18:52:43 -0800111 streamMgr: streamMgr,
112 ns: ns,
Jungho Ahn25545d32015-01-26 15:14:14 -0800113 ipNets: ipNetworks(),
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800114 vcMap: make(map[vcMapKey]*vcInfo),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 }
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800116 c.dc = InternalNewDischargeClient(nil, c, 0)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700117 for _, opt := range opts {
118 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800119 switch v := opt.(type) {
120 case stream.VCOpt:
121 c.vcOpts = append(c.vcOpts, v)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800122 case PreferredProtocols:
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800123 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700124 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700125 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800126
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700127 return c, nil
128}
129
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700130func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700131 c.vcMapMu.Lock()
132 defer c.vcMapMu.Unlock()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700133
134 suberr := func(err error) *verror.SubErr {
135 return &verror.SubErr{Err: err, Options: verror.Print}
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800136 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700137
138 if c.vcMap == nil {
139 return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
140 }
141
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700142 vcKey := vcMapKey{endpoint: ep.String()}
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700143 if principal != nil {
144 vcKey.clientPublicKey = principal.PublicKey().String()
145 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800146 if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700147 if flow, err := vcinfo.vc.Connect(); err == nil {
148 return flow, nil
149 }
150 // If the vc fails to establish a new flow, we assume it's
151 // broken, remove it from the map, and proceed to establishing
152 // a new vc.
153 // TODO(caprita): Should we distinguish errors due to vc being
154 // closed from other errors? If not, should we call vc.Close()
155 // before removing the vc from the map?
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800156 delete(c.vcMap, vcKey)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700157 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800158 sm := c.streamMgr
Robin Thellendee439642014-10-20 14:39:17 -0700159 c.vcMapMu.Unlock()
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700160 vc, err := sm.Dial(ep, principal, vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -0700161 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700162 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700163 return nil, suberr(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700164 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800165 if c.vcMap == nil {
166 sm.ShutdownEndpoint(ep)
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700167 return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800168 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800169 if othervc, exists := c.vcMap[vcKey]; exists {
Robin Thellendee439642014-10-20 14:39:17 -0700170 vc = othervc.vc
171 // TODO(ashankar,toddw): Figure out how to close up the VC that
172 // is discarded. vc.Close?
173 } else {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800174 c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
Robin Thellendee439642014-10-20 14:39:17 -0700175 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700176 flow, err := vc.Connect()
177 if err != nil {
178 return nil, suberr(err)
179 }
180 return flow, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700181}
182
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700183// A randomized exponential backoff. The randomness deters error convoys
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700184// from forming. The first time you retry n should be 0, then 1 etc.
185func backoff(n uint, deadline time.Time) bool {
186 // This is ((100 to 200) * 2^n) ms.
187 b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700188 if b > maxBackoff {
189 b = maxBackoff
190 }
191 r := deadline.Sub(time.Now())
192 if b > r {
193 // We need to leave a little time for the call to start or
194 // we'll just timeout in startCall before we actually do
195 // anything. If we just have a millisecond left, give up.
196 if r <= time.Millisecond {
197 return false
198 }
199 b = r - time.Millisecond
200 }
201 time.Sleep(b)
202 return true
203}
204
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700205func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800206 defer vlog.LogCall()()
207 return c.startCall(ctx, name, method, args, opts)
208}
209
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700210func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
211 defer vlog.LogCall()()
212
213 deadline := getDeadline(ctx, opts)
214
215 var lastErr error
216 for retries := uint(0); ; retries++ {
217 call, err := c.startCall(ctx, name, method, inArgs, opts)
218 if err != nil {
219 return err
220 }
221 err = call.Finish(outArgs...)
222 if err == nil {
223 return nil
224 }
225 lastErr = err
226 // We only retry if RetryBackoff is returned by the application because other
227 // RetryConnection and RetryRefetch required actions by the client before
228 // retrying.
229 if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
Suharsh Sivakumarc1641f62015-04-10 13:09:22 -0700230 vlog.VI(4).Infof("Cannot retry after error: %s", lastErr)
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700231 break
232 }
233 if !backoff(retries, deadline) {
234 break
235 }
Suharsh Sivakumarc1641f62015-04-10 13:09:22 -0700236 vlog.VI(4).Infof("Retrying due to error: %s", lastErr)
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700237 }
238 return lastErr
239}
240
241func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
242 // Context specified deadline.
243 deadline, hasDeadline := ctx.Deadline()
244 if !hasDeadline {
245 // Default deadline.
246 deadline = time.Now().Add(defaultCallTimeout)
247 }
248 if r, ok := getRetryTimeoutOpt(opts); ok {
249 // Caller specified deadline.
250 deadline = time.Now().Add(r)
251 }
252 return deadline
253}
254
255func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
256 switch {
257 case noRetry(opts):
258 return false
259 case action != verror.RetryBackoff:
260 return false
261 case time.Now().After(deadline):
262 return false
263 }
264 return true
265}
266
267func shouldRetry(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
268 switch {
269 case noRetry(opts):
270 return false
271 case action != verror.RetryConnection && action != verror.RetryRefetch:
272 return false
273 case time.Now().After(deadline):
274 return false
275 case action == verror.RetryRefetch && getNoNamespaceOpt(opts):
276 // If we're skipping resolution and there are no servers for
277 // this call retrying is not going to help, we can't come up
278 // with new servers if there is no resolution.
279 return false
280 }
281 return true
282}
283
Todd Wangb31da592015-02-20 12:50:39 -0800284func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) {
Ankure49a86a2014-11-11 18:52:43 -0800285 var impetus security.DischargeImpetus
286 if len(serverBlessings) > 0 {
287 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
288 for i, b := range serverBlessings {
289 impetus.Server[i] = security.BlessingPattern(b)
290 }
291 }
292 impetus.Method = method
293 if len(args) > 0 {
Todd Wangb31da592015-02-20 12:50:39 -0800294 impetus.Arguments = make([]*vdl.Value, len(args))
Ankure49a86a2014-11-11 18:52:43 -0800295 for i, a := range args {
Todd Wangb31da592015-02-20 12:50:39 -0800296 vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a))
297 if err != nil {
298 return security.DischargeImpetus{}, err
299 }
300 impetus.Arguments[i] = vArg
Ankure49a86a2014-11-11 18:52:43 -0800301 }
302 }
Todd Wangb31da592015-02-20 12:50:39 -0800303 return impetus, nil
Ankure49a86a2014-11-11 18:52:43 -0800304}
305
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800306// startCall ensures StartCall always returns verror.E.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700307func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800308 if !ctx.Initialized() {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700309 return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700310 }
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700311 ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
312 if err := canCreateServerAuthorizer(ctx, opts); err != nil {
313 return nil, verror.New(verror.ErrBadArg, ctx, err)
Ankur50a5f392015-02-27 18:46:30 -0800314 }
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700315
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700316 deadline := getDeadline(ctx, opts)
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800317
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800318 var lastErr error
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700319 for retries := uint(0); ; retries++ {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800320 call, action, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700321 if err == nil {
322 return call, nil
323 }
324 lastErr = err
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700325 if !shouldRetry(action, deadline, opts) {
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800326 span.Annotatef("Cannot retry after error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700327 break
328 }
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700329 if !backoff(retries, deadline) {
330 break
331 }
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700332 span.Annotatef("Retrying due to error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700333 }
334 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700335}
336
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800337type serverStatus struct {
Ankur50a5f392015-02-27 18:46:30 -0800338 index int
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700339 server, suffix string
Ankur50a5f392015-02-27 18:46:30 -0800340 flow stream.Flow
341 blessings []string // authorized server blessings
342 rejectedBlessings []security.RejectedBlessing // rejected server blessings
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700343 serverErr *verror.SubErr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800344}
345
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700346func suberrName(server, name, method string) string {
347 // In the case the client directly dialed an endpoint we want to avoid printing
348 // the endpoint twice.
349 if server == name {
350 return fmt.Sprintf("%s.%s", server, method)
351 }
352 return fmt.Sprintf("%s:%s.%s", server, name, method)
353}
354
Asim Shankaraae31802015-01-22 11:59:42 -0800355// tryCreateFlow attempts to establish a Flow to "server" (which must be a
356// rooted name), over which a method invocation request could be sent.
Ankur50a5f392015-02-27 18:46:30 -0800357//
358// The server at the remote end of the flow is authorized using the provided
359// authorizer, both during creation of the VC underlying the flow and the
360// flow itself.
Cosmos Nicolaou00a0f802014-11-16 22:44:55 -0800361// TODO(cnicolaou): implement real, configurable load balancing.
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700362func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700363 status := &serverStatus{index: index, server: server}
Asim Shankarf4864f42014-11-25 18:53:05 -0800364 var span vtrace.Span
Asim Shankaraae31802015-01-22 11:59:42 -0800365 ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow")
Asim Shankarf4864f42014-11-25 18:53:05 -0800366 span.Annotatef("address:%v", server)
Asim Shankaraae31802015-01-22 11:59:42 -0800367 defer func() {
368 ch <- status
369 span.Finish()
370 }()
Ankur50a5f392015-02-27 18:46:30 -0800371
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700372 suberr := func(err error) *verror.SubErr {
373 return &verror.SubErr{
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700374 Name: suberrName(server, name, method),
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700375 Err: err,
376 Options: verror.Print,
377 }
378 }
379
Asim Shankaraae31802015-01-22 11:59:42 -0800380 address, suffix := naming.SplitAddressName(server)
381 if len(address) == 0 {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700382 status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
Asim Shankaraae31802015-01-22 11:59:42 -0800383 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800384 }
Ankur50a5f392015-02-27 18:46:30 -0800385 status.suffix = suffix
386
Asim Shankaraae31802015-01-22 11:59:42 -0800387 ep, err := inaming.NewEndpoint(address)
388 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700389 status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
Asim Shankaraae31802015-01-22 11:59:42 -0800390 return
391 }
392 if err = version.CheckCompatibility(ep); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700393 status.serverErr = suberr(verror.New(errIncompatibleEndpoint, ctx))
Asim Shankaraae31802015-01-22 11:59:42 -0800394 return
395 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700396 if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700397 status.serverErr.Name = suberrName(server, name, method)
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700398 vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
Asim Shankaraae31802015-01-22 11:59:42 -0800399 return
400 }
Ankur50a5f392015-02-27 18:46:30 -0800401
402 // Authorize the remote end of the flow using the provided authorizer.
403 if status.flow.LocalPrincipal() == nil {
404 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700405 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800406 return
407 }
408
Ankurd8646812015-03-12 10:48:41 -0700409 seccall := security.NewCall(&security.CallParams{
Ankur50a5f392015-02-27 18:46:30 -0800410 LocalPrincipal: status.flow.LocalPrincipal(),
411 LocalBlessings: status.flow.LocalBlessings(),
412 RemoteBlessings: status.flow.RemoteBlessings(),
413 LocalEndpoint: status.flow.LocalEndpoint(),
414 RemoteEndpoint: status.flow.RemoteEndpoint(),
415 RemoteDischarges: status.flow.RemoteDischarges(),
416 Method: method,
Matt Rosencrantz250558f2015-03-17 11:37:31 -0700417 Suffix: status.suffix,
418 })
Todd Wang4264e4b2015-04-16 22:43:40 -0700419 if err := auth.Authorize(ctx, seccall); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700420 // We will test for errServerAuthorizeFailed in failedTryCall and report
421 // verror.ErrNotTrusted
422 status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
423 vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
Ankur50a5f392015-02-27 18:46:30 -0800424 status.flow.Close()
425 status.flow = nil
426 return
427 }
Todd Wang4264e4b2015-04-16 22:43:40 -0700428 status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
Asim Shankaraae31802015-01-22 11:59:42 -0800429 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800430}
431
Asim Shankaraae31802015-01-22 11:59:42 -0800432// tryCall makes a single attempt at a call. It may connect to multiple servers
433// (all that serve "name"), but will invoke the method on at most one of them
434// (the server running on the most preferred protcol and network amongst all
435// the servers that were successfully connected to and authorized).
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700436func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, verror.ActionCode, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800437 var resolved *naming.MountEntry
Asim Shankaraae31802015-01-22 11:59:42 -0800438 var err error
Asim Shankar263c73b2015-03-19 18:31:26 -0700439 var blessingPattern security.BlessingPattern
440 blessingPattern, name = security.SplitPatternName(name)
David Why Use Two When One Will Do Presotto38788d42015-03-31 17:13:54 -0700441 if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800442 // We always return NoServers as the error so that the caller knows
443 // that's ok to retry the operation since the name may be registered
444 // in the near future.
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700445 switch {
446 case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
Jiri Simsa074bf362015-02-17 09:29:45 -0800447 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name)
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700448 case verror.ErrorID(err) == verror.ErrNoServers.ID:
449 // Avoid wrapping errors unnecessarily.
450 return nil, verror.NoRetry, err
451 default:
452 return nil, verror.NoRetry, verror.New(verror.ErrNoServers, ctx, name, err)
Ryan Brown6153c6c2014-12-11 13:10:09 -0800453 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700454 } else {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800455 if len(resolved.Servers) == 0 {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700456 // This should never happen.
457 return nil, verror.NoRetry, verror.New(verror.ErrInternal, ctx, name)
Suharsh Sivakumar65e44c22014-12-10 17:15:19 -0800458 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800459 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800460 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800461 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700462 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700463 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800464
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700465 // We need to ensure calls to v23 factory methods do not occur during runtime
466 // initialization. Currently, the agent, which uses SecurityNone, is the only caller
467 // during runtime initialization. We would like to set the principal in the context
468 // to nil if we are running in SecurityNone, but this always results in a panic since
469 // the agent client would trigger the call v23.SetPrincipal during runtime
470 // initialization. So, we gate the call to v23.GetPrincipal instead since the agent
471 // client will have callEncrypted == false.
472 // Potential solutions to this are:
473 // (1) Create a separate client for the agent so that this code doesn't have to
474 // account for its use during runtime initialization.
475 // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
476 // on here.
477 var principal security.Principal
478 if callEncrypted(opts) {
Suharsh Sivakumar0ed10c22015-04-06 12:55:55 -0700479 if principal = v23.GetPrincipal(ctx); principal == nil {
480 return nil, verror.NoRetry, verror.New(errNoPrincipal, ctx)
481 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700482 }
483
Asim Shankarb547ea92015-02-17 18:49:45 -0800484 // servers is now ordered by the priority heurestic implemented in
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800485 // filterAndOrderServers.
Asim Shankaraae31802015-01-22 11:59:42 -0800486 //
487 // Try to connect to all servers in parallel. Provide sufficient
488 // buffering for all of the connections to finish instantaneously. This
489 // is important because we want to process the responses in priority
490 // order; that order is indicated by the order of entries in servers.
491 // So, if two respones come in at the same 'instant', we prefer the
492 // first in the resolved.Servers)
493 attempts := len(resolved.Servers)
Ankur50a5f392015-02-27 18:46:30 -0800494
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800495 responses := make([]*serverStatus, attempts)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800496 ch := make(chan *serverStatus, attempts)
Asim Shankaraae31802015-01-22 11:59:42 -0800497 vcOpts := append(getVCOpts(opts), c.vcOpts...)
Asim Shankar263c73b2015-03-19 18:31:26 -0700498 authorizer := newServerAuthorizer(blessingPattern, opts...)
Asim Shankaraae31802015-01-22 11:59:42 -0800499 for i, server := range resolved.Names() {
Asim Shankar263c73b2015-03-19 18:31:26 -0700500 // Create a copy of vcOpts for each call to tryCreateFlow
501 // to avoid concurrent tryCreateFlows from stepping on each
502 // other while manipulating their copy of the options.
Ankur50a5f392015-02-27 18:46:30 -0800503 vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
504 copy(vcOptsCopy, vcOpts)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700505 go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700506 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800507
Todd Wangf6a06882015-02-27 17:38:01 -0800508 var timeoutChan <-chan time.Time
509 if deadline, ok := ctx.Deadline(); ok {
510 timeoutChan = time.After(deadline.Sub(time.Now()))
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800511 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800512
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800513 for {
Todd Wangef05c062014-11-15 09:51:43 -0800514 // Block for at least one new response from the server, or the timeout.
515 select {
516 case r := <-ch:
517 responses[r.index] = r
518 // Read as many more responses as we can without blocking.
519 LoopNonBlocking:
520 for {
521 select {
522 default:
523 break LoopNonBlocking
524 case r := <-ch:
525 responses[r.index] = r
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800526 }
Todd Wangef05c062014-11-15 09:51:43 -0800527 }
528 case <-timeoutChan:
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700529 vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
Asim Shankaraae31802015-01-22 11:59:42 -0800530 _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
Todd Wang8fa38762015-03-25 14:04:59 -0700531 if verror.ErrorID(err) != verror.ErrTimeout.ID {
Jiri Simsa074bf362015-02-17 09:29:45 -0800532 return nil, verror.NoRetry, verror.New(verror.ErrTimeout, ctx, err)
Cosmos Nicolaou38209d42014-12-09 16:50:38 -0800533 }
534 return nil, verror.NoRetry, err
Todd Wangef05c062014-11-15 09:51:43 -0800535 }
536
Ankur50a5f392015-02-27 18:46:30 -0800537 dc := c.dc
538 if shouldNotFetchDischarges(opts) {
539 dc = nil
540 }
Todd Wangef05c062014-11-15 09:51:43 -0800541 // Process new responses, in priority order.
542 numResponses := 0
543 for _, r := range responses {
544 if r != nil {
545 numResponses++
546 }
547 if r == nil || r.flow == nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800548 continue
549 }
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800550
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800551 doneChan := ctx.Done()
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800552 r.flow.SetDeadline(doneChan)
Ankur50a5f392015-02-27 18:46:30 -0800553 fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
554 if err != nil {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700555 return nil, verror.NoRetry, err
Ankur50a5f392015-02-27 18:46:30 -0800556 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800557
Ankurdda16492015-04-07 12:35:42 -0700558 if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700559 r.serverErr = &verror.SubErr{
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700560 Name: suberrName(r.server, name, method),
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700561 Options: verror.Print,
562 Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
563 }
564 vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
Ankur50a5f392015-02-27 18:46:30 -0800565 r.flow.Close()
566 r.flow = nil
567 continue
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800568 }
569
Todd Wangef05c062014-11-15 09:51:43 -0800570 // This is the 'point of no return'; once the RPC is started (fc.start
571 // below) we can't be sure if it makes it to the server or not so, this
572 // code will never call fc.start more than once to ensure that we provide
573 // 'at-most-once' rpc semantics at this level. Retrying the network
574 // connections (i.e. creating flows) is fine since we can cleanup that
575 // state if we abort a call (i.e. close the flow).
576 //
577 // We must ensure that all flows other than r.flow are closed.
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800578 //
579 // TODO(cnicolaou): all errors below are marked as NoRetry
580 // because we want to provide at-most-once rpc semantics so
581 // we only ever attempt an RPC once. In the future, we'll cache
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700582 // responses on the server and then we can retry in-flight
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800583 // RPCs.
Todd Wangef05c062014-11-15 09:51:43 -0800584 go cleanupTryCall(r, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800585
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800586 if doneChan != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800587 go func() {
588 select {
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800589 case <-doneChan:
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800590 vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
Matt Rosencrantz9346b412014-12-18 15:59:19 -0800591 fc.flow.Cancel()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800592 case <-fc.flow.Closed():
593 }
594 }()
595 }
596
Todd Wangf6a06882015-02-27 17:38:01 -0800597 deadline, _ := ctx.Deadline()
Ankur50a5f392015-02-27 18:46:30 -0800598 if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800599 return nil, verror.NoRetry, verr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800600 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800601 return fc, verror.NoRetry, nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800602 }
Todd Wangef05c062014-11-15 09:51:43 -0800603 if numResponses == len(responses) {
Asim Shankaraae31802015-01-22 11:59:42 -0800604 return c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800605 }
606 }
Todd Wangef05c062014-11-15 09:51:43 -0800607}
608
Asim Shankaraae31802015-01-22 11:59:42 -0800609// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
Todd Wangef05c062014-11-15 09:51:43 -0800610// goroutines, and have closed the flow from each one except skip. This is a
611// blocking function; it should be called in its own goroutine.
612func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
613 numPending := 0
614 for _, r := range responses {
615 switch {
616 case r == nil:
617 // The response hasn't arrived yet.
618 numPending++
619 case r == skip || r.flow == nil:
620 // Either we should skip this flow, or we've closed the flow for this
621 // response already; nothing more to do.
622 default:
623 // We received the response, but haven't closed the flow yet.
624 r.flow.Close()
625 }
626 }
627 // Now we just need to wait for the pending responses and close their flows.
628 for i := 0; i < numPending; i++ {
629 if r := <-ch; r.flow != nil {
630 r.flow.Close()
631 }
632 }
633}
634
635// failedTryCall performs asynchronous cleanup for tryCall, and returns an
636// appropriate error from the responses we've already received. All parallel
637// calls in tryCall failed or we timed out if we get here.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700638func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, error) {
Todd Wangef05c062014-11-15 09:51:43 -0800639 go cleanupTryCall(nil, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800640 c.ns.FlushCacheEntry(name)
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700641 suberrs := []verror.SubErr{}
642 topLevelError := verror.ErrNoServers
643 topLevelAction := verror.RetryRefetch
Asim Shankaraae31802015-01-22 11:59:42 -0800644 for _, r := range responses {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700645 if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
646 switch verror.ErrorID(r.serverErr.Err) {
647 case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
648 topLevelError = verror.ErrNotTrusted
649 topLevelAction = verror.NoRetry
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800650 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700651 suberrs = append(suberrs, *r.serverErr)
Todd Wangef05c062014-11-15 09:51:43 -0800652 }
653 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700654
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800655 // TODO(cnicolaou): we get system errors for things like dialing using
656 // the 'ws' protocol which can never succeed even if we retry the connection,
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700657 // hence we return RetryRefetch below except for the case where the servers
658 // are not trusted, in case there's no point in retrying at all.
659 // TODO(cnicolaou): implementing at-most-once rpc semantics in the future
660 // will require thinking through all of the cases where the RPC can
661 // be retried by the client whilst it's actually being executed on the
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700662 // server.
663 return nil, topLevelAction, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700664}
665
Ankur50a5f392015-02-27 18:46:30 -0800666// prepareBlessingsAndDischarges prepares blessings and discharges for
667// the call.
668//
669// This includes: (1) preparing blessings that must be granted to the
670// server, (2) preparing blessings that the client authenticates with,
671// and, (3) preparing any discharges for third-party caveats on the client's
672// blessings.
Ankurdda16492015-04-07 12:35:42 -0700673func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error {
Ankur50a5f392015-02-27 18:46:30 -0800674 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700675 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800676 if fc.flow.LocalPrincipal() == nil {
677 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700678 }
Ankur50a5f392015-02-27 18:46:30 -0800679
Ankur50a5f392015-02-27 18:46:30 -0800680 // Fetch blessings from the client's blessing store that are to be
681 // shared with the server.
682 if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() {
683 // TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings
684 // to the server.
685 return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings)
686 }
687
688 // Fetch any discharges for third-party caveats on the client's blessings.
689 if !fc.blessings.IsZero() && fc.dc != nil {
690 impetus, err := mkDischargeImpetus(fc.server, method, args)
691 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700692 return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err))
Ankur50a5f392015-02-27 18:46:30 -0800693 }
694 fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
695 }
Ankurdda16492015-04-07 12:35:42 -0700696
697 // Prepare blessings that must be granted to the server (using any
698 // rpc.Granter implementation in 'opts').
699 //
Todd Wang4264e4b2015-04-16 22:43:40 -0700700 // NOTE(ataly, suharshs): Before invoking the granter, we set the parameters
701 // of the current call. The user can now retrieve the principal via
702 // v23.GetPrincipal(ctx), or via call.LocalPrincipal(). While in theory the
703 // two principals can be different, the flow.LocalPrincipal == nil check at
704 // the beginning of this method ensures that the two are the same and non-nil
705 // at this point in the code.
Ankurdda16492015-04-07 12:35:42 -0700706 ldischargeMap := make(map[string]security.Discharge)
707 for _, d := range fc.discharges {
708 ldischargeMap[d.ID()] = d
709 }
710 seccall := security.NewCall(&security.CallParams{
711 LocalPrincipal: fc.flow.LocalPrincipal(),
712 LocalBlessings: fc.blessings,
713 RemoteBlessings: fc.flow.RemoteBlessings(),
714 LocalEndpoint: fc.flow.LocalEndpoint(),
715 RemoteEndpoint: fc.flow.RemoteEndpoint(),
716 LocalDischarges: ldischargeMap,
717 RemoteDischarges: fc.flow.RemoteDischarges(),
718 Method: method,
719 Suffix: suffix,
720 })
Todd Wang4264e4b2015-04-16 22:43:40 -0700721 if err := fc.prepareGrantedBlessings(ctx, seccall, opts); err != nil {
Ankurdda16492015-04-07 12:35:42 -0700722 return err
723 }
Ankur50a5f392015-02-27 18:46:30 -0800724 return nil
725}
726
Todd Wang4264e4b2015-04-16 22:43:40 -0700727func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, call security.Call, opts []rpc.CallOpt) error {
Asim Shankarb54d7642014-06-05 13:08:04 -0700728 for _, o := range opts {
729 switch v := o.(type) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700730 case rpc.Granter:
Todd Wang4264e4b2015-04-16 22:43:40 -0700731 if b, err := v.Grant(ctx, call); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700732 return verror.New(errBlessingGrant, fc.ctx, err)
Ankur50a5f392015-02-27 18:46:30 -0800733 } else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700734 return verror.New(errBlessingAdd, fc.ctx, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700735 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700736 }
737 }
Ankur50a5f392015-02-27 18:46:30 -0800738 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700739}
740
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700741func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700742 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700743 c.vcMapMu.Lock()
744 for _, v := range c.vcMap {
745 c.streamMgr.ShutdownEndpoint(v.remoteEP)
746 }
747 c.vcMap = nil
748 c.vcMapMu.Unlock()
749}
750
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700751// flowClient implements the RPC client-side protocol for a single RPC, over a
752// flow that's already connected to the server.
753type flowClient struct {
Todd Wang3425a902015-01-21 18:43:59 -0800754 ctx *context.T // context to annotate with call details
755 dec *vom.Decoder // to decode responses and results from the server
756 enc *vom.Encoder // to encode requests and args to the server
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700757 server []string // Blessings bound to the server that authorize it to receive the RPC request from the client.
Todd Wang3425a902015-01-21 18:43:59 -0800758 flow stream.Flow // the underlying flow
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700759 response rpc.Response // each decoded response message is kept here
Asim Shankar1707e432014-05-29 19:42:41 -0700760
Ankure49a86a2014-11-11 18:52:43 -0800761 discharges []security.Discharge // discharges used for this request
762 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700763
Ankur50a5f392015-02-27 18:46:30 -0800764 blessings security.Blessings // the local blessings for the current RPC.
765 grantedBlessings security.Blessings // the blessings granted to the server.
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800766
Asim Shankar1707e432014-05-29 19:42:41 -0700767 sendClosedMu sync.Mutex
768 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800769 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700770}
771
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700772var _ rpc.ClientCall = (*flowClient)(nil)
773var _ rpc.Stream = (*flowClient)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700774
Ankur50a5f392015-02-27 18:46:30 -0800775func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800776 fc := &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800777 ctx: ctx,
Ankure49a86a2014-11-11 18:52:43 -0800778 flow: flow,
Ankur50a5f392015-02-27 18:46:30 -0800779 server: server,
Ankure49a86a2014-11-11 18:52:43 -0800780 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700781 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800782 var err error
Jungho Ahn60408fa2015-03-27 15:28:22 -0700783 typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
784 if typeenc == nil {
785 if fc.enc, err = vom.NewEncoder(flow); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700786 // In practice, this will never fail because of a networking
787 // problem since the encoder writes the 'magic byte' which
788 // will be buffered and not written to the network immediately.
789 berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
Jungho Ahn60408fa2015-03-27 15:28:22 -0700790 return nil, fc.close(berr)
791 }
792 if fc.dec, err = vom.NewDecoder(flow); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700793 berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
Jungho Ahn60408fa2015-03-27 15:28:22 -0700794 return nil, fc.close(berr)
795 }
796 } else {
797 if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700798 berr := verror.AddSubErrs(verror.New(errVomEncoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
Jungho Ahn60408fa2015-03-27 15:28:22 -0700799 return nil, fc.close(berr)
800 }
801 typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
802 if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700803 berr := verror.AddSubErrs(verror.New(errVomDecoder, fc.ctx), fc.ctx, verror.SubErr{Err: err})
Jungho Ahn60408fa2015-03-27 15:28:22 -0700804 return nil, fc.close(berr)
805 }
Todd Wang34ed4c62014-11-26 15:15:52 -0800806 }
807 return fc, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700808}
809
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800810func (fc *flowClient) close(err error) error {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700811 subErr := verror.SubErr{Err: err, Options: verror.Print}
812 subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800813 if cerr := fc.flow.Close(); cerr != nil && err == nil {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700814 return verror.New(verror.ErrInternal, fc.ctx, subErr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700815 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700816 switch verror.ErrorID(err) {
817 case verror.ErrBadProtocol.ID, errRequestEncoding.ID, errArgEncoding.ID:
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800818 switch fc.ctx.Err() {
819 case context.DeadlineExceeded:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700820 timeout := verror.New(verror.ErrTimeout, fc.ctx)
821 err := verror.AddSubErrs(timeout, fc.ctx, subErr)
822 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800823 case context.Canceled:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700824 canceled := verror.New(verror.ErrCanceled, fc.ctx)
825 err := verror.AddSubErrs(canceled, fc.ctx, subErr)
826 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800827 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700828 case errVomEncoder.ID, errVomDecoder.ID:
829 badProtocol := verror.New(verror.ErrBadProtocol, fc.ctx)
830 err = verror.AddSubErrs(badProtocol, fc.ctx, subErr)
831 case verror.ErrTimeout.ID:
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800832 // Canceled trumps timeout.
833 if fc.ctx.Err() == context.Canceled {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700834 return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800835 }
836 }
837 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838}
839
Ankur50a5f392015-02-27 18:46:30 -0800840func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800841 // Encode the Blessings information for the client to authorize the flow.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700842 var blessingsRequest rpc.BlessingsRequest
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800843 if fc.flow.LocalPrincipal() != nil {
Jungho Ahn44d8daf2015-01-16 10:39:15 -0800844 blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800845 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700846 req := rpc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700847 Suffix: suffix,
848 Method: method,
849 NumPosArgs: uint64(len(args)),
Todd Wangf6a06882015-02-27 17:38:01 -0800850 Deadline: vtime.Deadline{deadline},
Asim Shankarb07ec692015-02-27 23:40:44 -0800851 GrantedBlessings: fc.grantedBlessings,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800852 Blessings: blessingsRequest,
Asim Shankar08642822015-03-02 21:21:09 -0800853 Discharges: fc.discharges,
Matt Rosencrantz2803fe92015-03-09 15:26:32 -0700854 TraceRequest: vtrace.GetRequest(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700855 }
856 if err := fc.enc.Encode(req); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700857 berr := verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800858 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700859 }
860 for ix, arg := range args {
861 if err := fc.enc.Encode(arg); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700862 berr := verror.New(errArgEncoding, fc.ctx, ix, err)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800863 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700864 }
865 }
866 return nil
867}
868
869func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700870 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700871 if fc.sendClosed {
Jiri Simsa074bf362015-02-17 09:29:45 -0800872 return verror.New(verror.ErrAborted, fc.ctx)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700873 }
874
875 // The empty request header indicates what follows is a streaming arg.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700876 if err := fc.enc.Encode(rpc.Request{}); err != nil {
877 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800878 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700879 }
880 if err := fc.enc.Encode(item); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800881 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, -1, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800882 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700883 }
884 return nil
885}
886
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800887func decodeNetError(ctx *context.T, err error) verror.IDAction {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800888 if neterr, ok := err.(net.Error); ok {
889 if neterr.Timeout() || neterr.Temporary() {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800890 // If a read is canceled in the lower levels we see
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800891 // a timeout error - see readLocked in vc/reader.go
892 if ctx.Err() == context.Canceled {
Jiri Simsa074bf362015-02-17 09:29:45 -0800893 return verror.ErrCanceled
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800894 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800895 return verror.ErrTimeout
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800896 }
897 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800898 return verror.ErrBadProtocol
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800899}
900
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700901func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700902 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700903 switch {
904 case fc.response.Error != nil:
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800905 // TODO(cnicolaou): this will become a verror.E when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800906 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800907 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700908 case fc.response.EndStreamResults:
909 return io.EOF
910 }
911
912 // Decode the response header and handle errors and EOF.
913 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800914 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800915 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700916 }
917 if fc.response.Error != nil {
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800918 // TODO(cnicolaou): this will become a verror.E when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800919 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800920 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700921 }
922 if fc.response.EndStreamResults {
923 // Return EOF to indicate to the caller that there are no more stream
924 // results. Any error sent by the server is kept in fc.response.Error, and
925 // returned to the user in Finish.
926 return io.EOF
927 }
928 // Decode the streaming result.
929 if err := fc.dec.Decode(itemptr); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800930 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800931 // TODO(cnicolaou): should we be caching this?
932 fc.response.Error = berr
933 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700934 }
935 return nil
936}
937
938func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700939 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700940 return fc.closeSend()
941}
942
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800943// closeSend ensures CloseSend always returns verror.E.
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800944func (fc *flowClient) closeSend() error {
Asim Shankar1707e432014-05-29 19:42:41 -0700945 fc.sendClosedMu.Lock()
946 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700947 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700948 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700949 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700950 if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700951 // TODO(caprita): Indiscriminately closing the flow below causes
952 // a race as described in:
953 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
954 //
955 // There should be a finer grained way to fix this (for example,
956 // encoding errors should probably still result in closing the
957 // flow); on the flip side, there may exist other instances
958 // where we are closing the flow but should not.
959 //
960 // For now, commenting out the line below removes the flakiness
961 // from our existing unit tests, but this needs to be revisited
962 // and fixed correctly.
963 //
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700964 // return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700965 }
966 fc.sendClosed = true
967 return nil
968}
969
970func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700971 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700972 err := fc.finish(resultptrs...)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800973 vtrace.GetSpan(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700974 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700975}
976
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800977// finish ensures Finish always returns a verror.E.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800978func (fc *flowClient) finish(resultptrs ...interface{}) error {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700979 if fc.finished {
Todd Wangff73e1f2015-02-10 21:45:52 -0800980 err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
Jiri Simsa074bf362015-02-17 09:29:45 -0800981 return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700982 }
983 fc.finished = true
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800984
Todd Wangce3033b2014-05-23 17:04:44 -0700985 // Call closeSend implicitly, if the user hasn't already called it. There are
986 // three cases:
987 // 1) Server is blocked on Recv waiting for the final request message.
988 // 2) Server has already finished processing, the final response message and
989 // out args are queued up on the client, and the flow is closed.
990 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
991 // response and args aren't queued up yet, and the flow isn't closed.
992 //
993 // We must call closeSend to handle case (1) and unblock the server; otherwise
994 // we'll deadlock with both client and server waiting for each other. We must
995 // ignore the error (if any) to handle case (2). In that case the flow is
996 // closed, meaning writes will fail and reads will succeed, and closeSend will
997 // always return an error. But this isn't a "real" error; the client should
998 // read the rest of the results and succeed.
999 _ = fc.closeSend()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001000 // Decode the response header, if it hasn't already been decoded by Recv.
1001 if fc.response.Error == nil && !fc.response.EndStreamResults {
1002 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -08001003 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001004 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001005 }
1006 // The response header must indicate the streaming results have ended.
1007 if fc.response.Error == nil && !fc.response.EndStreamResults {
Jiri Simsa074bf362015-02-17 09:29:45 -08001008 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRemainingStreamResults, fc.ctx))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001009 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001010 }
1011 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001012 if fc.response.AckBlessings {
1013 clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
1014 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001015 // Incorporate any VTrace info that was returned.
Matt Rosencrantz2803fe92015-03-09 15:26:32 -07001016 vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001017 if fc.response.Error != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -07001018 id := verror.ErrorID(fc.response.Error)
1019 if id == verror.ErrNoAccess.ID && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001020 // In case the error was caused by a bad discharge, we do not want to get stuck
1021 // with retrying again and again with this discharge. As there is no direct way
1022 // to detect it, we conservatively flush all discharges we used from the cache.
1023 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Asim Shankar77befba2015-01-09 12:49:04 -08001024 vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Ankure49a86a2014-11-11 18:52:43 -08001025 fc.dc.Invalidate(fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001026 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -07001027 if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
1028 return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
1029 }
Jiri Simsa074bf362015-02-17 09:29:45 -08001030 return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001031 }
1032 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Jiri Simsa074bf362015-02-17 09:29:45 -08001033 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001034 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001035 }
1036 for ix, r := range resultptrs {
1037 if err := fc.dec.Decode(r); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -08001038 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001039 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001040 }
1041 }
1042 return fc.close(nil)
1043}
1044
Asim Shankar2d731a92014-09-29 17:46:38 -07001045func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -07001046 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -07001047}
Ankur50a5f392015-02-27 18:46:30 -08001048
1049func bpatterns(patterns []string) []security.BlessingPattern {
1050 if patterns == nil {
1051 return nil
1052 }
1053 bpatterns := make([]security.BlessingPattern, len(patterns))
1054 for i, p := range patterns {
1055 bpatterns[i] = security.BlessingPattern(p)
1056 }
1057 return bpatterns
1058}