blob: 5e0815e1c5ac86175319d5171c8b47ba749b1e14 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -07006 "math"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07007 "math/rand"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08008 "net"
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -08009 "reflect"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -070010 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070011 "sync"
12 "time"
13
Suharsh Sivakumaraf862a52015-02-04 13:50:47 -080014 "v.io/core/veyron/runtimes/google/ipc/stream"
Jiri Simsa764efb72014-12-25 20:57:03 -080015 "v.io/core/veyron2/context"
16 "v.io/core/veyron2/i18n"
17 "v.io/core/veyron2/ipc"
Jiri Simsa764efb72014-12-25 20:57:03 -080018 "v.io/core/veyron2/naming"
19 "v.io/core/veyron2/options"
20 "v.io/core/veyron2/security"
Todd Wangb86b3522015-01-22 13:34:20 -080021 "v.io/core/veyron2/vdl"
Todd Wangff73e1f2015-02-10 21:45:52 -080022 "v.io/core/veyron2/verror"
Jiri Simsa764efb72014-12-25 20:57:03 -080023 "v.io/core/veyron2/vlog"
Todd Wang3425a902015-01-21 18:43:59 -080024 "v.io/core/veyron2/vom"
Jiri Simsa764efb72014-12-25 20:57:03 -080025 "v.io/core/veyron2/vtrace"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080026
Jiri Simsa764efb72014-12-25 20:57:03 -080027 "v.io/core/veyron/runtimes/google/ipc/stream/vc"
28 "v.io/core/veyron/runtimes/google/ipc/version"
29 inaming "v.io/core/veyron/runtimes/google/naming"
30 ivtrace "v.io/core/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070031)
32
Jiri Simsa764efb72014-12-25 20:57:03 -080033const pkgPath = "v.io/core/veyron/runtimes/google/ipc"
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080034
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080035// TODO(cnicolaou): for local errors, automatically assign a new 'id',
36// don't use pkgPath etc. Can then move them into being defined on each line
37// and not here.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070038var (
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080039 // Local errs that are used to provide details to the public ones.
40 errClientCloseAlreadyCalled = verror.Register(pkgPath+".closeAlreadyCalled", verror.NoRetry,
41 "ipc.Client.Close has already been called")
42
43 errClientFinishAlreadyCalled = verror.Register(pkgPath+".finishAlreadyCalled", verror.NoRetry, "ipc.Call.Finish has already been called")
44
45 errNonRootedName = verror.Register(pkgPath+".nonRootedName", verror.NoRetry, "{3} does not appear to contain an address")
46
47 errInvalidEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an invalid endpoint")
48
49 errIncompatibleEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an incompatible endpoint")
50
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080051 errNotTrusted = verror.Register(pkgPath+".notTrusted", verror.NoRetry, "name {3} not trusted using blessings {4}{:5}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080052
53 errAuthError = verror.Register(pkgPath+".authError", verror.RetryRefetch, "authentication error from server {3}{:4}")
54
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080055 errSystemRetry = verror.Register(pkgPath+".sysErrorRetryConnection", verror.RetryConnection, "{:3:}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080056
Todd Wang34ed4c62014-11-26 15:15:52 -080057 errVomEncoder = verror.Register(pkgPath+".vomEncoder", verror.NoRetry, "failed to create vom encoder {:3}")
58 errVomDecoder = verror.Register(pkgPath+".vomDecoder", verror.NoRetry, "failed to create vom decoder {:3}")
59
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080060 errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
61
Suharsh Sivakumar720b7042014-12-22 17:33:23 -080062 errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
63
64 errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080065
66 errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
67
Benjamin Prosnitz0db77a22015-01-20 14:25:15 -080068 errMismatchedResults = verror.Register(pkgPath+".mismatchedResults", verror.NoRetry, "got {3} results, but want {4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080069
70 errResultDecoding = verror.Register(pkgPath+".resultDecoding", verror.NoRetry, "failed to decode result #{3}{:4}")
71
72 errResponseDecoding = verror.Register(pkgPath+".responseDecoding", verror.NoRetry, "failed to decode response{:3}")
73
74 errRemainingStreamResults = verror.Register(pkgPath+".remaingStreamResults", verror.NoRetry, "stream closed with remaining stream results")
75
76 errNoBlessings = verror.Register(pkgPath+".noBlessings", verror.NoRetry, "server has not presented any blessings")
77
78 errAuthNoPatternMatch = verror.Register(pkgPath+".authNoPatternMatch",
Ryan Brown41093a92015-02-10 10:59:14 -080079 verror.NoRetry, "server blessings {3} do not match pattern {4}{:5}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080080
Asim Shankar558ea012015-01-28 12:49:36 -080081 errAuthServerNotAllowed = verror.Register(pkgPath+".authServerNotAllowed",
82 verror.NoRetry, "set of allowed servers {3} not matched by server blessings {4}")
83
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -080084 errAuthServerKeyNotAllowed = verror.Register(pkgPath+".authServerKeyNotAllowed",
85 verror.NoRetry, "remote public key {3} not matched by server key {4}")
86
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080087 errDefaultAuthDenied = verror.Register(pkgPath+".defaultAuthDenied", verror.NoRetry, "default authorization precludes talking to server with blessings{:3}")
88
89 errBlessingGrant = verror.Register(pkgPath+".blessingGrantFailed", verror.NoRetry, "failed to grant blessing to server with blessings {3}{:4}")
90
91 errBlessingAdd = verror.Register(pkgPath+".blessingAddFailed", verror.NoRetry, "failed to add blessing granted to server {3}{:4}")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092)
93
Ryan Brown2726b402014-11-04 17:13:27 -080094// TODO(ribrdb): Flip this to true once everything is updated.
95const enableSecureServerAuth = false
96
Jiri Simsa5293dcb2014-05-10 09:56:38 -070097type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080098 streamMgr stream.Manager
99 ns naming.Namespace
100 vcOpts []stream.VCOpt // vc opts passed to dial
101 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700102
Jungho Ahn25545d32015-01-26 15:14:14 -0800103 // We cache the IP networks on the device since it is not that cheap to read
104 // network interfaces through os syscall.
105 // TODO(jhahn): Add monitoring the network interface changes.
106 ipNets []*net.IPNet
107
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700108 // We support concurrent calls to StartCall and Close, so we must protect the
109 // vcMap. Everything else is initialized upon client construction, and safe
110 // to use concurrently.
111 vcMapMu sync.Mutex
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800112 vcMap map[vcMapKey]*vcInfo
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700113
Ankure49a86a2014-11-11 18:52:43 -0800114 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115}
116
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700117var _ ipc.Client = (*client)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700118
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700119type vcInfo struct {
120 vc stream.VC
121 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700122}
123
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800124type vcMapKey struct {
125 endpoint string
126 encrypted bool
127}
128
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800129// PreferredProtocols instructs the Runtime implementation to select
130// endpoints with the specified protocols and to order them in the
131// specified order.
132type PreferredProtocols []string
133
134func (PreferredProtocols) IPCClientOpt() {}
135
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700136func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700137 c := &client{
Ankure49a86a2014-11-11 18:52:43 -0800138 streamMgr: streamMgr,
139 ns: ns,
Jungho Ahn25545d32015-01-26 15:14:14 -0800140 ipNets: ipNetworks(),
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800141 vcMap: make(map[vcMapKey]*vcInfo),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700142 }
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800143 c.dc = InternalNewDischargeClient(nil, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700144 for _, opt := range opts {
145 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800146 switch v := opt.(type) {
147 case stream.VCOpt:
148 c.vcOpts = append(c.vcOpts, v)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800149 case PreferredProtocols:
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800150 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700151 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700152 }
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800153 c.vcOpts = append(c.vcOpts, c.dc)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800154
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700155 return c, nil
156}
157
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800158func vcEncrypted(vcOpts []stream.VCOpt) bool {
159 encrypted := true
160 for _, o := range vcOpts {
161 switch o {
162 case options.VCSecurityNone:
163 encrypted = false
164 case options.VCSecurityConfidential:
165 encrypted = true
166 }
167 }
168 return encrypted
169}
170
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800171func (c *client) createFlow(ctx *context.T, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700172 c.vcMapMu.Lock()
173 defer c.vcMapMu.Unlock()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800174 if c.vcMap == nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800175 return nil, verror.New(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800176 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800177 vcKey := vcMapKey{ep.String(), vcEncrypted(vcOpts)}
178 if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700179 if flow, err := vcinfo.vc.Connect(); err == nil {
180 return flow, nil
181 }
182 // If the vc fails to establish a new flow, we assume it's
183 // broken, remove it from the map, and proceed to establishing
184 // a new vc.
185 // TODO(caprita): Should we distinguish errors due to vc being
186 // closed from other errors? If not, should we call vc.Close()
187 // before removing the vc from the map?
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800188 delete(c.vcMap, vcKey)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700189 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800190 sm := c.streamMgr
Robin Thellendee439642014-10-20 14:39:17 -0700191 c.vcMapMu.Unlock()
Asim Shankar70494752015-01-23 16:10:23 -0800192 // Include the context when Dial-ing. This is currently done via an
193 // option, and for thread-safety reasons - cannot append directly to
194 // vcOpts.
195 // TODO(ashankar,mattr): Revisit the API in ipc/stream and explicitly
196 // provide a context to Dial and other relevant operations.
197 cpy := make([]stream.VCOpt, len(vcOpts)+1)
198 cpy[copy(cpy, vcOpts)] = vc.DialContext{ctx}
199 vcOpts = cpy
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800200 vc, err := sm.Dial(ep, vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -0700201 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700202 if err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800203 if strings.Contains(err.Error(), "authentication failed") {
Todd Wangff73e1f2015-02-10 21:45:52 -0800204 return nil, verror.New(errAuthError, ctx, ep, err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800205 } else {
Todd Wangff73e1f2015-02-10 21:45:52 -0800206 return nil, verror.New(errSystemRetry, ctx, err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800207 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700208 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800209 if c.vcMap == nil {
210 sm.ShutdownEndpoint(ep)
Todd Wangff73e1f2015-02-10 21:45:52 -0800211 return nil, verror.New(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800212 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800213 if othervc, exists := c.vcMap[vcKey]; exists {
Robin Thellendee439642014-10-20 14:39:17 -0700214 vc = othervc.vc
215 // TODO(ashankar,toddw): Figure out how to close up the VC that
216 // is discarded. vc.Close?
217 } else {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800218 c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
Robin Thellendee439642014-10-20 14:39:17 -0700219 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800220 flow, err := vc.Connect()
221 if err != nil {
222
Todd Wangff73e1f2015-02-10 21:45:52 -0800223 return nil, verror.New(errAuthError, ctx, ep, err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800224 }
225 return flow, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700226}
227
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700228// A randomized exponential backoff. The randomness deters error convoys from forming.
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800229// TODO(cnicolaou): rationalize this and the backoff in ipc.Server. Note
230// that rand is not thread safe and may crash.
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700231func backoff(n int, deadline time.Time) bool {
232 b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
233 if b > maxBackoff {
234 b = maxBackoff
235 }
236 r := deadline.Sub(time.Now())
237 if b > r {
238 // We need to leave a little time for the call to start or
239 // we'll just timeout in startCall before we actually do
240 // anything. If we just have a millisecond left, give up.
241 if r <= time.Millisecond {
242 return false
243 }
244 b = r - time.Millisecond
245 }
246 time.Sleep(b)
247 return true
248}
249
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800250func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
251 defer vlog.LogCall()()
252 return c.startCall(ctx, name, method, args, opts)
253}
254
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700255func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
256 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700257 if r, ok := o.(options.RetryTimeout); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700258 return time.Duration(r), true
259 }
260 }
261 return 0, false
262}
263
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700264func getNoResolveOpt(opts []ipc.CallOpt) bool {
265 for _, o := range opts {
Suharsh Sivakumarb59a96d2015-01-09 16:39:54 -0800266 if _, ok := o.(options.NoResolve); ok {
267 return true
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700268 }
269 }
270 return false
271}
272
Suharsh Sivakumar11316872014-11-25 15:57:00 -0800273func shouldNotFetchDischarges(opts []ipc.CallOpt) bool {
274 for _, o := range opts {
275 if _, ok := o.(vc.NoDischarges); ok {
276 return true
277 }
278 }
279 return false
280}
281
Suharsh Sivakumar90da4c22015-02-12 15:45:56 -0800282func getNoRetryOpt(opts []ipc.CallOpt) bool {
283 for _, o := range opts {
284 if _, ok := o.(options.NoRetry); ok {
285 return true
286 }
287 }
288 return false
289}
290
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800291func getVCOpts(opts []ipc.CallOpt) (vcOpts []stream.VCOpt) {
292 for _, o := range opts {
293 if v, ok := o.(stream.VCOpt); ok {
294 vcOpts = append(vcOpts, v)
295 }
296 }
297 return
298}
299
300func getResolveOpts(opts []ipc.CallOpt) (resolveOpts []naming.ResolveOpt) {
301 for _, o := range opts {
302 if r, ok := o.(naming.ResolveOpt); ok {
303 resolveOpts = append(resolveOpts, r)
304 }
305 }
306 return
307}
308
Ankure49a86a2014-11-11 18:52:43 -0800309func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) security.DischargeImpetus {
310 var impetus security.DischargeImpetus
311 if len(serverBlessings) > 0 {
312 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
313 for i, b := range serverBlessings {
314 impetus.Server[i] = security.BlessingPattern(b)
315 }
316 }
317 impetus.Method = method
318 if len(args) > 0 {
Todd Wangb86b3522015-01-22 13:34:20 -0800319 impetus.Arguments = make([]vdl.AnyRep, len(args))
Ankure49a86a2014-11-11 18:52:43 -0800320 for i, a := range args {
Todd Wangb86b3522015-01-22 13:34:20 -0800321 impetus.Arguments[i] = vdl.AnyRep(a)
Ankure49a86a2014-11-11 18:52:43 -0800322 }
323 }
324 return impetus
325}
326
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800327// startCall ensures StartCall always returns verror.Standard.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800328func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, error) {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800329 if !ctx.Initialized() {
Jiri Simsa074bf362015-02-17 09:29:45 -0800330 return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.NoLangID, "ipc.Client", "StartCall")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700331 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800332
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800333 ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800334 ctx = verror.ContextWithComponentName(ctx, "ipc.Client")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700335
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700336 // Context specified deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700337 deadline, hasDeadline := ctx.Deadline()
338 if !hasDeadline {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700339 // Default deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700340 deadline = time.Now().Add(defaultCallTimeout)
341 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700342 if r, ok := getRetryTimeoutOpt(opts); ok {
343 // Caller specified deadline.
344 deadline = time.Now().Add(time.Duration(r))
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700345 }
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800346
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800347 var lastErr error
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700348 for retries := 0; ; retries++ {
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700349 if retries != 0 {
350 if !backoff(retries, deadline) {
351 break
352 }
353 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800354 call, action, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700355 if err == nil {
356 return call, nil
357 }
358 lastErr = err
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800359 shouldRetry := true
360 switch {
Suharsh Sivakumar90da4c22015-02-12 15:45:56 -0800361 case getNoRetryOpt(opts):
362 shouldRetry = false
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800363 case action != verror.RetryConnection && action != verror.RetryRefetch:
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800364 shouldRetry = false
365 case time.Now().After(deadline):
366 shouldRetry = false
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800367 case action == verror.RetryRefetch && getNoResolveOpt(opts):
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800368 // If we're skipping resolution and there are no servers for
369 // this call retrying is not going to help, we can't come up
370 // with new servers if there is no resolution.
371 shouldRetry = false
372 }
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800373 if !shouldRetry {
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800374 span.Annotatef("Cannot retry after error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700375 break
376 }
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800377 span.Annotatef("Retrying due to error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700378 }
379 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700380}
381
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800382type serverStatus struct {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800383 index int
384 suffix string
385 flow stream.Flow
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800386 err error
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800387}
388
Asim Shankaraae31802015-01-22 11:59:42 -0800389// tryCreateFlow attempts to establish a Flow to "server" (which must be a
390// rooted name), over which a method invocation request could be sent.
Cosmos Nicolaou00a0f802014-11-16 22:44:55 -0800391// TODO(cnicolaou): implement real, configurable load balancing.
Asim Shankaraae31802015-01-22 11:59:42 -0800392func (c *client) tryCreateFlow(ctx *context.T, index int, server string, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800393 status := &serverStatus{index: index}
Asim Shankarf4864f42014-11-25 18:53:05 -0800394 var span vtrace.Span
Asim Shankaraae31802015-01-22 11:59:42 -0800395 ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow")
Asim Shankarf4864f42014-11-25 18:53:05 -0800396 span.Annotatef("address:%v", server)
Asim Shankaraae31802015-01-22 11:59:42 -0800397 defer func() {
398 ch <- status
399 span.Finish()
400 }()
401 address, suffix := naming.SplitAddressName(server)
402 if len(address) == 0 {
Todd Wangff73e1f2015-02-10 21:45:52 -0800403 status.err = verror.New(errNonRootedName, ctx, server)
Asim Shankaraae31802015-01-22 11:59:42 -0800404 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800405 }
Asim Shankaraae31802015-01-22 11:59:42 -0800406 ep, err := inaming.NewEndpoint(address)
407 if err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800408 status.err = verror.New(errInvalidEndpoint, ctx, address)
Asim Shankaraae31802015-01-22 11:59:42 -0800409 return
410 }
411 if err = version.CheckCompatibility(ep); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800412 status.err = verror.New(errIncompatibleEndpoint, ctx, ep)
Asim Shankaraae31802015-01-22 11:59:42 -0800413 return
414 }
415 if status.flow, status.err = c.createFlow(ctx, ep, vcOpts); status.err != nil {
416 vlog.VI(2).Infof("ipc: connect to %v: %v", server, status.err)
417 return
418 }
419 status.suffix = suffix
420 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800421}
422
Asim Shankaraae31802015-01-22 11:59:42 -0800423// tryCall makes a single attempt at a call. It may connect to multiple servers
424// (all that serve "name"), but will invoke the method on at most one of them
425// (the server running on the most preferred protcol and network amongst all
426// the servers that were successfully connected to and authorized).
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800427func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.ActionCode, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800428 var resolved *naming.MountEntry
Asim Shankaraae31802015-01-22 11:59:42 -0800429 var err error
430 if resolved, err = c.ns.Resolve(ctx, name, getResolveOpts(opts)...); err != nil {
David Why Use Two When One Will Do Presotto8de85852015-01-21 11:05:09 -0800431 vlog.Errorf("Resolve: %v", err)
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800432 // We always return NoServers as the error so that the caller knows
433 // that's ok to retry the operation since the name may be registered
434 // in the near future.
Ryan Brown6153c6c2014-12-11 13:10:09 -0800435 if verror.Is(err, naming.ErrNoSuchName.ID) {
Jiri Simsa074bf362015-02-17 09:29:45 -0800436 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name)
Ryan Brown6153c6c2014-12-11 13:10:09 -0800437 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800438 return nil, verror.NoRetry, verror.New(verror.ErrNoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700439 } else {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800440 if len(resolved.Servers) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800441 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name)
Suharsh Sivakumar65e44c22014-12-10 17:15:19 -0800442 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800443 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800444 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800445 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700446 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700447 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800448
Asim Shankarb547ea92015-02-17 18:49:45 -0800449 // servers is now ordered by the priority heurestic implemented in
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800450 // filterAndOrderServers.
Asim Shankaraae31802015-01-22 11:59:42 -0800451 //
452 // Try to connect to all servers in parallel. Provide sufficient
453 // buffering for all of the connections to finish instantaneously. This
454 // is important because we want to process the responses in priority
455 // order; that order is indicated by the order of entries in servers.
456 // So, if two respones come in at the same 'instant', we prefer the
457 // first in the resolved.Servers)
458 attempts := len(resolved.Servers)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800459 responses := make([]*serverStatus, attempts)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800460 ch := make(chan *serverStatus, attempts)
Asim Shankaraae31802015-01-22 11:59:42 -0800461 vcOpts := append(getVCOpts(opts), c.vcOpts...)
Asim Shankaraae31802015-01-22 11:59:42 -0800462 for i, server := range resolved.Names() {
463 go c.tryCreateFlow(ctx, i, server, ch, vcOpts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700464 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800465
466 delay := time.Duration(ipc.NoTimeout)
Todd Wangef05c062014-11-15 09:51:43 -0800467 if dl, ok := ctx.Deadline(); ok {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800468 delay = dl.Sub(time.Now())
469 }
470 timeoutChan := time.After(delay)
471
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800472 for {
Todd Wangef05c062014-11-15 09:51:43 -0800473 // Block for at least one new response from the server, or the timeout.
474 select {
475 case r := <-ch:
476 responses[r.index] = r
477 // Read as many more responses as we can without blocking.
478 LoopNonBlocking:
479 for {
480 select {
481 default:
482 break LoopNonBlocking
483 case r := <-ch:
484 responses[r.index] = r
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800485 }
Todd Wangef05c062014-11-15 09:51:43 -0800486 }
487 case <-timeoutChan:
488 vlog.VI(2).Infof("ipc: timeout on connection to server %v ", name)
Asim Shankaraae31802015-01-22 11:59:42 -0800489 _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
Jiri Simsa074bf362015-02-17 09:29:45 -0800490 if !verror.Is(err, verror.ErrTimeout.ID) {
491 return nil, verror.NoRetry, verror.New(verror.ErrTimeout, ctx, err)
Cosmos Nicolaou38209d42014-12-09 16:50:38 -0800492 }
493 return nil, verror.NoRetry, err
Todd Wangef05c062014-11-15 09:51:43 -0800494 }
495
496 // Process new responses, in priority order.
497 numResponses := 0
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800498 noDischarges := shouldNotFetchDischarges(opts)
Todd Wangef05c062014-11-15 09:51:43 -0800499 for _, r := range responses {
500 if r != nil {
501 numResponses++
502 }
503 if r == nil || r.flow == nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800504 continue
505 }
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800506
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800507 doneChan := ctx.Done()
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800508 r.flow.SetDeadline(doneChan)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800509
510 var (
511 serverB []string
512 grantedB security.Blessings
513 )
514
515 // LocalPrincipal is nil means that the client wanted to avoid
516 // authentication, and thus wanted to skip authorization as well.
Todd Wangef05c062014-11-15 09:51:43 -0800517 if r.flow.LocalPrincipal() != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800518 // Validate caveats on the server's identity for the context associated with this call.
519 var err error
Asim Shankarb547ea92015-02-17 18:49:45 -0800520 patterns := resolved.Servers[r.index].BlessingPatterns
521 if serverB, grantedB, err = c.authorizeServer(ctx, r.flow, name, method, patterns, opts); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800522 r.err = verror.New(errNotTrusted, ctx, name, r.flow.RemoteBlessings(), err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800523 vlog.VI(2).Infof("ipc: err: %s", r.err)
Todd Wangef05c062014-11-15 09:51:43 -0800524 r.flow.Close()
525 r.flow = nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800526 continue
527 }
528 }
529
Todd Wangef05c062014-11-15 09:51:43 -0800530 // This is the 'point of no return'; once the RPC is started (fc.start
531 // below) we can't be sure if it makes it to the server or not so, this
532 // code will never call fc.start more than once to ensure that we provide
533 // 'at-most-once' rpc semantics at this level. Retrying the network
534 // connections (i.e. creating flows) is fine since we can cleanup that
535 // state if we abort a call (i.e. close the flow).
536 //
537 // We must ensure that all flows other than r.flow are closed.
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800538 //
539 // TODO(cnicolaou): all errors below are marked as NoRetry
540 // because we want to provide at-most-once rpc semantics so
541 // we only ever attempt an RPC once. In the future, we'll cache
542 // responses on the server and then we can retry in-process
543 // RPCs.
Todd Wangef05c062014-11-15 09:51:43 -0800544 go cleanupTryCall(r, responses, ch)
Todd Wang34ed4c62014-11-26 15:15:52 -0800545 fc, err := newFlowClient(ctx, serverB, r.flow, c.dc)
546 if err != nil {
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800547 return nil, verror.NoRetry, err.(error)
Todd Wang34ed4c62014-11-26 15:15:52 -0800548 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800549
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800550 if doneChan != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800551 go func() {
552 select {
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800553 case <-doneChan:
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800554 vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
Matt Rosencrantz9346b412014-12-18 15:59:19 -0800555 fc.flow.Cancel()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800556 case <-fc.flow.Closed():
557 }
558 }()
559 }
560
561 timeout := time.Duration(ipc.NoTimeout)
562 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
563 timeout = deadline.Sub(time.Now())
564 }
Suharsh Sivakumar11316872014-11-25 15:57:00 -0800565 if noDischarges {
566 fc.dc = nil
567 }
Todd Wangef05c062014-11-15 09:51:43 -0800568 if verr := fc.start(r.suffix, method, args, timeout, grantedB); verr != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800569 return nil, verror.NoRetry, verr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800570 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800571 return fc, verror.NoRetry, nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800572 }
Todd Wangef05c062014-11-15 09:51:43 -0800573 if numResponses == len(responses) {
Asim Shankaraae31802015-01-22 11:59:42 -0800574 return c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800575 }
576 }
Todd Wangef05c062014-11-15 09:51:43 -0800577}
578
Asim Shankaraae31802015-01-22 11:59:42 -0800579// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
Todd Wangef05c062014-11-15 09:51:43 -0800580// goroutines, and have closed the flow from each one except skip. This is a
581// blocking function; it should be called in its own goroutine.
582func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
583 numPending := 0
584 for _, r := range responses {
585 switch {
586 case r == nil:
587 // The response hasn't arrived yet.
588 numPending++
589 case r == skip || r.flow == nil:
590 // Either we should skip this flow, or we've closed the flow for this
591 // response already; nothing more to do.
592 default:
593 // We received the response, but haven't closed the flow yet.
594 r.flow.Close()
595 }
596 }
597 // Now we just need to wait for the pending responses and close their flows.
598 for i := 0; i < numPending; i++ {
599 if r := <-ch; r.flow != nil {
600 r.flow.Close()
601 }
602 }
603}
604
605// failedTryCall performs asynchronous cleanup for tryCall, and returns an
606// appropriate error from the responses we've already received. All parallel
607// calls in tryCall failed or we timed out if we get here.
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800608func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (ipc.Call, verror.ActionCode, error) {
Todd Wangef05c062014-11-15 09:51:43 -0800609 go cleanupTryCall(nil, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800610 c.ns.FlushCacheEntry(name)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800611 noconn, untrusted := []string{}, []string{}
Asim Shankaraae31802015-01-22 11:59:42 -0800612 for _, r := range responses {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800613 if r != nil && r.err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800614 switch {
615 case verror.Is(r.err, errNotTrusted.ID) || verror.Is(r.err, errAuthError.ID):
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800616 untrusted = append(untrusted, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800617 default:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800618 noconn = append(noconn, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800619 }
Todd Wangef05c062014-11-15 09:51:43 -0800620 }
621 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800622 // TODO(cnicolaou): we get system errors for things like dialing using
623 // the 'ws' protocol which can never succeed even if we retry the connection,
624 // hence we return RetryRefetch in all cases below. In the future, we'll
625 // pick out this error and then we can retry the connection also. This also
626 // plays into the 'at-most-once' rpc semantics change that's needed in order
627 // to retry an in-flight RPC.
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800628 switch {
629 case len(untrusted) > 0 && len(noconn) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800630 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServersAndAuth, ctx, append(noconn, untrusted...))
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800631 case len(noconn) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800632 return nil, verror.RetryRefetch, verror.New(verror.ErrNoServers, ctx, noconn)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800633 case len(untrusted) > 0:
Jiri Simsa074bf362015-02-17 09:29:45 -0800634 return nil, verror.NoRetry, verror.New(verror.ErrNotTrusted, ctx, untrusted)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800635 default:
Jiri Simsa074bf362015-02-17 09:29:45 -0800636 return nil, verror.RetryRefetch, verror.New(verror.ErrTimeout, ctx)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800637 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700638}
639
Asim Shankar8f05c222014-10-06 22:08:19 -0700640// authorizeServer validates that the server (remote end of flow) has the credentials to serve
641// the RPC name.method for the client (local end of the flow). It returns the blessings at the
642// server that are authorized for this purpose and any blessings that are to be granted to
643// the server (via ipc.Granter implementations in opts.)
Asim Shankarb547ea92015-02-17 18:49:45 -0800644func (c *client) authorizeServer(ctx *context.T, flow stream.Flow, name, method string, serverPatterns []string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
Asim Shankar220a0152014-10-30 21:21:09 -0700645 if flow.RemoteBlessings() == nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800646 return nil, nil, verror.New(errNoBlessings, ctx)
Asim Shankarb54d7642014-06-05 13:08:04 -0700647 }
Ankure49a86a2014-11-11 18:52:43 -0800648 ctxt := security.NewContext(&security.ContextParams{
649 LocalPrincipal: flow.LocalPrincipal(),
650 LocalBlessings: flow.LocalBlessings(),
651 RemoteBlessings: flow.RemoteBlessings(),
652 LocalEndpoint: flow.LocalEndpoint(),
653 RemoteEndpoint: flow.RemoteEndpoint(),
654 RemoteDischarges: flow.RemoteDischarges(),
655 Method: method,
Todd Wang9a7f5162014-11-13 13:24:33 -0800656 Suffix: name})
Asim Shankarb547ea92015-02-17 18:49:45 -0800657 var rejectedBlessings []security.RejectedBlessing
658 serverBlessings, rejectedBlessings = flow.RemoteBlessings().ForContext(ctxt)
659 var ignorePatterns bool
Asim Shankarb54d7642014-06-05 13:08:04 -0700660 for _, o := range opts {
661 switch v := o.(type) {
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -0800662 case options.ServerPublicKey:
Suharsh Sivakumara8633b02015-02-14 17:08:07 -0800663 if remoteKey, key := flow.RemoteBlessings().PublicKey(), v.PublicKey; !reflect.DeepEqual(remoteKey, key) {
664 return nil, nil, verror.New(errAuthServerKeyNotAllowed, ctx, remoteKey, key)
Suharsh Sivakumar67ef84a2015-02-13 13:04:44 -0800665 }
Asim Shankar558ea012015-01-28 12:49:36 -0800666 case options.AllowedServersPolicy:
667 allowed := false
668 for _, p := range v {
669 if p.MatchedBy(serverBlessings...) {
670 allowed = true
671 break
672 }
673 }
674 if !allowed {
Todd Wangff73e1f2015-02-10 21:45:52 -0800675 return nil, nil, verror.New(errAuthServerNotAllowed, ctx, v, serverBlessings)
Asim Shankar558ea012015-01-28 12:49:36 -0800676 }
Asim Shankarb547ea92015-02-17 18:49:45 -0800677 case options.SkipResolveAuthorization:
678 ignorePatterns = true
Asim Shankarb54d7642014-06-05 13:08:04 -0700679 case ipc.Granter:
Asim Shankar8f05c222014-10-06 22:08:19 -0700680 if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800681 return nil, nil, verror.New(errBlessingGrant, ctx, serverBlessings, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700682 } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800683 return nil, nil, verror.New(errBlessingAdd, ctx, serverBlessings, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700684 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700685 }
686 }
Asim Shankarb547ea92015-02-17 18:49:45 -0800687 if len(serverPatterns) > 0 && !ignorePatterns {
688 matched := false
689 for _, p := range serverPatterns {
690 if security.BlessingPattern(p).MatchedBy(serverBlessings...) {
691 matched = true
692 break
693 }
694 }
695 if !matched {
696 return nil, nil, verror.New(errAuthNoPatternMatch, ctx, serverBlessings, serverPatterns, rejectedBlessings)
697 }
698 } else if enableSecureServerAuth && !ignorePatterns {
699 if err := (defaultAuthorizer{}).Authorize(ctxt); err != nil {
700 return nil, nil, verror.New(errDefaultAuthDenied, ctx, serverBlessings)
701 }
702 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700703 return serverBlessings, grantedBlessings, nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700704}
705
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700706func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700707 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700708 c.vcMapMu.Lock()
709 for _, v := range c.vcMap {
710 c.streamMgr.ShutdownEndpoint(v.remoteEP)
711 }
712 c.vcMap = nil
713 c.vcMapMu.Unlock()
714}
715
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700716// flowClient implements the RPC client-side protocol for a single RPC, over a
717// flow that's already connected to the server.
718type flowClient struct {
Todd Wang3425a902015-01-21 18:43:59 -0800719 ctx *context.T // context to annotate with call details
720 dec *vom.Decoder // to decode responses and results from the server
721 enc *vom.Encoder // to encode requests and args to the server
722 server []string // Blessings bound to the server that authorize it to receive the IPC request from the client.
723 flow stream.Flow // the underlying flow
724 response ipc.Response // each decoded response message is kept here
Asim Shankar1707e432014-05-29 19:42:41 -0700725
Ankure49a86a2014-11-11 18:52:43 -0800726 discharges []security.Discharge // discharges used for this request
727 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700728
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800729 blessings security.Blessings // the local blessings for the current RPC.
730
Asim Shankar1707e432014-05-29 19:42:41 -0700731 sendClosedMu sync.Mutex
732 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800733 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700734}
735
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700736var _ ipc.Call = (*flowClient)(nil)
737var _ ipc.Stream = (*flowClient)(nil)
738
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800739func newFlowClient(ctx *context.T, server []string, flow stream.Flow, dc vc.DischargeClient) (*flowClient, error) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800740 fc := &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800741 ctx: ctx,
Ankure49a86a2014-11-11 18:52:43 -0800742 server: server,
743 flow: flow,
744 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700745 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800746 var err error
Todd Wang3425a902015-01-21 18:43:59 -0800747 if fc.enc, err = vom.NewBinaryEncoder(flow); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800748 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomEncoder, fc.ctx, err))
Todd Wangf519f8f2015-01-21 10:07:41 -0800749 return nil, fc.close(berr)
750 }
Todd Wang3425a902015-01-21 18:43:59 -0800751 if fc.dec, err = vom.NewDecoder(flow); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800752 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errVomDecoder, fc.ctx, err))
Todd Wangf519f8f2015-01-21 10:07:41 -0800753 return nil, fc.close(berr)
Todd Wang34ed4c62014-11-26 15:15:52 -0800754 }
755 return fc, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700756}
757
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800758func (fc *flowClient) close(err error) error {
759 if cerr := fc.flow.Close(); cerr != nil && err == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800760 return verror.New(verror.ErrInternal, fc.ctx, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700761 }
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800762 switch {
Jiri Simsa074bf362015-02-17 09:29:45 -0800763 case verror.Is(err, verror.ErrBadProtocol.ID):
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800764 switch fc.ctx.Err() {
765 case context.DeadlineExceeded:
766 // TODO(cnicolaou,m3b): reintroduce 'append' when the new verror API is done.
Jiri Simsa074bf362015-02-17 09:29:45 -0800767 //return verror.Append(verror.New(verror.ErrTimeout, fc.ctx), verr)
768 return verror.New(verror.ErrTimeout, fc.ctx, err.Error())
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800769 case context.Canceled:
770 // TODO(cnicolaou,m3b): reintroduce 'append' when the new verror API is done.
Jiri Simsa074bf362015-02-17 09:29:45 -0800771 //return verror.Append(verror.New(verror.ErrCanceled, fc.ctx), verr)
772 return verror.New(verror.ErrCanceled, fc.ctx, err.Error())
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800773 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800774 case verror.Is(err, verror.ErrTimeout.ID):
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800775 // Canceled trumps timeout.
776 if fc.ctx.Err() == context.Canceled {
777 // TODO(cnicolaou,m3b): reintroduce 'append' when the new verror API is done.
Jiri Simsa074bf362015-02-17 09:29:45 -0800778 return verror.New(verror.ErrCanceled, fc.ctx, err.Error())
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800779 }
780 }
781 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700782}
783
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800784func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) error {
Ankure49a86a2014-11-11 18:52:43 -0800785 // Fetch any discharges for third-party caveats on the client's blessings
786 // if this client owns a discharge-client.
787 if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
Asim Shankarf4864f42014-11-25 18:53:05 -0800788 fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
Ankure49a86a2014-11-11 18:52:43 -0800789 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800790 // Encode the Blessings information for the client to authorize the flow.
791 var blessingsRequest ipc.BlessingsRequest
792 if fc.flow.LocalPrincipal() != nil {
Jungho Ahn44d8daf2015-01-16 10:39:15 -0800793 fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...)
794 blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800795 }
Asim Shankaref951492015-02-11 11:41:03 -0800796 discharges := make([]security.WireDischarge, len(fc.discharges))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800797 for i, d := range fc.discharges {
Asim Shankaref951492015-02-11 11:41:03 -0800798 discharges[i] = security.MarshalDischarge(d)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800799 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700800 req := ipc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700801 Suffix: suffix,
802 Method: method,
803 NumPosArgs: uint64(len(args)),
804 Timeout: int64(timeout),
805 GrantedBlessings: security.MarshalBlessings(blessings),
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800806 Blessings: blessingsRequest,
Asim Shankaref951492015-02-11 11:41:03 -0800807 Discharges: discharges,
Asim Shankarf4864f42014-11-25 18:53:05 -0800808 TraceRequest: ivtrace.Request(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700809 }
810 if err := fc.enc.Encode(req); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800811 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800812 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700813 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800814
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700815 for ix, arg := range args {
816 if err := fc.enc.Encode(arg); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800817 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, ix, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800818 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700819 }
820 }
821 return nil
822}
823
824func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700825 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700826 if fc.sendClosed {
Jiri Simsa074bf362015-02-17 09:29:45 -0800827 return verror.New(verror.ErrAborted, fc.ctx)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700828 }
829
830 // The empty request header indicates what follows is a streaming arg.
831 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800832 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, ipc.Request{}, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800833 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700834 }
835 if err := fc.enc.Encode(item); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800836 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errArgEncoding, fc.ctx, -1, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800837 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838 }
839 return nil
840}
841
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800842func decodeNetError(ctx *context.T, err error) verror.IDAction {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800843 if neterr, ok := err.(net.Error); ok {
844 if neterr.Timeout() || neterr.Temporary() {
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800845 // If a read is canceled in the lower levels we see
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800846 // a timeout error - see readLocked in vc/reader.go
847 if ctx.Err() == context.Canceled {
Jiri Simsa074bf362015-02-17 09:29:45 -0800848 return verror.ErrCanceled
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800849 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800850 return verror.ErrTimeout
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800851 }
852 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800853 return verror.ErrBadProtocol
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800854}
855
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700856func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700857 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700858 switch {
859 case fc.response.Error != nil:
Todd Wang1f98c412015-02-11 14:17:35 -0800860 // TODO(cnicolaou): this will become a verror.Standard when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800861 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800862 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700863 case fc.response.EndStreamResults:
864 return io.EOF
865 }
866
867 // Decode the response header and handle errors and EOF.
868 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800869 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800870 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700871 }
872 if fc.response.Error != nil {
Todd Wang1f98c412015-02-11 14:17:35 -0800873 // TODO(cnicolaou): this will become a verror.Standard when we convert the
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800874 // server.
Jiri Simsa074bf362015-02-17 09:29:45 -0800875 return verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700876 }
877 if fc.response.EndStreamResults {
878 // Return EOF to indicate to the caller that there are no more stream
879 // results. Any error sent by the server is kept in fc.response.Error, and
880 // returned to the user in Finish.
881 return io.EOF
882 }
883 // Decode the streaming result.
884 if err := fc.dec.Decode(itemptr); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800885 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800886 // TODO(cnicolaou): should we be caching this?
887 fc.response.Error = berr
888 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700889 }
890 return nil
891}
892
893func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700894 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700895 return fc.closeSend()
896}
897
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800898// closeSend ensures CloseSend always returns verror.Standard.
899func (fc *flowClient) closeSend() error {
Asim Shankar1707e432014-05-29 19:42:41 -0700900 fc.sendClosedMu.Lock()
901 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700902 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700903 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700904 }
905 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700906 // TODO(caprita): Indiscriminately closing the flow below causes
907 // a race as described in:
908 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
909 //
910 // There should be a finer grained way to fix this (for example,
911 // encoding errors should probably still result in closing the
912 // flow); on the flip side, there may exist other instances
913 // where we are closing the flow but should not.
914 //
915 // For now, commenting out the line below removes the flakiness
916 // from our existing unit tests, but this needs to be revisited
917 // and fixed correctly.
918 //
Jiri Simsa074bf362015-02-17 09:29:45 -0800919 // return fc.close(verror.ErrBadProtocolf("ipc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700920 }
921 fc.sendClosed = true
922 return nil
923}
924
925func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700926 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700927 err := fc.finish(resultptrs...)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800928 vtrace.GetSpan(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700929 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700930}
931
Mike Burrowsdc6b3602015-02-05 15:52:12 -0800932// finish ensures Finish always returns a verror.Standard.
Cosmos Nicolaoud1ca6862015-01-30 11:43:39 -0800933func (fc *flowClient) finish(resultptrs ...interface{}) error {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700934 if fc.finished {
Todd Wangff73e1f2015-02-10 21:45:52 -0800935 err := verror.New(errClientFinishAlreadyCalled, fc.ctx)
Jiri Simsa074bf362015-02-17 09:29:45 -0800936 return fc.close(verror.New(verror.ErrBadState, fc.ctx, err))
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700937 }
938 fc.finished = true
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800939
Todd Wangce3033b2014-05-23 17:04:44 -0700940 // Call closeSend implicitly, if the user hasn't already called it. There are
941 // three cases:
942 // 1) Server is blocked on Recv waiting for the final request message.
943 // 2) Server has already finished processing, the final response message and
944 // out args are queued up on the client, and the flow is closed.
945 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
946 // response and args aren't queued up yet, and the flow isn't closed.
947 //
948 // We must call closeSend to handle case (1) and unblock the server; otherwise
949 // we'll deadlock with both client and server waiting for each other. We must
950 // ignore the error (if any) to handle case (2). In that case the flow is
951 // closed, meaning writes will fail and reads will succeed, and closeSend will
952 // always return an error. But this isn't a "real" error; the client should
953 // read the rest of the results and succeed.
954 _ = fc.closeSend()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700955 // Decode the response header, if it hasn't already been decoded by Recv.
956 if fc.response.Error == nil && !fc.response.EndStreamResults {
957 if err := fc.dec.Decode(&fc.response); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800958 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResponseDecoding, fc.ctx, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800959 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700960 }
961 // The response header must indicate the streaming results have ended.
962 if fc.response.Error == nil && !fc.response.EndStreamResults {
Jiri Simsa074bf362015-02-17 09:29:45 -0800963 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRemainingStreamResults, fc.ctx))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800964 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700965 }
966 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800967 if fc.response.AckBlessings {
968 clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
969 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700970 // Incorporate any VTrace info that was returned.
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800971 ivtrace.Merge(fc.ctx, fc.response.TraceResponse)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700972 if fc.response.Error != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800973 // TODO(cnicolaou): remove verror.ErrNoAccess with verror version
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800974 // when ipc.Server is converted.
Jiri Simsa074bf362015-02-17 09:29:45 -0800975 if verror.Is(fc.response.Error, verror.ErrNoAccess.ID) && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700976 // In case the error was caused by a bad discharge, we do not want to get stuck
977 // with retrying again and again with this discharge. As there is no direct way
978 // to detect it, we conservatively flush all discharges we used from the cache.
979 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Asim Shankar77befba2015-01-09 12:49:04 -0800980 vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Ankure49a86a2014-11-11 18:52:43 -0800981 fc.dc.Invalidate(fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700982 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800983 return fc.close(verror.Convert(verror.ErrInternal, fc.ctx, fc.response.Error))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700984 }
985 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Jiri Simsa074bf362015-02-17 09:29:45 -0800986 berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errMismatchedResults, fc.ctx, got, want))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800987 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700988 }
989 for ix, r := range resultptrs {
990 if err := fc.dec.Decode(r); err != nil {
Todd Wangff73e1f2015-02-10 21:45:52 -0800991 berr := verror.New(decodeNetError(fc.ctx, err), fc.ctx, verror.New(errResultDecoding, fc.ctx, ix, err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800992 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700993 }
994 }
995 return fc.close(nil)
996}
997
Asim Shankar2d731a92014-09-29 17:46:38 -0700998func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -0700999 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -07001000}