blob: 84f6f4e76f9b8a4cf96d789e17bc075b5e1533bb [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07005package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006
7import (
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "io"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -070010 "math/rand"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080011 "net"
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -080012 "reflect"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -070013 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "sync"
15 "time"
16
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -070017 "v.io/x/lib/vlog"
18
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -070019 "v.io/v23"
Jiri Simsa6ac95222015-02-23 16:11:49 -080020 "v.io/v23/context"
21 "v.io/v23/i18n"
Todd Wang5082a552015-04-02 10:56:11 -070022 "v.io/v23/namespace"
Jiri Simsa6ac95222015-02-23 16:11:49 -080023 "v.io/v23/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070024 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080025 "v.io/v23/security"
26 "v.io/v23/vdl"
Todd Wangf6a06882015-02-27 17:38:01 -080027 vtime "v.io/v23/vdlroot/time"
Jiri Simsa6ac95222015-02-23 16:11:49 -080028 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080029 "v.io/v23/vom"
30 "v.io/v23/vtrace"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080031
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080032 inaming "v.io/x/ref/profiles/internal/naming"
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -070033 "v.io/x/ref/profiles/internal/rpc/stream"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070034 "v.io/x/ref/profiles/internal/rpc/stream/vc"
35 "v.io/x/ref/profiles/internal/rpc/version"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036)
37
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070038const pkgPath = "v.io/x/ref/profiles/internal/rpc"
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080039
Jiri Simsa5293dcb2014-05-10 09:56:38 -070040var (
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080041 // Local errs that are used to provide details to the public ones.
42 errClientCloseAlreadyCalled = verror.Register(pkgPath+".closeAlreadyCalled", verror.NoRetry,
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070043 "rpc.Client.Close has already been called")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080044
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070045 errClientFinishAlreadyCalled = verror.Register(pkgPath+".finishAlreadyCalled", verror.NoRetry, "rpc.ClientCall.Finish has already been called")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080046
47 errNonRootedName = verror.Register(pkgPath+".nonRootedName", verror.NoRetry, "{3} does not appear to contain an address")
48
49 errInvalidEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an invalid endpoint")
50
51 errIncompatibleEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an incompatible endpoint")
52
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080053 errNotTrusted = verror.Register(pkgPath+".notTrusted", verror.NoRetry, "name {3} not trusted using blessings {4}{:5}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080054
Asim Shankar6d5e9e72015-03-30 18:51:33 -070055 errAuthError = verror.Register(pkgPath+".authError", verror.RetryRefetch, "{3}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080056
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080057 errSystemRetry = verror.Register(pkgPath+".sysErrorRetryConnection", verror.RetryConnection, "{:3:}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080058
Todd Wang34ed4c62014-11-26 15:15:52 -080059 errVomEncoder = verror.Register(pkgPath+".vomEncoder", verror.NoRetry, "failed to create vom encoder {:3}")
60 errVomDecoder = verror.Register(pkgPath+".vomDecoder", verror.NoRetry, "failed to create vom decoder {:3}")
61
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080062 errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
63
Suharsh Sivakumar720b7042014-12-22 17:33:23 -080064 errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
65
66 errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080067
68 errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
69
Benjamin Prosnitz0db77a22015-01-20 14:25:15 -080070 errMismatchedResults = verror.Register(pkgPath+".mismatchedResults", verror.NoRetry, "got {3} results, but want {4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080071
72 errResultDecoding = verror.Register(pkgPath+".resultDecoding", verror.NoRetry, "failed to decode result #{3}{:4}")
73
74 errResponseDecoding = verror.Register(pkgPath+".responseDecoding", verror.NoRetry, "failed to decode response{:3}")
75
76 errRemainingStreamResults = verror.Register(pkgPath+".remaingStreamResults", verror.NoRetry, "stream closed with remaining stream results")
77
Ankur50a5f392015-02-27 18:46:30 -080078 errNoBlessingsForPeer = verror.Register(pkgPath+".noBlessingsForPeer", verror.NoRetry, "no blessings tagged for peer {3}{:4}")
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -080079
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080080 errBlessingGrant = verror.Register(pkgPath+".blessingGrantFailed", verror.NoRetry, "failed to grant blessing to server with blessings {3}{:4}")
81
82 errBlessingAdd = verror.Register(pkgPath+".blessingAddFailed", verror.NoRetry, "failed to add blessing granted to server {3}{:4}")
Suharsh Sivakumar0ed10c22015-04-06 12:55:55 -070083
84 errNoPrincipal = verror.Register(pkgPath+".noPrincipal", verror.NoRetry, "principal required for secure connections")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085)
86
87type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080088 streamMgr stream.Manager
Todd Wang5082a552015-04-02 10:56:11 -070089 ns namespace.T
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080090 vcOpts []stream.VCOpt // vc opts passed to dial
91 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092
Jungho Ahn25545d32015-01-26 15:14:14 -080093 // We cache the IP networks on the device since it is not that cheap to read
94 // network interfaces through os syscall.
95 // TODO(jhahn): Add monitoring the network interface changes.
96 ipNets []*net.IPNet
97
Jiri Simsa5293dcb2014-05-10 09:56:38 -070098 // We support concurrent calls to StartCall and Close, so we must protect the
99 // vcMap. Everything else is initialized upon client construction, and safe
100 // to use concurrently.
101 vcMapMu sync.Mutex
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800102 vcMap map[vcMapKey]*vcInfo
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700103
Ankure49a86a2014-11-11 18:52:43 -0800104 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700105}
106
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700107var _ rpc.Client = (*client)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700108
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700109type vcInfo struct {
110 vc stream.VC
111 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700112}
113
// vcMapKey is the key of client.vcMap: one VC is kept per
// (endpoint, client public key) pair.
type vcMapKey struct {
	// endpoint is the string form of the dialed endpoint.
	endpoint string

	// clientPublicKey is the string form of the dialing principal's public
	// key. clientPublicKey = "" means we are running unencrypted
	// (i.e. SecurityNone).
	clientPublicKey string
}
118
Todd Wang5082a552015-04-02 10:56:11 -0700119func InternalNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700120 c := &client{
Ankure49a86a2014-11-11 18:52:43 -0800121 streamMgr: streamMgr,
122 ns: ns,
Jungho Ahn25545d32015-01-26 15:14:14 -0800123 ipNets: ipNetworks(),
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800124 vcMap: make(map[vcMapKey]*vcInfo),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700125 }
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800126 c.dc = InternalNewDischargeClient(nil, c, 0)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700127 for _, opt := range opts {
128 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800129 switch v := opt.(type) {
130 case stream.VCOpt:
131 c.vcOpts = append(c.vcOpts, v)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800132 case PreferredProtocols:
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800133 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700134 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700135 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800136
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700137 return c, nil
138}
139
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700140func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700141 c.vcMapMu.Lock()
142 defer c.vcMapMu.Unlock()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800143 if c.vcMap == nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800144 return nil, verror.New(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800145 }
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700146 vcKey := vcMapKey{endpoint: ep.String()}
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700147 if principal != nil {
148 vcKey.clientPublicKey = principal.PublicKey().String()
149 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800150 if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700151 if flow, err := vcinfo.vc.Connect(); err == nil {
152 return flow, nil
153 }
154 // If the vc fails to establish a new flow, we assume it's
155 // broken, remove it from the map, and proceed to establishing
156 // a new vc.
157 // TODO(caprita): Should we distinguish errors due to vc being
158 // closed from other errors? If not, should we call vc.Close()
159 // before removing the vc from the map?
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800160 delete(c.vcMap, vcKey)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700161 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800162 sm := c.streamMgr
Robin Thellendee439642014-10-20 14:39:17 -0700163 c.vcMapMu.Unlock()
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700164
165 vc, err := sm.Dial(ep, principal, vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -0700166 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 if err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800168 if strings.Contains(err.Error(), "authentication failed") {
Asim Shankar6d5e9e72015-03-30 18:51:33 -0700169 return nil, verror.New(errAuthError, ctx, err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800170 } else {
Todd Wangff73e1f2015-02-10 21:45:52 -0800171 return nil, verror.New(errSystemRetry, ctx, err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800172 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700173 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800174 if c.vcMap == nil {
175 sm.ShutdownEndpoint(ep)
Todd Wangff73e1f2015-02-10 21:45:52 -0800176 return nil, verror.New(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800177 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800178 if othervc, exists := c.vcMap[vcKey]; exists {
Robin Thellendee439642014-10-20 14:39:17 -0700179 vc = othervc.vc
180 // TODO(ashankar,toddw): Figure out how to close up the VC that
181 // is discarded. vc.Close?
182 } else {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800183 c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
Robin Thellendee439642014-10-20 14:39:17 -0700184 }
Asim Shankar6d5e9e72015-03-30 18:51:33 -0700185 return vc.Connect()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700186}
187
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700188// A randomized exponential backoff. The randomness deters error convoys
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700189// from forming. The first time you retry n should be 0, then 1 etc.
190func backoff(n uint, deadline time.Time) bool {
191 // This is ((100 to 200) * 2^n) ms.
192 b := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700193 if b > maxBackoff {
194 b = maxBackoff
195 }
196 r := deadline.Sub(time.Now())
197 if b > r {
198 // We need to leave a little time for the call to start or
199 // we'll just timeout in startCall before we actually do
200 // anything. If we just have a millisecond left, give up.
201 if r <= time.Millisecond {
202 return false
203 }
204 b = r - time.Millisecond
205 }
206 time.Sleep(b)
207 return true
208}
209
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700210func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800211 defer vlog.LogCall()()
212 return c.startCall(ctx, name, method, args, opts)
213}
214
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700215func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
216 defer vlog.LogCall()()
217
218 deadline := getDeadline(ctx, opts)
219
220 var lastErr error
221 for retries := uint(0); ; retries++ {
222 call, err := c.startCall(ctx, name, method, inArgs, opts)
223 if err != nil {
224 return err
225 }
226 err = call.Finish(outArgs...)
227 if err == nil {
228 return nil
229 }
230 lastErr = err
231 // We only retry if RetryBackoff is returned by the application because other
232 // RetryConnection and RetryRefetch required actions by the client before
233 // retrying.
234 if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
235 vlog.Infof("Cannot retry after error: %s", lastErr)
236 break
237 }
238 if !backoff(retries, deadline) {
239 break
240 }
241 vlog.Infof("Retrying due to error: %s", lastErr)
242 }
243 return lastErr
244}
245
246func getDeadline(ctx *context.T, opts []rpc.CallOpt) time.Time {
247 // Context specified deadline.
248 deadline, hasDeadline := ctx.Deadline()
249 if !hasDeadline {
250 // Default deadline.
251 deadline = time.Now().Add(defaultCallTimeout)
252 }
253 if r, ok := getRetryTimeoutOpt(opts); ok {
254 // Caller specified deadline.
255 deadline = time.Now().Add(r)
256 }
257 return deadline
258}
259
260func shouldRetryBackoff(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
261 switch {
262 case noRetry(opts):
263 return false
264 case action != verror.RetryBackoff:
265 return false
266 case time.Now().After(deadline):
267 return false
268 }
269 return true
270}
271
272func shouldRetry(action verror.ActionCode, deadline time.Time, opts []rpc.CallOpt) bool {
273 switch {
274 case noRetry(opts):
275 return false
276 case action != verror.RetryConnection && action != verror.RetryRefetch:
277 return false
278 case time.Now().After(deadline):
279 return false
280 case action == verror.RetryRefetch && getNoNamespaceOpt(opts):
281 // If we're skipping resolution and there are no servers for
282 // this call retrying is not going to help, we can't come up
283 // with new servers if there is no resolution.
284 return false
285 }
286 return true
287}
288
Todd Wangb31da592015-02-20 12:50:39 -0800289func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) (security.DischargeImpetus, error) {
Ankure49a86a2014-11-11 18:52:43 -0800290 var impetus security.DischargeImpetus
291 if len(serverBlessings) > 0 {
292 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
293 for i, b := range serverBlessings {
294 impetus.Server[i] = security.BlessingPattern(b)
295 }
296 }
297 impetus.Method = method
298 if len(args) > 0 {
Todd Wangb31da592015-02-20 12:50:39 -0800299 impetus.Arguments = make([]*vdl.Value, len(args))
Ankure49a86a2014-11-11 18:52:43 -0800300 for i, a := range args {
Todd Wangb31da592015-02-20 12:50:39 -0800301 vArg, err := vdl.ValueFromReflect(reflect.ValueOf(a))
302 if err != nil {
303 return security.DischargeImpetus{}, err
304 }
305 impetus.Arguments[i] = vArg
Ankure49a86a2014-11-11 18:52:43 -0800306 }
307 }
Todd Wangb31da592015-02-20 12:50:39 -0800308 return impetus, nil
Ankure49a86a2014-11-11 18:52:43 -0800309}
310
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800311// startCall ensures StartCall always returns verror.E.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700312func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800313 if !ctx.Initialized() {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700314 return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700315 }
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700316 ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
317 if err := canCreateServerAuthorizer(ctx, opts); err != nil {
318 return nil, verror.New(verror.ErrBadArg, ctx, err)
Ankur50a5f392015-02-27 18:46:30 -0800319 }
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700320
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700321 deadline := getDeadline(ctx, opts)
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800322
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800323 var lastErr error
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700324 for retries := uint(0); ; retries++ {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800325 call, action, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700326 if err == nil {
327 return call, nil
328 }
329 lastErr = err
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700330 if !shouldRetry(action, deadline, opts) {
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800331 span.Annotatef("Cannot retry after error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700332 break
333 }
Matt Rosencrantz254d5702015-04-01 09:47:38 -0700334 if !backoff(retries, deadline) {
335 break
336 }
Suharsh Sivakumar076e9532015-04-09 17:36:25 -0700337 span.Annotatef("Retrying due to error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700338 }
339 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700340}
341
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800342type serverStatus struct {
Ankur50a5f392015-02-27 18:46:30 -0800343 index int
344 suffix string
345 flow stream.Flow
346 blessings []string // authorized server blessings
347 rejectedBlessings []security.RejectedBlessing // rejected server blessings
348 err error
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800349}
350
Asim Shankaraae31802015-01-22 11:59:42 -0800351// tryCreateFlow attempts to establish a Flow to "server" (which must be a
352// rooted name), over which a method invocation request could be sent.
Ankur50a5f392015-02-27 18:46:30 -0800353//
354// The server at the remote end of the flow is authorized using the provided
355// authorizer, both during creation of the VC underlying the flow and the
356// flow itself.
Cosmos Nicolaou00a0f802014-11-16 22:44:55 -0800357// TODO(cnicolaou): implement real, configurable load balancing.
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700358func (c *client) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800359 status := &serverStatus{index: index}
Asim Shankarf4864f42014-11-25 18:53:05 -0800360 var span vtrace.Span
Asim Shankaraae31802015-01-22 11:59:42 -0800361 ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow")
Asim Shankarf4864f42014-11-25 18:53:05 -0800362 span.Annotatef("address:%v", server)
Asim Shankaraae31802015-01-22 11:59:42 -0800363 defer func() {
364 ch <- status
365 span.Finish()
366 }()
Ankur50a5f392015-02-27 18:46:30 -0800367
Asim Shankaraae31802015-01-22 11:59:42 -0800368 address, suffix := naming.SplitAddressName(server)
369 if len(address) == 0 {
Todd Wangff73e1f2015-02-10 21:45:52 -0800370 status.err = verror.New(errNonRootedName, ctx, server)
Asim Shankaraae31802015-01-22 11:59:42 -0800371 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800372 }
Ankur50a5f392015-02-27 18:46:30 -0800373 status.suffix = suffix
374
Asim Shankaraae31802015-01-22 11:59:42 -0800375 ep, err := inaming.NewEndpoint(address)
376 if err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800377 status.err = verror.New(errInvalidEndpoint, ctx, address)
Asim Shankaraae31802015-01-22 11:59:42 -0800378 return
379 }
380 if err = version.CheckCompatibility(ep); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800381 status.err = verror.New(errIncompatibleEndpoint, ctx, ep)
Asim Shankaraae31802015-01-22 11:59:42 -0800382 return
383 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700384 if status.flow, status.err = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700385 vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.err)
Asim Shankaraae31802015-01-22 11:59:42 -0800386 return
387 }
Ankur50a5f392015-02-27 18:46:30 -0800388
389 // Authorize the remote end of the flow using the provided authorizer.
390 if status.flow.LocalPrincipal() == nil {
391 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700392 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800393 return
394 }
395
Ankurd8646812015-03-12 10:48:41 -0700396 seccall := security.NewCall(&security.CallParams{
Ankur50a5f392015-02-27 18:46:30 -0800397 LocalPrincipal: status.flow.LocalPrincipal(),
398 LocalBlessings: status.flow.LocalBlessings(),
399 RemoteBlessings: status.flow.RemoteBlessings(),
400 LocalEndpoint: status.flow.LocalEndpoint(),
401 RemoteEndpoint: status.flow.RemoteEndpoint(),
402 RemoteDischarges: status.flow.RemoteDischarges(),
403 Method: method,
Matt Rosencrantz250558f2015-03-17 11:37:31 -0700404 Suffix: status.suffix,
405 })
406 ctx = security.SetCall(ctx, seccall)
407 if err := auth.Authorize(ctx); err != nil {
Ankur50a5f392015-02-27 18:46:30 -0800408 status.err = verror.New(verror.ErrNotTrusted, ctx, name, status.flow.RemoteBlessings(), err)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700409 vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.err)
Ankur50a5f392015-02-27 18:46:30 -0800410 status.flow.Close()
411 status.flow = nil
412 return
413 }
Ankur9e75e7f2015-03-18 18:48:41 -0700414 status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx)
Asim Shankaraae31802015-01-22 11:59:42 -0800415 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800416}
417
// tryCall makes a single attempt at a call. It may connect to multiple servers
// (all that serve "name"), but will invoke the method on at most one of them
// (the server running on the most preferred protocol and network amongst all
// the servers that were successfully connected to and authorized).
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700422func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, verror.ActionCode, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800423 var resolved *naming.MountEntry
Asim Shankaraae31802015-01-22 11:59:42 -0800424 var err error
Asim Shankar263c73b2015-03-19 18:31:26 -0700425 var blessingPattern security.BlessingPattern
426 blessingPattern, name = security.SplitPatternName(name)
David Why Use Two When One Will Do Presotto38788d42015-03-31 17:13:54 -0700427 if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
David Why Use Two When One Will Do Presotto8de85852015-01-21 11:05:09 -0800428 vlog.Errorf("Resolve: %v", err)
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800429 // We always return NoServers as the error so that the caller knows
430 // that's ok to retry the operation since the name may be registered
431 // in the near future.
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700432 switch {
433 case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
Jiri Simsa074bf362015-02-17 09:29:45 -0800434 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name)
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700435 case verror.ErrorID(err) == verror.ErrNoServers.ID:
436 // Avoid wrapping errors unnecessarily.
437 return nil, verror.NoRetry, err
438 default:
439 return nil, verror.NoRetry, verror.New(verror.ErrNoServers, ctx, name, err)
Ryan Brown6153c6c2014-12-11 13:10:09 -0800440 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700441 } else {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800442 if len(resolved.Servers) == 0 {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700443 // This should never happen.
444 return nil, verror.NoRetry, verror.New(verror.ErrInternal, ctx, name)
Suharsh Sivakumar65e44c22014-12-10 17:15:19 -0800445 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800446 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800447 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800448 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700449 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700450 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800451
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700452 // We need to ensure calls to v23 factory methods do not occur during runtime
453 // initialization. Currently, the agent, which uses SecurityNone, is the only caller
454 // during runtime initialization. We would like to set the principal in the context
455 // to nil if we are running in SecurityNone, but this always results in a panic since
456 // the agent client would trigger the call v23.SetPrincipal during runtime
457 // initialization. So, we gate the call to v23.GetPrincipal instead since the agent
458 // client will have callEncrypted == false.
459 // Potential solutions to this are:
460 // (1) Create a separate client for the agent so that this code doesn't have to
461 // account for its use during runtime initialization.
462 // (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
463 // on here.
464 var principal security.Principal
465 if callEncrypted(opts) {
Suharsh Sivakumar0ed10c22015-04-06 12:55:55 -0700466 if principal = v23.GetPrincipal(ctx); principal == nil {
467 return nil, verror.NoRetry, verror.New(errNoPrincipal, ctx)
468 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700469 }
470
Asim Shankarb547ea92015-02-17 18:49:45 -0800471 // servers is now ordered by the priority heurestic implemented in
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800472 // filterAndOrderServers.
Asim Shankaraae31802015-01-22 11:59:42 -0800473 //
474 // Try to connect to all servers in parallel. Provide sufficient
475 // buffering for all of the connections to finish instantaneously. This
476 // is important because we want to process the responses in priority
477 // order; that order is indicated by the order of entries in servers.
478 // So, if two respones come in at the same 'instant', we prefer the
479 // first in the resolved.Servers)
480 attempts := len(resolved.Servers)
Ankur50a5f392015-02-27 18:46:30 -0800481
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800482 responses := make([]*serverStatus, attempts)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800483 ch := make(chan *serverStatus, attempts)
Asim Shankaraae31802015-01-22 11:59:42 -0800484 vcOpts := append(getVCOpts(opts), c.vcOpts...)
Asim Shankar263c73b2015-03-19 18:31:26 -0700485 authorizer := newServerAuthorizer(blessingPattern, opts...)
Asim Shankaraae31802015-01-22 11:59:42 -0800486 for i, server := range resolved.Names() {
Asim Shankar263c73b2015-03-19 18:31:26 -0700487 // Create a copy of vcOpts for each call to tryCreateFlow
488 // to avoid concurrent tryCreateFlows from stepping on each
489 // other while manipulating their copy of the options.
Ankur50a5f392015-02-27 18:46:30 -0800490 vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
491 copy(vcOptsCopy, vcOpts)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700492 go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700493 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800494
Todd Wangf6a06882015-02-27 17:38:01 -0800495 var timeoutChan <-chan time.Time
496 if deadline, ok := ctx.Deadline(); ok {
497 timeoutChan = time.After(deadline.Sub(time.Now()))
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800498 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800499
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800500 for {
Todd Wangef05c062014-11-15 09:51:43 -0800501 // Block for at least one new response from the server, or the timeout.
502 select {
503 case r := <-ch:
504 responses[r.index] = r
505 // Read as many more responses as we can without blocking.
506 LoopNonBlocking:
507 for {
508 select {
509 default:
510 break LoopNonBlocking
511 case r := <-ch:
512 responses[r.index] = r
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800513 }
Todd Wangef05c062014-11-15 09:51:43 -0800514 }
515 case <-timeoutChan:
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700516 vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
Asim Shankaraae31802015-01-22 11:59:42 -0800517 _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
Todd Wang8fa38762015-03-25 14:04:59 -0700518 if verror.ErrorID(err) != verror.ErrTimeout.ID {
Jiri Simsa074bf362015-02-17 09:29:45 -0800519 return nil, verror.NoRetry, verror.New(verror.ErrTimeout, ctx, err)
Cosmos Nicolaou38209d42014-12-09 16:50:38 -0800520 }
521 return nil, verror.NoRetry, err
Todd Wangef05c062014-11-15 09:51:43 -0800522 }
523
Ankur50a5f392015-02-27 18:46:30 -0800524 dc := c.dc
525 if shouldNotFetchDischarges(opts) {
526 dc = nil
527 }
Todd Wangef05c062014-11-15 09:51:43 -0800528 // Process new responses, in priority order.
529 numResponses := 0
530 for _, r := range responses {
531 if r != nil {
532 numResponses++
533 }
534 if r == nil || r.flow == nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800535 continue
536 }
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800537
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800538 doneChan := ctx.Done()
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800539 r.flow.SetDeadline(doneChan)
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700540 // TODO(cnicolaou): continue verror testing from here.
Ankur50a5f392015-02-27 18:46:30 -0800541 fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
542 if err != nil {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700543 return nil, verror.NoRetry, err
Ankur50a5f392015-02-27 18:46:30 -0800544 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800545
Ankurdda16492015-04-07 12:35:42 -0700546 if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
Ankur50a5f392015-02-27 18:46:30 -0800547 r.err = verror.New(verror.ErrNotTrusted, ctx, name, r.flow.RemoteBlessings(), err)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700548 vlog.VI(2).Infof("rpc: err: %s", r.err)
Ankur50a5f392015-02-27 18:46:30 -0800549 r.flow.Close()
550 r.flow = nil
551 continue
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800552 }
553
Todd Wangef05c062014-11-15 09:51:43 -0800554 // This is the 'point of no return'; once the RPC is started (fc.start
555 // below) we can't be sure if it makes it to the server or not so, this
556 // code will never call fc.start more than once to ensure that we provide
557 // 'at-most-once' rpc semantics at this level. Retrying the network
558 // connections (i.e. creating flows) is fine since we can cleanup that
559 // state if we abort a call (i.e. close the flow).
560 //
561 // We must ensure that all flows other than r.flow are closed.
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800562 //
563 // TODO(cnicolaou): all errors below are marked as NoRetry
564 // because we want to provide at-most-once rpc semantics so
565 // we only ever attempt an RPC once. In the future, we'll cache
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700566 // responses on the server and then we can retry in-flight
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800567 // RPCs.
Todd Wangef05c062014-11-15 09:51:43 -0800568 go cleanupTryCall(r, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800569
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800570 if doneChan != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800571 go func() {
572 select {
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800573 case <-doneChan:
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800574 vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
Matt Rosencrantz9346b412014-12-18 15:59:19 -0800575 fc.flow.Cancel()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800576 case <-fc.flow.Closed():
577 }
578 }()
579 }
580
Todd Wangf6a06882015-02-27 17:38:01 -0800581 deadline, _ := ctx.Deadline()
Ankur50a5f392015-02-27 18:46:30 -0800582 if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800583 return nil, verror.NoRetry, verr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800584 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800585 return fc, verror.NoRetry, nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800586 }
Todd Wangef05c062014-11-15 09:51:43 -0800587 if numResponses == len(responses) {
Asim Shankaraae31802015-01-22 11:59:42 -0800588 return c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800589 }
590 }
Todd Wangef05c062014-11-15 09:51:43 -0800591}
592
Asim Shankaraae31802015-01-22 11:59:42 -0800593// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
Todd Wangef05c062014-11-15 09:51:43 -0800594// goroutines, and have closed the flow from each one except skip. This is a
595// blocking function; it should be called in its own goroutine.
596func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
597 numPending := 0
598 for _, r := range responses {
599 switch {
600 case r == nil:
601 // The response hasn't arrived yet.
602 numPending++
603 case r == skip || r.flow == nil:
604 // Either we should skip this flow, or we've closed the flow for this
605 // response already; nothing more to do.
606 default:
607 // We received the response, but haven't closed the flow yet.
608 r.flow.Close()
609 }
610 }
611 // Now we just need to wait for the pending responses and close their flows.
612 for i := 0; i < numPending; i++ {
613 if r := <-ch; r.flow != nil {
614 r.flow.Close()
615 }
616 }
617}
618
619// failedTryCall performs asynchronous cleanup for tryCall, and returns an
620// appropriate error from the responses we've already received. All parallel
621// calls in tryCall failed or we timed out if we get here.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700622func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, error) {
Todd Wangef05c062014-11-15 09:51:43 -0800623 go cleanupTryCall(nil, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800624 c.ns.FlushCacheEntry(name)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800625 noconn, untrusted := []string{}, []string{}
Asim Shankaraae31802015-01-22 11:59:42 -0800626 for _, r := range responses {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800627 if r != nil && r.err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800628 switch {
Todd Wang8fa38762015-03-25 14:04:59 -0700629 case verror.ErrorID(r.err) == verror.ErrNotTrusted.ID || verror.ErrorID(r.err) == errAuthError.ID:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800630 untrusted = append(untrusted, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800631 default:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800632 noconn = append(noconn, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800633 }
Todd Wangef05c062014-11-15 09:51:43 -0800634 }
635 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800636 // TODO(cnicolaou): we get system errors for things like dialing using
637 // the 'ws' protocol which can never succeed even if we retry the connection,
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700638 // hence we return RetryRefetch below except for the case where the servers
639 // are not trusted, in case there's no point in retrying at all.
640 // TODO(cnicolaou): implementing at-most-once rpc semantics in the future
641 // will require thinking through all of the cases where the RPC can
642 // be retried by the client whilst it's actually being executed on the
643 // client.
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800644 switch {
645 case len(untrusted) > 0 && len(noconn) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800646 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServersAndAuth, ctx, append(noconn, untrusted...))
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800647 case len(noconn) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800648 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, noconn)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800649 case len(untrusted) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800650 return nil, verror.NoRetry, verror.New(verror.ErrNotTrusted, ctx, untrusted)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800651 default:
Jiri Simsa074bf362015-02-17 09:29:45 -0800652 return nil, verror.RetryRefetch, verror.New(verror.ErrTimeout, ctx)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800653 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700654}
655
Ankur50a5f392015-02-27 18:46:30 -0800656// prepareBlessingsAndDischarges prepares blessings and discharges for
657// the call.
658//
659// This includes: (1) preparing blessings that must be granted to the
660// server, (2) preparing blessings that the client authenticates with,
661// and, (3) preparing any discharges for third-party caveats on the client's
662// blessings.
Ankurdda16492015-04-07 12:35:42 -0700663func (fc *flowClient) prepareBlessingsAndDischarges(ctx *context.T, method, suffix string, args []interface{}, rejectedServerBlessings []security.RejectedBlessing, opts []rpc.CallOpt) error {
Ankur50a5f392015-02-27 18:46:30 -0800664 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700665 // SecurityNone.
Ankur50a5f392015-02-27 18:46:30 -0800666 if fc.flow.LocalPrincipal() == nil {
667 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700668 }
Ankur50a5f392015-02-27 18:46:30 -0800669
Ankur50a5f392015-02-27 18:46:30 -0800670 // Fetch blessings from the client's blessing store that are to be
671 // shared with the server.
672 if fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...); fc.blessings.IsZero() {
673 // TODO(ataly, ashankar): We need not error out here and instead can just send the <nil> blessings
674 // to the server.
675 return verror.New(errNoBlessingsForPeer, fc.ctx, fc.server, rejectedServerBlessings)
676 }
677
678 // Fetch any discharges for third-party caveats on the client's blessings.
679 if !fc.blessings.IsZero() && fc.dc != nil {
680 impetus, err := mkDischargeImpetus(fc.server, method, args)
681 if err != nil {
682 // TODO(toddw): Fix up the internal error.
683 return verror.New(verror.ErrBadProtocol, fc.ctx, fmt.Errorf("couldn't make discharge impetus: %v", err))
684 }
685 fc.discharges = fc.dc.PrepareDischarges(fc.ctx, fc.blessings.ThirdPartyCaveats(), impetus)
686 }
Ankurdda16492015-04-07 12:35:42 -0700687
688 // Prepare blessings that must be granted to the server (using any
689 // rpc.Granter implementation in 'opts').
690 //
691 // NOTE(ataly, suharshs): Before invoking the granter, we set the parameters of
692 // the current call on the context. The context would now have two principals
693 // set on it -- one available via v23.GetPrincipal(ctx) and the other available
694 // via security.GetCall(ctx).LocalPrincipal(). While in theory the two principals
695 // can be different, the flow.LocalPrincipal == nil check at the beginning
696 // of this method ensures that the two are the same and non-nil at this point
697 // in the code.
698 ldischargeMap := make(map[string]security.Discharge)
699 for _, d := range fc.discharges {
700 ldischargeMap[d.ID()] = d
701 }
702 seccall := security.NewCall(&security.CallParams{
703 LocalPrincipal: fc.flow.LocalPrincipal(),
704 LocalBlessings: fc.blessings,
705 RemoteBlessings: fc.flow.RemoteBlessings(),
706 LocalEndpoint: fc.flow.LocalEndpoint(),
707 RemoteEndpoint: fc.flow.RemoteEndpoint(),
708 LocalDischarges: ldischargeMap,
709 RemoteDischarges: fc.flow.RemoteDischarges(),
710 Method: method,
711 Suffix: suffix,
712 })
713 ctx = security.SetCall(ctx, seccall)
714 if err := fc.prepareGrantedBlessings(ctx, opts); err != nil {
715 return err
716 }
Ankur50a5f392015-02-27 18:46:30 -0800717 return nil
718}
719
Ankurdda16492015-04-07 12:35:42 -0700720func (fc *flowClient) prepareGrantedBlessings(ctx *context.T, opts []rpc.CallOpt) error {
Asim Shankarb54d7642014-06-05 13:08:04 -0700721 for _, o := range opts {
722 switch v := o.(type) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700723 case rpc.Granter:
Ankurdda16492015-04-07 12:35:42 -0700724 if b, err := v.Grant(ctx); err != nil {
Ankur50a5f392015-02-27 18:46:30 -0800725 return verror.New(errBlessingGrant, fc.ctx, fc.server, err)
726 } else if fc.grantedBlessings, err = security.UnionOfBlessings(fc.grantedBlessings, b); err != nil {
727 return verror.New(errBlessingAdd, fc.ctx, fc.server, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700728 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700729 }
730 }
Ankur50a5f392015-02-27 18:46:30 -0800731 return nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700732}
733
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700734func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700735 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700736 c.vcMapMu.Lock()
737 for _, v := range c.vcMap {
738 c.streamMgr.ShutdownEndpoint(v.remoteEP)
739 }
740 c.vcMap = nil
741 c.vcMapMu.Unlock()
742}
743
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700744// flowClient implements the RPC client-side protocol for a single RPC, over a
745// flow that's already connected to the server.
746type flowClient struct {
Todd Wang3425a902015-01-21 18:43:59 -0800747 ctx *context.T // context to annotate with call details
748 dec *vom.Decoder // to decode responses and results from the server
749 enc *vom.Encoder // to encode requests and args to the server
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700750 server []string // Blessings bound to the server that authorize it to receive the RPC request from the client.
Todd Wang3425a902015-01-21 18:43:59 -0800751 flow stream.Flow // the underlying flow
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700752 response rpc.Response // each decoded response message is kept here
Asim Shankar1707e432014-05-29 19:42:41 -0700753
Ankure49a86a2014-11-11 18:52:43 -0800754 discharges []security.Discharge // discharges used for this request
755 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700756
Ankur50a5f392015-02-27 18:46:30 -0800757 blessings security.Blessings // the local blessings for the current RPC.
758 grantedBlessings security.Blessings // the blessings granted to the server.
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800759
Asim Shankar1707e432014-05-29 19:42:41 -0700760 sendClosedMu sync.Mutex
761 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800762 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700763}
764
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700765var _ rpc.ClientCall = (*flowClient)(nil)
766var _ rpc.Stream = (*flowClient)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700767
Ankur50a5f392015-02-27 18:46:30 -0800768func newFlowClient(ctx *context.T, flow stream.Flow, server []string, dc vc.DischargeClient) (*flowClient, error) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800769 fc := &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800770 ctx: ctx,
Ankure49a86a2014-11-11 18:52:43 -0800771 flow: flow,
Ankur50a5f392015-02-27 18:46:30 -0800772 server: server,
Ankure49a86a2014-11-11 18:52:43 -0800773 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700774 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800775 var err error
Jungho Ahn60408fa2015-03-27 15:28:22 -0700776 typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
777 if typeenc == nil {
778 if fc.enc, err = vom.NewEncoder(flow); err != nil {
779 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
780 return nil, fc.close(berr)
781 }
782 if fc.dec, err = vom.NewDecoder(flow); err != nil {
783 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
784 return nil, fc.close(berr)
785 }
786 } else {
787 if fc.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
788 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
789 return nil, fc.close(berr)
790 }
791 typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
792 if fc.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
793 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
794 return nil, fc.close(berr)
795 }
Todd Wang34ed4c62014-11-26 15:15:52 -0800796 }
797 return fc, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700798}
799
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800800func (fc *flowClient) close(err error) error {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700801 if _, ok := err.(verror.E); err != nil && !ok {
802 // TODO(cnicolaou): remove this once the second CL in this
803 // series of CLs to use verror consistently is complete.
804 vlog.Infof("WARNING: expected %v to be a verror", err)
805 }
806 subErr := verror.SubErr{Err: err, Options: verror.Print}
807 subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800808 if cerr := fc.flow.Close(); cerr != nil && err == nil {
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700809 return verror.New(verror.ErrInternal, fc.ctx, subErr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700810 }
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800811 switch {
Todd Wang8fa38762015-03-25 14:04:59 -0700812 case verror.ErrorID(err) == verror.ErrBadProtocol.ID:
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800813 switch fc.ctx.Err() {
814 case context.DeadlineExceeded:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700815 timeout := verror.New(verror.ErrTimeout, fc.ctx)
816 err := verror.AddSubErrs(timeout, fc.ctx, subErr)
817 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800818 case context.Canceled:
Cosmos Nicolaoub291d6c2015-03-27 09:01:46 -0700819 canceled := verror.New(verror.ErrCanceled, fc.ctx)
820 err := verror.AddSubErrs(canceled, fc.ctx, subErr)
821 return err
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800822 }
Todd Wang8fa38762015-03-25 14:04:59 -0700823 case verror.ErrorID(err) == verror.ErrTimeout.ID:
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800824 // Canceled trumps timeout.
825 if fc.ctx.Err() == context.Canceled {
826 // TODO(cnicolaou,m3b): reintroduce 'append' when the new verror API is done.
Jiri Simsa074bf362015-02-17 09:29:45 -0800827 return verror.New(verror.ErrCanceled, fc.ctx, err.Error())
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800828 }
829 }
830 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700831}
832
Ankur50a5f392015-02-27 18:46:30 -0800833func (fc *flowClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800834 // Encode the Blessings information for the client to authorize the flow.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700835 var blessingsRequest rpc.BlessingsRequest
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800836 if fc.flow.LocalPrincipal() != nil {
Jungho Ahn44d8daf2015-01-16 10:39:15 -0800837 blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800838 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700839 req := rpc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700840 Suffix: suffix,
841 Method: method,
842 NumPosArgs: uint64(len(args)),
Todd Wangf6a06882015-02-27 17:38:01 -0800843 Deadline: vtime.Deadline{deadline},
Asim Shankarb07ec692015-02-27 23:40:44 -0800844 GrantedBlessings: fc.grantedBlessings,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800845 Blessings: blessingsRequest,
Asim Shankar08642822015-03-02 21:21:09 -0800846 Discharges: fc.discharges,
Matt Rosencrantz2803fe92015-03-09 15:26:32 -0700847 TraceRequest: vtrace.GetRequest(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700848 }
849 if err := fc.enc.Encode(req); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800850 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800851 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700852 }
853 for ix, arg := range args {
854 if err := fc.enc.Encode(arg); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800855 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, ix, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800856 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700857 }
858 }
859 return nil
860}
861
862func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700863 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700864 if fc.sendClosed {
Jiri Simsa074bf362015-02-17 09:29:45 -0800865 return verror.New(verror.ErrAborted, fc.ctx)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700866 }
867
868 // The empty request header indicates what follows is a streaming arg.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700869 if err := fc.enc.Encode(rpc.Request{}); err != nil {
870 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, rpc.Request{}, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800871 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700872 }
873 if err := fc.enc.Encode(item); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800874 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, -1, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800875 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700876 }
877 return nil
878}
879
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800880func decodeNetError(ctx *context.T, err error) verror.IDAction {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800881 if neterr, ok := err.(net.Error); ok {
882 if neterr.Timeout() || neterr.Temporary() {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800883 // If a read is canceled in the lower levels we see
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800884 // a timeout error - see readLocked in vc/reader.go
885 if ctx.Err() == context.Canceled {
Jiri Simsa074bf362015-02-17 09:29:45 -0800886 return verror.ErrCanceled
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800887 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800888 return verror.ErrTimeout
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800889 }
890 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800891 return verror.ErrBadProtocol
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800892}
893
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700894func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700895 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700896 switch {
897 case fc.response.Error != nil:
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800898 // TODO(cnicolaou): this will become a verror.E when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800899 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800900 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700901 case fc.response.EndStreamResults:
902 return io.EOF
903 }
904
905 // Decode the response header and handle errors and EOF.
906 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800907 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800908 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700909 }
910 if fc.response.Error != nil {
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800911 // TODO(cnicolaou): this will become a verror.E when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800912 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800913 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700914 }
915 if fc.response.EndStreamResults {
916 // Return EOF to indicate to the caller that there are no more stream
917 // results. Any error sent by the server is kept in fc.response.Error, and
918 // returned to the user in Finish.
919 return io.EOF
920 }
921 // Decode the streaming result.
922 if err := fc.dec.Decode(itemptr); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800923 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800924 // TODO(cnicolaou): should we be caching this?
925 fc.response.Error = berr
926 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700927 }
928 return nil
929}
930
931func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700932 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700933 return fc.closeSend()
934}
935
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800936// closeSend ensures CloseSend always returns verror.E.
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800937func (fc *flowClient) closeSend() error {
Asim Shankar1707e432014-05-29 19:42:41 -0700938 fc.sendClosedMu.Lock()
939 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700940 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700941 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700942 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700943 if err := fc.enc.Encode(rpc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700944 // TODO(caprita): Indiscriminately closing the flow below causes
945 // a race as described in:
946 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
947 //
948 // There should be a finer grained way to fix this (for example,
949 // encoding errors should probably still result in closing the
950 // flow); on the flip side, there may exist other instances
951 // where we are closing the flow but should not.
952 //
953 // For now, commenting out the line below removes the flakiness
954 // from our existing unit tests, but this needs to be revisited
955 // and fixed correctly.
956 //
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700957 // return fc.close(verror.ErrBadProtocolf("rpc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700958 }
959 fc.sendClosed = true
960 return nil
961}
962
963func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700964 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700965 err := fc.finish(resultptrs...)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800966 vtrace.GetSpan(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700967 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700968}
969
Mike Burrows2ec2bb32015-02-26 15:14:43 -0800970// finish ensures Finish always returns a verror.E.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800971func (fc *flowClient) finish(resultptrs ...interface{}) error {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700972 if fc.finished {
Todd Wangff73e1f2015-02-10 21:45:52 -0800973 err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
Jiri Simsa074bf362015-02-17 09:29:45 -0800974 return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700975 }
976 fc.finished = true
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800977
Todd Wangce3033b2014-05-23 17:04:44 -0700978 // Call closeSend implicitly, if the user hasn't already called it. There are
979 // three cases:
980 // 1) Server is blocked on Recv waiting for the final request message.
981 // 2) Server has already finished processing, the final response message and
982 // out args are queued up on the client, and the flow is closed.
983 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
984 // response and args aren't queued up yet, and the flow isn't closed.
985 //
986 // We must call closeSend to handle case (1) and unblock the server; otherwise
987 // we'll deadlock with both client and server waiting for each other. We must
988 // ignore the error (if any) to handle case (2). In that case the flow is
989 // closed, meaning writes will fail and reads will succeed, and closeSend will
990 // always return an error. But this isn't a "real" error; the client should
991 // read the rest of the results and succeed.
992 _ = fc.closeSend()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700993 // Decode the response header, if it hasn't already been decoded by Recv.
994 if fc.response.Error == nil && !fc.response.EndStreamResults {
995 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800996 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800997 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700998 }
999 // The response header must indicate the streaming results have ended.
1000 if fc.response.Error == nil && !fc.response.EndStreamResults {
Jiri Simsa074bf362015-02-17 09:29:45 -08001001 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRemainingStreamResults, fc.ctx))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001002 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001003 }
1004 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001005 if fc.response.AckBlessings {
1006 clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
1007 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001008 // Incorporate any VTrace info that was returned.
Matt Rosencrantz2803fe92015-03-09 15:26:32 -07001009 vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001010 if fc.response.Error != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001011 // TODO(cnicolaou): remove verror.ErrNoAccess with verror version
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001012 // when rpc.Server is converted.
Todd Wang8fa38762015-03-25 14:04:59 -07001013 if verror.ErrorID(fc.response.Error) == verror.ErrNoAccess.ID && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001014 // In case the error was caused by a bad discharge, we do not want to get stuck
1015 // with retrying again and again with this discharge. As there is no direct way
1016 // to detect it, we conservatively flush all discharges we used from the cache.
1017 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Asim Shankar77befba2015-01-09 12:49:04 -08001018 vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Ankure49a86a2014-11-11 18:52:43 -08001019 fc.dc.Invalidate(fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001020 }
Jiri Simsa074bf362015-02-17 09:29:45 -08001021 return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001022 }
1023 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Jiri Simsa074bf362015-02-17 09:29:45 -08001024 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001025 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001026 }
1027 for ix, r := range resultptrs {
1028 if err := fc.dec.Decode(r); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -08001029 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08001030 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001031 }
1032 }
1033 return fc.close(nil)
1034}
1035
Asim Shankar2d731a92014-09-29 17:46:38 -07001036func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -07001037 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -07001038}
Ankur50a5f392015-02-27 18:46:30 -08001039
1040func bpatterns(patterns []string) []security.BlessingPattern {
1041 if patterns == nil {
1042 return nil
1043 }
1044 bpatterns := make([]security.BlessingPattern, len(patterns))
1045 for i, p := range patterns {
1046 bpatterns[i] = security.BlessingPattern(p)
1047 }
1048 return bpatterns
1049}