blob: a7f427f692c45bcd03c023fa6e76de85172fd027 [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07005package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006
7import (
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "io"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -070010 "math/rand"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080011 "net"
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -080012 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070013 "sync"
14 "time"
15
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -070016 "v.io/v23"
Jiri Simsa6ac95222015-02-23 16:11:49 -080017 "v.io/v23/context"
18 "v.io/v23/i18n"
Todd Wang5082a552015-04-02 10:56:11 -070019 "v.io/v23/namespace"
Jiri Simsa6ac95222015-02-23 16:11:49 -080020 "v.io/v23/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070021 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080022 "v.io/v23/security"
23 "v.io/v23/vdl"
Todd Wangf6a06882015-02-27 17:38:01 -080024 vtime "v.io/v23/vdlroot/time"
Jiri Simsa6ac95222015-02-23 16:11:49 -080025 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080026 "v.io/v23/vom"
27 "v.io/v23/vtrace"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080028
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -070029 "v.io/x/ref/lib/apilog"
Suharsh Sivakumardcc11d72015-05-11 12:19:20 -070030 inaming "v.io/x/ref/runtime/internal/naming"
31 "v.io/x/ref/runtime/internal/rpc/stream"
32 "v.io/x/ref/runtime/internal/rpc/stream/vc"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070033)
34
Suharsh Sivakumardcc11d72015-05-11 12:19:20 -070035const pkgPath = "v.io/x/ref/runtime/internal/rpc"
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080036
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070037func reg(id, msg string) verror.IDAction {
38 // Note: the error action is never used and is instead computed
39 // at a higher level. The errors here are purely for informational
40 // purposes.
41 return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
42}
43
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044var (
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070045 // These errors are intended to be used as arguments to higher
46 // level errors and hence {1}{2} is omitted from their format
47 // strings to avoid repeating these n-times in the final error
48 // message visible to the user.
49 errClientCloseAlreadyCalled = reg(".errCloseAlreadyCalled", "rpc.Client.Close has already been called")
50 errClientFinishAlreadyCalled = reg(".errFinishAlreadyCalled", "rpc.ClientCall.Finish has already been called")
51 errNonRootedName = reg(".errNonRootedName", "{3} does not appear to contain an address")
52 errInvalidEndpoint = reg(".errInvalidEndpoint", "failed to parse endpoint")
53 errIncompatibleEndpoint = reg(".errIncompatibleEndpoint", "incompatible endpoint")
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070054 errRequestEncoding = reg(".errRequestEncoding", "failed to encode request {3}{:4}")
55 errDischargeEncoding = reg(".errDischargeEncoding", "failed to encode discharges {:3}")
56 errBlessingEncoding = reg(".errBlessingEncoding", "failed to encode blessing {3}{:4}")
57 errArgEncoding = reg(".errArgEncoding", "failed to encode arg #{3}{:4:}")
58 errMismatchedResults = reg(".errMismatchedResults", "got {3} results, but want {4}")
59 errResultDecoding = reg(".errResultDecoding", "failed to decode result #{3}{:4}")
60 errResponseDecoding = reg(".errResponseDecoding", "failed to decode response{:3}")
61 errRemainingStreamResults = reg(".errRemaingStreamResults", "stream closed with remaining stream results")
62 errNoBlessingsForPeer = reg(".errNoBlessingsForPeer", "no blessings tagged for peer {3}{:4}")
63 errBlessingGrant = reg(".errBlessingGrant", "failed to grant blessing to server with blessings{:3}")
64 errBlessingAdd = reg(".errBlessingAdd", "failed to add blessing granted to server{:3}")
Suharsh Sivakumar4c7e0b72015-04-24 16:57:13 -070065 errServerAuthorizeFailed = reg(".errServerAuthorizedFailed", "failed to authorize flow with remote blessings{:3} {:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080066
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070067 errPrepareBlessingsAndDischarges = reg(".prepareBlessingsAndDischarges", "failed to prepare blessings and discharges: remote blessings{:3} {:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080068
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070069 errDischargeImpetus = reg(".errDischargeImpetus", "couldn't make discharge impetus{:3}")
70 errNoPrincipal = reg(".errNoPrincipal", "principal required for secure connections")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070071)
72
73type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080074 streamMgr stream.Manager
Todd Wang5082a552015-04-02 10:56:11 -070075 ns namespace.T
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080076 vcOpts []stream.VCOpt // vc opts passed to dial
77 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070078
Jungho Ahn25545d32015-01-26 15:14:14 -080079 // We cache the IP networks on the device since it is not that cheap to read
80 // network interfaces through os syscall.
Cosmos Nicolaou70205072015-07-10 11:39:55 -070081 // TODO(toddw): this can be removed since netstate now implements caching
82 // directly.
Jungho Ahn25545d32015-01-26 15:14:14 -080083 ipNets []*net.IPNet
84
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -070085 vcCache *vc.VCCache
Andres Erbsenb7f95f32014-07-07 12:07:56 -070086
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -070087 wg sync.WaitGroup
88 mu sync.Mutex
89 closed bool
90
Ankure49a86a2014-11-11 18:52:43 -080091 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092}
93
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070094var _ rpc.Client = (*client)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070095
Matt Rosencrantzaf3c76f2015-09-19 17:17:32 -070096func DeprecatedNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070097 c := &client{
Ankure49a86a2014-11-11 18:52:43 -080098 streamMgr: streamMgr,
99 ns: ns,
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -0700100 vcCache: vc.NewVCCache(),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700101 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700102 ipNets, err := ipNetworks()
103 if err != nil {
104 return nil, err
105 }
106 c.ipNets = ipNets
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800107 c.dc = InternalNewDischargeClient(nil, c, 0)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700108 for _, opt := range opts {
109 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800110 switch v := opt.(type) {
111 case stream.VCOpt:
112 c.vcOpts = append(c.vcOpts, v)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800113 case PreferredProtocols:
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800114 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700116 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800117
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700118 return c, nil
119}
120
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700121func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700122 suberr := func(err error) *verror.SubErr {
123 return &verror.SubErr{Err: err, Options: verror.Print}
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800124 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700125
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700126 found, err := c.vcCache.ReservedFind(ep, principal)
127 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700128 return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
129 }
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700130 defer c.vcCache.Unreserve(ep, principal)
131 if found != nil {
132 // We are serializing the creation of all flows per VC. This is okay
133 // because if one flow creation is to block, it is likely that all others
134 // for that VC would block as well.
135 if flow, err := found.Connect(); err == nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700136 return flow, nil
137 }
138 // If the vc fails to establish a new flow, we assume it's
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700139 // broken, remove it from the cache, and proceed to establishing
Bogdan Caprita783f7792014-05-15 09:29:17 -0700140 // a new vc.
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700141 //
142 // TODO(suharshs): The decision to redial 1 time when the dialing the vc
143 // in the cache fails is a bit inconsistent with the behavior when a newly
144 // dialed vc.Connect fails. We should revisit this.
145 //
Bogdan Caprita783f7792014-05-15 09:29:17 -0700146 // TODO(caprita): Should we distinguish errors due to vc being
147 // closed from other errors? If not, should we call vc.Close()
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700148 // before removing the vc from the cache?
149 if err := c.vcCache.Delete(found); err != nil {
150 return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
151 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700152 }
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700153
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800154 sm := c.streamMgr
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700155 v, err := sm.Dial(ctx, ep, vcOpts...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700156 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700157 return nil, suberr(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700158 }
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700159
160 flow, err := v.Connect()
161 if err != nil {
162 return nil, suberr(err)
163 }
164
165 if err := c.vcCache.Insert(v.(*vc.VC)); err != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800166 sm.ShutdownEndpoint(ep)
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700167 return nil, suberr(verror.New(errClientCloseAlreadyCalled, ctx))
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800168 }
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700169
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700170 return flow, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700171}
172
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700173// A randomized exponential backoff. The randomness deters error convoys
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700174// from forming. The first time you retry n should be 0, then 1 etc.
175func backoff(n uint, deadline time.Time) bool {
176 // This is ((100 to 200) * 2^n) ms.
177 b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700178 if b > maxBackoff {
179 b = maxBackoff
180 }
181 r := deadline.Sub(time.Now())
182 if b > r {
183 // We need to leave a little time for the call to start or
184 // we'll just timeout in startCall before we actually do
185 // anything. If we just have a millisecond left, give up.
186 if r <= time.Millisecond {
187 return false
188 }
189 b = r - time.Millisecond
190 }
191 time.Sleep(b)
192 return true
193}
194
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700195func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700196 defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,args=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800197 return c.startCall(ctx, name, method, args, opts)
198}
199
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700200func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700201 defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700202 deadline := getDeadline(ctx, opts)
203
204 var lastErr error
205 for retries := uint(0); ; retries++ {
206 call, err := c.startCall(ctx, name, method, inArgs, opts)
207 if err != nil {
208 return err
209 }
210 err = call.Finish(outArgs...)
211 if err == nil {
212 return nil
213 }
214 lastErr = err
215 // We only retry if RetryBackoff is returned by the application because other
216 // RetryConnection and RetryRefetch required actions by the client before
217 // retrying.
218 if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700219 ctx.VI(4).Infof("Cannot retry after error: %s", lastErr)
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700220 break
221 }
222 if !backoff(retries, deadline) {
223 break
224 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700225 ctx.VI(4).Infof("Retrying due to error: %s", lastErr)
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700226 }
227 return lastErr
228}
229
230func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
231 // Context specified deadline.
232 deadline, hasDeadline := ctx.Deadline()
233 if !hasDeadline {
234 // Default deadline.
235 deadline = time.Now().Add(defaultCallTimeout)
236 }
237 if r, ok := getRetryTimeoutOpt(opts); ok {
238 // Caller specified deadline.
239 deadline = time.Now().Add(r)
240 }
241 return deadline
242}
243
244func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
245 switch {
246 case noRetry(opts):
247 return false
248 case action != verror.RetryBackoff:
249 return false
250 case time.Now().After(deadline):
251 return false
252 }
253 return true
254}
255
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700256func shouldRetry(action verror.ActionCode, requireResolve bool, deadline time.Time, opts []rpc.CallOpt) bool {
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700257 switch {
258 case noRetry(opts):
259 return false
260 case action != verror.RetryConnection && action != verror.RetryRefetch:
261 return false
262 case time.Now().After(deadline):
263 return false
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700264 case requireResolve && getNoNamespaceOpt(opts):
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700265 // If we're skipping resolution and there are no servers for
266 // this call retrying is not going to help, we can't come up
267 // with new servers if there is no resolution.
268 return false
269 }
270 return true
271}
272
Todd Wangb31da592015-02-20 12:50:39 -0800273func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) {
Ankure49a86a2014-11-11 18:52:43 -0800274 var impetus security.DischargeImpetus
275 if len(serverBlessings) > 0 {
276 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
277 for i, b := range serverBlessings {
278 impetus.Server[i] = security.BlessingPattern(b)
279 }
280 }
281 impetus.Method = method
282 if len(args) > 0 {
Todd Wangb31da592015-02-20 12:50:39 -0800283 impetus.Arguments = make([]*vdl.Value, len(args))
Ankure49a86a2014-11-11 18:52:43 -0800284 for i, a := range args {
Todd Wangb31da592015-02-20 12:50:39 -0800285 vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a))
286 if err != nil {
287 return security.DischargeImpetus{}, err
288 }
289 impetus.Arguments[i] = vArg
Ankure49a86a2014-11-11 18:52:43 -0800290 }
291 }
Todd Wangb31da592015-02-20 12:50:39 -0800292 return impetus, nil
Ankure49a86a2014-11-11 18:52:43 -0800293}
294
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800295// startCall ensures StartCall always returns verror.E.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700296func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800297 if !ctx.Initialized() {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700298 return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700299 }
Todd Wangad492042015-04-17 15:58:40 -0700300 ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700301 if err := canCreateServerAuthorizer(ctx, opts); err != nil {
302 return nil, verror.New(verror.ErrBadArg, ctx, err)
Ankur50a5f392015-02-27 18:46:30 -0800303 }
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700304
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700305 deadline := getDeadline(ctx, opts)
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800306
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800307 var lastErr error
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700308 for retries := uint(0); ; retries++ {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700309 call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700310 if err == nil {
311 return call, nil
312 }
313 lastErr = err
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700314 if !shouldRetry(action, requireResolve, deadline, opts) {
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800315 span.Annotatef("Cannot retry after error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700316 break
317 }
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700318 if !backoff(retries, deadline) {
319 break
320 }
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700321 span.Annotatef("Retrying due to error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700322 }
323 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700324}
325
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800326type serverStatus struct {
Ankur50a5f392015-02-27 18:46:30 -0800327 index int
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700328 server, suffix string
Ankur50a5f392015-02-27 18:46:30 -0800329 flow stream.Flow
330 blessings []string // authorized server blessings
331 rejectedBlessings []security.RejectedBlessing // rejected server blessings
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700332 serverErr *verror.SubErr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800333}
334
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700335func suberrName(server, name, method string) string {
336 // In the case the client directly dialed an endpoint we want to avoid printing
337 // the endpoint twice.
338 if server == name {
339 return fmt.Sprintf("%s.%s", server, method)
340 }
341 return fmt.Sprintf("%s:%s.%s", server, name, method)
342}
343
Asim Shankaraae31802015-01-22 11:59:42 -0800344// tryCreateFlow attempts to establish a Flow to "server" (which must be a
345// rooted name), over which a method invocation request could be sent.
Ankur50a5f392015-02-27 18:46:30 -0800346//
347// The server at the remote end of the flow is authorized using the provided
348// authorizer, both during creation of the VC underlying the flow and the
349// flow itself.
Cosmos Nicolaou00a0f802014-11-16 22:44:55 -0800350// TODO(cnicolaou): implement real, configurable load balancing.
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700351func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -0700352 defer c.wg.Done()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700353 status := &serverStatus{index: index, server: server}
Asim Shankarf4864f42014-11-25 18:53:05 -0800354 var span vtrace.Span
Todd Wangad492042015-04-17 15:58:40 -0700355 ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
Asim Shankarf4864f42014-11-25 18:53:05 -0800356 span.Annotatef("address:%v", server)
Asim Shankaraae31802015-01-22 11:59:42 -0800357 defer func() {
358 ch <- status
359 span.Finish()
360 }()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700361 suberr := func(err error) *verror.SubErr {
362 return &verror.SubErr{
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700363 Name: suberrName(server, name, method),
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700364 Err: err,
365 Options: verror.Print,
366 }
367 }
368
Asim Shankaraae31802015-01-22 11:59:42 -0800369 address, suffix := naming.SplitAddressName(server)
370 if len(address) == 0 {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700371 status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
Asim Shankaraae31802015-01-22 11:59:42 -0800372 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800373 }
Ankur50a5f392015-02-27 18:46:30 -0800374 status.suffix = suffix
375
Asim Shankaraae31802015-01-22 11:59:42 -0800376 ep, err := inaming.NewEndpoint(address)
377 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700378 status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
Asim Shankaraae31802015-01-22 11:59:42 -0800379 return
380 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700381 if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700382 status.serverErr.Name = suberrName(server, name, method)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700383 ctx.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
Asim Shankaraae31802015-01-22 11:59:42 -0800384 return
385 }
Ankur50a5f392015-02-27 18:46:30 -0800386
387 // Authorize the remote end of the flow using the provided authorizer.
388 if status.flow.LocalPrincipal() == nil {
389 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700390 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800391 return
392 }
393
Ankurd8646812015-03-12 10:48:41 -0700394 seccall := security.NewCall(&security.CallParams{
Ankur50a5f392015-02-27 18:46:30 -0800395 LocalPrincipal: status.flow.LocalPrincipal(),
396 LocalBlessings: status.flow.LocalBlessings(),
397 RemoteBlessings: status.flow.RemoteBlessings(),
398 LocalEndpoint: status.flow.LocalEndpoint(),
399 RemoteEndpoint: status.flow.RemoteEndpoint(),
400 RemoteDischarges: status.flow.RemoteDischarges(),
401 Method: method,
Matt Rosencrantz250558f2015-03-17 11:37:31 -0700402 Suffix: status.suffix,
403 })
Todd Wang4264e4b2015-04-16 22:43:40 -0700404 if err := auth.Authorize(ctx, seccall); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700405 // We will test for errServerAuthorizeFailed in failedTryCall and report
406 // verror.ErrNotTrusted
407 status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700408 ctx.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
Ankur50a5f392015-02-27 18:46:30 -0800409 status.flow.Close()
410 status.flow = nil
411 return
412 }
Todd Wang4264e4b2015-04-16 22:43:40 -0700413 status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
Asim Shankaraae31802015-01-22 11:59:42 -0800414 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800415}
416
Asim Shankaraae31802015-01-22 11:59:42 -0800417// tryCall makes a single attempt at a call. It may connect to multiple servers
418// (all that serve "name"), but will invoke the method on at most one of them
419// (the server running on the most preferred protcol and network amongst all
420// the servers that were successfully connected to and authorized).
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700421// if requireResolve is true on return, then we shouldn't bother retrying unless
422// you can re-resolve.
423func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800424 var resolved *naming.MountEntry
Asim Shankar263c73b2015-03-19 18:31:26 -0700425 var blessingPattern security.BlessingPattern
426 blessingPattern, name = security.SplitPatternName(name)
David Why Use Two When One Will Do Presotto38788d42015-03-31 17:13:54 -0700427 if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800428 // We always return NoServers as the error so that the caller knows
429 // that's ok to retry the operation since the name may be registered
430 // in the near future.
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700431 switch {
432 case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700433 return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700434 case verror.ErrorID(err) == verror.ErrNoServers.ID:
435 // Avoid wrapping errors unnecessarily.
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700436 return nil, verror.NoRetry, false, err
Suharsh Sivakumar0b49a402015-08-13 22:03:21 -0700437 case verror.ErrorID(err) == verror.ErrTimeout.ID:
438 // If the call timed out we actually want to propagate that error.
439 return nil, verror.NoRetry, false, err
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700440 default:
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700441 return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
Ryan Brown6153c6c2014-12-11 13:10:09 -0800442 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700443 } else {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800444 if len(resolved.Servers) == 0 {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700445 // This should never happen.
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700446 return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
Suharsh Sivakumar65e44c22014-12-10 17:15:19 -0800447 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800448 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800449 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700450 return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700451 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700452 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800453
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700454 // We need to ensure calls to v23 factory methods do not occur during runtime
455 // initialization. Currently, the agent, which uses SecurityNone, is the only caller
456 // during runtime initialization. We would like to set the principal in the context
457 // to nil if we are running in SecurityNone, but this always results in a panic since
Todd Wangad492042015-04-17 15:58:40 -0700458 // the agent client would trigger the call v23.WithPrincipal during runtime
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700459 // initialization. So, we gate the call to v23.GetPrincipal instead since the agent
460 // client will have callEncrypted == false.
461 // Potential solutions to this are:
462 // (1) Create a separate client for the agent so that this code doesn't have to
463 // account for its use during runtime initialization.
464 // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
465 // on here.
466 var principal security.Principal
467 if callEncrypted(opts) {
Suharsh Sivakumar0ed10c22015-04-06 12:55:55 -0700468 if principal = v23.GetPrincipal(ctx); principal == nil {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700469 return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
Suharsh Sivakumar0ed10c22015-04-06 12:55:55 -0700470 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700471 }
472
Asim Shankarb547ea92015-02-17 18:49:45 -0800473 // servers is now ordered by the priority heurestic implemented in
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800474 // filterAndOrderServers.
Asim Shankaraae31802015-01-22 11:59:42 -0800475 //
476 // Try to connect to all servers in parallel. Provide sufficient
477 // buffering for all of the connections to finish instantaneously. This
478 // is important because we want to process the responses in priority
479 // order; that order is indicated by the order of entries in servers.
480 // So, if two respones come in at the same 'instant', we prefer the
481 // first in the resolved.Servers)
482 attempts := len(resolved.Servers)
Ankur50a5f392015-02-27 18:46:30 -0800483
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800484 responses := make([]*serverStatus, attempts)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800485 ch := make(chan *serverStatus, attempts)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700486 vcOpts := append(translateVCOpts(opts), c.vcOpts...)
Asim Shankar263c73b2015-03-19 18:31:26 -0700487 authorizer := newServerAuthorizer(blessingPattern, opts...)
Asim Shankaraae31802015-01-22 11:59:42 -0800488 for i, server := range resolved.Names() {
Asim Shankar263c73b2015-03-19 18:31:26 -0700489 // Create a copy of vcOpts for each call to tryCreateFlow
490 // to avoid concurrent tryCreateFlows from stepping on each
491 // other while manipulating their copy of the options.
Ankur50a5f392015-02-27 18:46:30 -0800492 vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
493 copy(vcOptsCopy, vcOpts)
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -0700494 c.mu.Lock()
495 if c.closed {
496 c.mu.Unlock()
497 return nil, verror.NoRetry, false, verror.New(errClientCloseAlreadyCalled, ctx)
498 }
499 c.wg.Add(1)
500 c.mu.Unlock()
501
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700502 go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700503 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800504
Todd Wangf6a06882015-02-27 17:38:01 -0800505 var timeoutChan <-chan time.Time
506 if deadline, ok := ctx.Deadline(); ok {
507 timeoutChan = time.After(deadline.Sub(time.Now()))
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800508 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800509
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800510 for {
Todd Wangef05c062014-11-15 09:51:43 -0800511 // Block for at least one new response from the server, or the timeout.
512 select {
513 case r := <-ch:
514 responses[r.index] = r
515 // Read as many more responses as we can without blocking.
516 LoopNonBlocking:
517 for {
518 select {
519 default:
520 break LoopNonBlocking
521 case r := <-ch:
522 responses[r.index] = r
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800523 }
Todd Wangef05c062014-11-15 09:51:43 -0800524 }
525 case <-timeoutChan:
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700526 ctx.VI(2).Infof("rpc: timeout on connection to server %v ", name)
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700527 _, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
Todd Wang8fa38762015-03-25 14:04:59 -0700528 if verror.ErrorID(err) != verror.ErrTimeout.ID {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700529 return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
Cosmos Nicolaou38209d42014-12-09 16:50:38 -0800530 }
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700531 return nil, verror.NoRetry, false, err
Todd Wangef05c062014-11-15 09:51:43 -0800532 }
533
Ankur50a5f392015-02-27 18:46:30 -0800534 dc := c.dc
535 if shouldNotFetchDischarges(opts) {
536 dc = nil
537 }
Todd Wangef05c062014-11-15 09:51:43 -0800538 // Process new responses, in priority order.
539 numResponses := 0
540 for _, r := range responses {
541 if r != nil {
542 numResponses++
543 }
544 if r == nil || r.flow == nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800545 continue
546 }
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800547
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800548 doneChan := ctx.Done()
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800549 r.flow.SetDeadline(doneChan)
Ankur50a5f392015-02-27 18:46:30 -0800550 fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
551 if err != nil {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700552 return nil, verror.NoRetry, false, err
Ankur50a5f392015-02-27 18:46:30 -0800553 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800554
Ankurdda16492015-04-07 12:35:42 -0700555 if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700556 r.serverErr = &verror.SubErr{
Suharsh Sivakumar56bc5ee2015-04-16 13:46:11 -0700557 Name: suberrName(r.server, name, method),
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700558 Options: verror.Print,
559 Err: verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
560 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700561 ctx.VI(2).Infof("rpc: err: %s", r.serverErr)
Ankur50a5f392015-02-27 18:46:30 -0800562 r.flow.Close()
563 r.flow = nil
564 continue
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800565 }
566
Todd Wangef05c062014-11-15 09:51:43 -0800567 // This is the 'point of no return'; once the RPC is started (fc.start
568 // below) we can't be sure if it makes it to the server or not so, this
569 // code will never call fc.start more than once to ensure that we provide
570 // 'at-most-once' rpc semantics at this level. Retrying the network
571 // connections (i.e. creating flows) is fine since we can cleanup that
572 // state if we abort a call (i.e. close the flow).
573 //
574 // We must ensure that all flows other than r.flow are closed.
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800575 //
576 // TODO(cnicolaou): all errors below are marked as NoRetry
577 // because we want to provide at-most-once rpc semantics so
578 // we only ever attempt an RPC once. In the future, we'll cache
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700579 // responses on the server and then we can retry in-flight
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800580 // RPCs.
Todd Wangef05c062014-11-15 09:51:43 -0800581 go cleanupTryCall(r, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800582
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800583 if doneChan != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800584 go func() {
585 select {
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800586 case <-doneChan:
Cosmos Nicolaou6bed4192015-05-07 21:31:15 -0700587 vtrace.GetSpan(fc.ctx).Annotate("Canceled")
Matt Rosencrantz9346b412014-12-18 15:59:19 -0800588 fc.flow.Cancel()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800589 case <-fc.flow.Closed():
590 }
591 }()
592 }
593
Todd Wangf6a06882015-02-27 17:38:01 -0800594 deadline, _ := ctx.Deadline()
Ankur50a5f392015-02-27 18:46:30 -0800595 if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700596 return nil, verror.NoRetry, false, verr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800597 }
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700598 return fc, verror.NoRetry, false, nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800599 }
Todd Wangef05c062014-11-15 09:51:43 -0800600 if numResponses == len(responses) {
Asim Shankaraae31802015-01-22 11:59:42 -0800601 return c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800602 }
603 }
Todd Wangef05c062014-11-15 09:51:43 -0800604}
605
Asim Shankaraae31802015-01-22 11:59:42 -0800606// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
Todd Wangef05c062014-11-15 09:51:43 -0800607// goroutines, and have closed the flow from each one except skip. This is a
608// blocking function; it should be called in its own goroutine.
609func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
610 numPending := 0
611 for _, r := range responses {
612 switch {
613 case r == nil:
614 // The response hasn't arrived yet.
615 numPending++
616 case r == skip || r.flow == nil:
617 // Either we should skip this flow, or we've closed the flow for this
618 // response already; nothing more to do.
619 default:
620 // We received the response, but haven't closed the flow yet.
621 r.flow.Close()
622 }
623 }
624 // Now we just need to wait for the pending responses and close their flows.
625 for i := 0; i < numPending; i++ {
626 if r := <-ch; r.flow != nil {
627 r.flow.Close()
628 }
629 }
630}
631
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700632// failedTryCall performs ©asynchronous cleanup for tryCall, and returns an
Todd Wangef05c062014-11-15 09:51:43 -0800633// appropriate error from the responses we've already received. All parallel
634// calls in tryCall failed or we timed out if we get here.
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700635func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
Todd Wangef05c062014-11-15 09:51:43 -0800636 go cleanupTryCall(nil, responses, ch)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700637 c.ns.FlushCacheEntry(ctx, name)
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700638 suberrs := []verror.SubErr{}
639 topLevelError := verror.ErrNoServers
640 topLevelAction := verror.RetryRefetch
Cosmos Nicolaou472c6812015-04-21 14:02:14 -0700641 onlyErrNetwork := true
Asim Shankaraae31802015-01-22 11:59:42 -0800642 for _, r := range responses {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700643 if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
644 switch verror.ErrorID(r.serverErr.Err) {
645 case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
646 topLevelError = verror.ErrNotTrusted
647 topLevelAction = verror.NoRetry
Cosmos Nicolaou472c6812015-04-21 14:02:14 -0700648 onlyErrNetwork = false
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700649 case stream.ErrAborted.ID, stream.ErrNetwork.ID:
Cosmos Nicolaou472c6812015-04-21 14:02:14 -0700650 // do nothing
Suharsh Sivakumar0c9d1082015-09-03 13:08:58 -0700651 case verror.ErrTimeout.ID:
652 topLevelError = verror.ErrTimeout
653 onlyErrNetwork = false
Cosmos Nicolaou472c6812015-04-21 14:02:14 -0700654 default:
655 onlyErrNetwork = false
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800656 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700657 suberrs = append(suberrs, *r.serverErr)
Todd Wangef05c062014-11-15 09:51:43 -0800658 }
659 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700660
Cosmos Nicolaou472c6812015-04-21 14:02:14 -0700661 if onlyErrNetwork {
662 // If we only encountered network errors, then report ErrBadProtocol.
663 topLevelError = verror.ErrBadProtocol
664 }
665
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800666 // TODO(cnicolaou): we get system errors for things like dialing using
667 // the 'ws' protocol which can never succeed even if we retry the connection,
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700668 // hence we return RetryRefetch below except for the case where the servers
669 // are not trusted, in case there's no point in retrying at all.
670 // TODO(cnicolaou): implementing at-most-once rpc semantics in the future
671 // will require thinking through all of the cases where the RPC can
672 // be retried by the client whilst it's actually being executed on the
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700673 // server.
Matt Rosencrantzac6cd9b2015-04-20 15:49:37 -0700674 return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700675}
676
Ankur50a5f392015-02-27 18:46:30 -0800677// prepareBlessingsAndDischarges prepares blessings and discharges for
678// the call.
679//
680// This includes: (1) preparing blessings that must be granted to the
681// server, (2) preparing blessings that the client authenticates with,
682// and, (3) preparing any discharges for third-party caveats on the client's
683// blessings.
Ankurdda16492015-04-07 12:35:42 -0700684func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error {
Ankur50a5f392015-02-27 18:46:30 -0800685 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700686 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800687 if fc.flow.LocalPrincipal() == nil {
688 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700689 }
Ankur50a5f392015-02-27 18:46:30 -0800690
Ankur50a5f392015-02-27 18:46:30 -0800691 // Fetch blessings from the client's blessing store that are to be
692 // shared with the server.
693 if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() {
694 // TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings
695 // to the server.
696 return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings)
697 }
698
699 // Fetch any discharges for third-party caveats on the client's blessings.
700 if !fc.blessings.IsZero() && fc.dc != nil {
701 impetus, err := mkDischargeImpetus(fc.server, method, args)
702 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700703 return verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errDischargeImpetus, nil, err))
Ankur50a5f392015-02-27 18:46:30 -0800704 }
705 fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
706 }
Ankurdda16492015-04-07 12:35:42 -0700707
708 // Prepare blessings that must be granted to the server (using any
709 // rpc.Granter implementation in 'opts').
710 //
Todd Wang4264e4b2015-04-16 22:43:40 -0700711 // NOTE(ataly, suharshs): Before invoking the granter, we set the parameters
712 // of the current call. The user can now retrieve the principal via
713 // v23.GetPrincipal(ctx), or via call.LocalPrincipal(). While in theory the
714 // two principals can be different, the flow.LocalPrincipal == nil check at
715 // the beginning of this method ensures that the two are the same and non-nil
716 // at this point in the code.
Ankurdda16492015-04-07 12:35:42 -0700717 ldischargeMap := make(map[string]security.Discharge)
718 for _, d := range fc.discharges {
719 ldischargeMap[d.ID()] = d
720 }
721 seccall := security.NewCall(&security.CallParams{
722 LocalPrincipal: fc.flow.LocalPrincipal(),
723 LocalBlessings: fc.blessings,
724 RemoteBlessings: fc.flow.RemoteBlessings(),
725 LocalEndpoint: fc.flow.LocalEndpoint(),
726 RemoteEndpoint: fc.flow.RemoteEndpoint(),
727 LocalDischarges: ldischargeMap,
728 RemoteDischarges: fc.flow.RemoteDischarges(),
729 Method: method,
730 Suffix: suffix,
731 })
Todd Wang4264e4b2015-04-16 22:43:40 -0700732 if err := fc.prepareGrantedBlessings(ctx, seccall, opts); err != nil {
Ankurdda16492015-04-07 12:35:42 -0700733 return err
734 }
Ankur50a5f392015-02-27 18:46:30 -0800735 return nil
736}
737
Todd Wang4264e4b2015-04-16 22:43:40 -0700738func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, call security.Call, opts []rpc.CallOpt) error {
Asim Shankarb54d7642014-06-05 13:08:04 -0700739 for _, o := range opts {
740 switch v := o.(type) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700741 case rpc.Granter:
Todd Wang4264e4b2015-04-16 22:43:40 -0700742 if b, err := v.Grant(ctx, call); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700743 return verror.New(errBlessingGrant, fc.ctx, err)
Ankur50a5f392015-02-27 18:46:30 -0800744 } else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700745 return verror.New(errBlessingAdd, fc.ctx, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700746 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700747 }
748 }
Ankur50a5f392015-02-27 18:46:30 -0800749 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700750}
751
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700752func (c *client) Close() {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700753 defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -0700754 c.mu.Lock()
755 c.closed = true
756 c.mu.Unlock()
Suharsh Sivakumar04e0e282015-05-02 23:24:04 -0700757 for _, v := range c.vcCache.Close() {
758 c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700759 }
Matt Rosencrantz982ceaa2015-08-10 15:15:10 -0700760 c.wg.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700761}
762
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700763// flowClient implements the RPC client-side protocol for a single RPC, over a
764// flow that's already connected to the server.
765type flowClient struct {
Todd Wang3425a902015-01-21 18:43:59 -0800766 ctx *context.T // context to annotate with call details
767 dec *vom.Decoder // to decode responses and results from the server
768 enc *vom.Encoder // to encode requests and args to the server
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700769 server []string // Blessings bound to the server that authorize it to receive the RPC request from the client.
Todd Wang3425a902015-01-21 18:43:59 -0800770 flow stream.Flow // the underlying flow
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700771 response rpc.Response // each decoded response message is kept here
Asim Shankar1707e432014-05-29 19:42:41 -0700772
Ankure49a86a2014-11-11 18:52:43 -0800773 discharges []security.Discharge // discharges used for this request
774 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700775
Ankur50a5f392015-02-27 18:46:30 -0800776 blessings security.Blessings // the local blessings for the current RPC.
777 grantedBlessings security.Blessings // the blessings granted to the server.
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800778
Asim Shankar1707e432014-05-29 19:42:41 -0700779 sendClosedMu sync.Mutex
780 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800781 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700782}
783
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700784var _ rpc.ClientCall = (*flowClient)(nil)
785var _ rpc.Stream = (*flowClient)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700786
Ankur50a5f392015-02-27 18:46:30 -0800787func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800788 fc := &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800789 ctx: ctx,
Ankure49a86a2014-11-11 18:52:43 -0800790 flow: flow,
Ankur50a5f392015-02-27 18:46:30 -0800791 server: server,
Ankure49a86a2014-11-11 18:52:43 -0800792 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700793 }
Jungho Ahn60408fa2015-03-27 15:28:22 -0700794 typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
795 if typeenc == nil {
Jungho Ahn5d1fe972015-04-27 17:51:32 -0700796 fc.enc = vom.NewEncoder(flow)
797 fc.dec = vom.NewDecoder(flow)
Jungho Ahn60408fa2015-03-27 15:28:22 -0700798 } else {
Jungho Ahn5d1fe972015-04-27 17:51:32 -0700799 fc.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder))
Jungho Ahn60408fa2015-03-27 15:28:22 -0700800 typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
Jungho Ahn5d1fe972015-04-27 17:51:32 -0700801 fc.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder))
Todd Wang34ed4c62014-11-26 15:15:52 -0800802 }
803 return fc, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700804}
805
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700806// close determines the appropriate error to return, in particular,
807// if a timeout or cancelation has occured then any error
808// is turned into a timeout or cancelation as appropriate.
809// Cancelation takes precedence over timeout. This is needed because
810// a timeout can lead to any other number of errors due to the underlying
811// network connection being shutdown abruptly.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800812func (fc *flowClient) close(err error) error {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700813 subErr := verror.SubErr{Err: err, Options: verror.Print}
814 subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800815 if cerr := fc.flow.Close(); cerr != nil && err == nil {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700816 return verror.New(verror.ErrInternal, fc.ctx, subErr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817 }
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700818 if err == nil {
819 return nil
820 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700821 switch verror.ErrorID(err) {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700822 case verror.ErrCanceled.ID:
823 return err
824 case verror.ErrTimeout.ID:
825 // Canceled trumps timeout.
826 if fc.ctx.Err() == context.Canceled {
827 return verror.AddSubErrs(verror.New(verror.ErrCanceled, fc.ctx), fc.ctx, subErr)
828 }
829 return err
830 default:
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800831 switch fc.ctx.Err() {
832 case context.DeadlineExceeded:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700833 timeout := verror.New(verror.ErrTimeout, fc.ctx)
834 err := verror.AddSubErrs(timeout, fc.ctx, subErr)
835 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800836 case context.Canceled:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700837 canceled := verror.New(verror.ErrCanceled, fc.ctx)
838 err := verror.AddSubErrs(canceled, fc.ctx, subErr)
839 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800840 }
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700841 }
842 switch verror.ErrorID(err) {
Jungho Ahn5d1fe972015-04-27 17:51:32 -0700843 case errRequestEncoding.ID, errArgEncoding.ID, errResponseDecoding.ID:
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700844 return verror.New(verror.ErrBadProtocol, fc.ctx, err)
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800845 }
846 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700847}
848
Ankur50a5f392015-02-27 18:46:30 -0800849func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800850 // Encode the Blessings information for the client to authorize the flow.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700851 var blessingsRequest rpc.BlessingsRequest
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800852 if fc.flow.LocalPrincipal() != nil {
Jungho Ahn44d8daf2015-01-16 10:39:15 -0800853 blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800854 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700855 req := rpc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700856 Suffix: suffix,
857 Method: method,
858 NumPosArgs: uint64(len(args)),
Jiri Simsad9a7b3c2015-08-12 16:38:27 -0700859 Deadline: vtime.Deadline{Time: deadline},
Asim Shankarb07ec692015-02-27 23:40:44 -0800860 GrantedBlessings: fc.grantedBlessings,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800861 Blessings: blessingsRequest,
Asim Shankar08642822015-03-02 21:21:09 -0800862 Discharges: fc.discharges,
Matt Rosencrantz2803fe92015-03-09 15:26:32 -0700863 TraceRequest: vtrace.GetRequest(fc.ctx),
Matt Rosencrantz88be1182015-04-27 13:45:43 -0700864 Language: string(i18n.GetLangID(fc.ctx)),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700865 }
866 if err := fc.enc.Encode(req); err != nil {
Jungho Ahn5d1fe972015-04-27 17:51:32 -0700867 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800868 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700869 }
870 for ix, arg := range args {
871 if err := fc.enc.Encode(arg); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700872 berr := verror.New(errArgEncoding, fc.ctx, ix, err)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800873 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700874 }
875 }
876 return nil
877}
878
879func (fc *flowClient) Send(item interface{}) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700880 defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700881 if fc.sendClosed {
Jiri Simsa074bf362015-02-17 09:29:45 -0800882 return verror.New(verror.ErrAborted, fc.ctx)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700883 }
884
885 // The empty request header indicates what follows is a streaming arg.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700886 if err := fc.enc.Encode(rpc.Request{}); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700887 berr := verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800888 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700889 }
890 if err := fc.enc.Encode(item); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700891 berr := verror.New(errArgEncoding, fc.ctx, -1, err)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800892 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700893 }
894 return nil
895}
896
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700897// decodeNetError tests for a net.Error from the lower stream code and
898// translates it into an appropriate error to be returned by the higher level
899// RPC api calls. It also tests for the net.Error being a stream.NetError
900// and if so, uses the error it stores rather than the stream.NetError itself
901// as its retrun value. This allows for the stack trace of the original
902// error to be chained to that of any verror created with it as a first parameter.
903func decodeNetError(ctx *context.T, err error) (verror.IDAction, error) {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800904 if neterr, ok := err.(net.Error); ok {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700905 if streamNeterr, ok := err.(*stream.NetError); ok {
906 err = streamNeterr.Err() // return the error stored in the stream.NetError
907 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800908 if neterr.Timeout() || neterr.Temporary() {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800909 // If a read is canceled in the lower levels we see
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800910 // a timeout error - see readLocked in vc/reader.go
911 if ctx.Err() == context.Canceled {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700912 return verror.ErrCanceled, err
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800913 }
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700914 return verror.ErrTimeout, err
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800915 }
916 }
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700917 if id := verror.ErrorID(err); id != verror.ErrUnknown.ID {
Jiri Simsad9a7b3c2015-08-12 16:38:27 -0700918 return verror.IDAction{
919 ID: id,
920 Action: verror.Action(err),
921 }, err
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700922 }
923 return verror.ErrBadProtocol, err
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800924}
925
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700926func (fc *flowClient) Recv(itemptr interface{}) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700927 defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700928 switch {
929 case fc.response.Error != nil:
Jiri Simsa074bf362015-02-17 09:29:45 -0800930 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700931 case fc.response.EndStreamResults:
932 return io.EOF
933 }
934
935 // Decode the response header and handle errors and EOF.
936 if err := fc.dec.Decode(&fc.response); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700937 id, verr := decodeNetError(fc.ctx, err)
938 berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800939 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700940 }
941 if fc.response.Error != nil {
John Klineadb83962015-06-16 15:28:12 -0700942 return fc.response.Error
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700943 }
944 if fc.response.EndStreamResults {
945 // Return EOF to indicate to the caller that there are no more stream
946 // results. Any error sent by the server is kept in fc.response.Error, and
947 // returned to the user in Finish.
948 return io.EOF
949 }
950 // Decode the streaming result.
951 if err := fc.dec.Decode(itemptr); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -0700952 id, verr := decodeNetError(fc.ctx, err)
953 berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800954 // TODO(cnicolaou): should we be caching this?
955 fc.response.Error = berr
956 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700957 }
958 return nil
959}
960
961func (fc *flowClient) CloseSend() error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700962 defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Tilak Sharma0c766112014-05-20 17:47:27 -0700963 return fc.closeSend()
964}
965
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800966// closeSend ensures CloseSend always returns verror.E.
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800967func (fc *flowClient) closeSend() error {
Asim Shankar1707e432014-05-29 19:42:41 -0700968 fc.sendClosedMu.Lock()
969 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700970 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700971 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700972 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700973 if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700974 // TODO(caprita): Indiscriminately closing the flow below causes
975 // a race as described in:
976 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
977 //
978 // There should be a finer grained way to fix this (for example,
979 // encoding errors should probably still result in closing the
980 // flow); on the flip side, there may exist other instances
981 // where we are closing the flow but should not.
982 //
983 // For now, commenting out the line below removes the flakiness
984 // from our existing unit tests, but this needs to be revisited
985 // and fixed correctly.
986 //
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700987 // return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700988 }
989 fc.sendClosed = true
990 return nil
991}
992
993func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700994 defer apilog.LogCallf(nil, "resultptrs...=%v", resultptrs)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700995 err := fc.finish(resultptrs...)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800996 vtrace.GetSpan(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700997 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700998}
999
Mike Burrows2ec2bb32015-02-26 15:14:43 -08001000// finish ensures Finish always returns a verror.E.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -08001001func (fc *flowClient) finish(resultptrs ...interface{}) error {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -07001002 if fc.finished {
Todd Wangff73e1f2015-02-10 21:45:52 -08001003 err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
Jiri Simsa074bf362015-02-17 09:29:45 -08001004 return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
Ken Ashcraft2b8309a2014-09-09 10:44:43 -07001005 }
1006 fc.finished = true
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001007
Todd Wangce3033b2014-05-23 17:04:44 -07001008 // Call closeSend implicitly, if the user hasn't already called it. There are
1009 // three cases:
1010 // 1) Server is blocked on Recv waiting for the final request message.
1011 // 2) Server has already finished processing, the final response message and
1012 // out args are queued up on the client, and the flow is closed.
1013 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
1014 // response and args aren't queued up yet, and the flow isn't closed.
1015 //
1016 // We must call closeSend to handle case (1) and unblock the server; otherwise
1017 // we'll deadlock with both client and server waiting for each other. We must
1018 // ignore the error (if any) to handle case (2). In that case the flow is
1019 // closed, meaning writes will fail and reads will succeed, and closeSend will
1020 // always return an error. But this isn't a "real" error; the client should
1021 // read the rest of the results and succeed.
1022 _ = fc.closeSend()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001023 // Decode the response header, if it hasn't already been decoded by Recv.
1024 if fc.response.Error == nil && !fc.response.EndStreamResults {
1025 if err := fc.dec.Decode(&fc.response); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -07001026 id, verr := decodeNetError(fc.ctx, err)
1027 berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001028 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001029 }
1030 // The response header must indicate the streaming results have ended.
1031 if fc.response.Error == nil && !fc.response.EndStreamResults {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -07001032 berr := verror.New(errRemainingStreamResults, fc.ctx)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001033 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001034 }
1035 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001036 if fc.response.AckBlessings {
1037 clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
1038 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001039 // Incorporate any VTrace info that was returned.
Matt Rosencrantz2803fe92015-03-09 15:26:32 -07001040 vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001041 if fc.response.Error != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -07001042 id := verror.ErrorID(fc.response.Error)
1043 if id == verror.ErrNoAccess.ID && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001044 // In case the error was caused by a bad discharge, we do not want to get stuck
1045 // with retrying again and again with this discharge. As there is no direct way
1046 // to detect it, we conservatively flush all discharges we used from the cache.
1047 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -07001048 fc.ctx.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Suharsh Sivakumard7d4e222015-06-22 11:10:44 -07001049 fc.dc.Invalidate(fc.ctx, fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001050 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -07001051 if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
1052 return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
1053 }
Jiri Simsa074bf362015-02-17 09:29:45 -08001054 return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001055 }
1056 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Jiri Simsa074bf362015-02-17 09:29:45 -08001057 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001058 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001059 }
1060 for ix, r := range resultptrs {
1061 if err := fc.dec.Decode(r); err != nil {
Cosmos Nicolaoufd54ed92015-04-27 14:12:27 -07001062 id, verr := decodeNetError(fc.ctx, err)
1063 berr := verror.New(id, fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, verr))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001064 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001065 }
1066 }
1067 return fc.close(nil)
1068}
1069
Asim Shankar2d731a92014-09-29 17:46:38 -07001070func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -07001071 defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Asim Shankar8f05c222014-10-06 22:08:19 -07001072 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -07001073}
Ankur50a5f392015-02-27 18:46:30 -08001074
1075func bpatterns(patterns []string) []security.BlessingPattern {
1076 if patterns == nil {
1077 return nil
1078 }
1079 bpatterns := make([]security.BlessingPattern, len(patterns))
1080 for i, p := range patterns {
1081 bpatterns[i] = security.BlessingPattern(p)
1082 }
1083 return bpatterns
1084}