blob: de89583fa7431d156b56d2f38f6cac497fb3d122 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Asim Shankarb54d7642014-06-05 13:08:04 -07004 "fmt"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "io"
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -07006 "math"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07007 "math/rand"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -08008 "net"
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -07009 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070010 "sync"
11 "time"
12
Jiri Simsa764efb72014-12-25 20:57:03 -080013 "v.io/core/veyron2/context"
14 "v.io/core/veyron2/i18n"
15 "v.io/core/veyron2/ipc"
16 "v.io/core/veyron2/ipc/stream"
17 "v.io/core/veyron2/naming"
18 "v.io/core/veyron2/options"
19 "v.io/core/veyron2/security"
Todd Wangb86b3522015-01-22 13:34:20 -080020 "v.io/core/veyron2/vdl"
Jiri Simsa764efb72014-12-25 20:57:03 -080021 old_verror "v.io/core/veyron2/verror"
22 verror "v.io/core/veyron2/verror2"
23 "v.io/core/veyron2/vlog"
Todd Wang3425a902015-01-21 18:43:59 -080024 "v.io/core/veyron2/vom"
Jiri Simsa764efb72014-12-25 20:57:03 -080025 "v.io/core/veyron2/vtrace"
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080026
Jiri Simsa764efb72014-12-25 20:57:03 -080027 "v.io/core/veyron/runtimes/google/ipc/stream/vc"
28 "v.io/core/veyron/runtimes/google/ipc/version"
29 inaming "v.io/core/veyron/runtimes/google/naming"
30 ivtrace "v.io/core/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070031)
32
Jiri Simsa764efb72014-12-25 20:57:03 -080033const pkgPath = "v.io/core/veyron/runtimes/google/ipc"
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080034
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080035// TODO(cnicolaou): for local errors, automatically assign a new 'id',
36// don't use pkgPath etc. Can then move them into being defined on each line
37// and not here.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070038var (
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080039 // Local errs that are used to provide details to the public ones.
40 errClientCloseAlreadyCalled = verror.Register(pkgPath+".closeAlreadyCalled", verror.NoRetry,
41 "ipc.Client.Close has already been called")
42
43 errClientFinishAlreadyCalled = verror.Register(pkgPath+".finishAlreadyCalled", verror.NoRetry, "ipc.Call.Finish has already been called")
44
45 errNonRootedName = verror.Register(pkgPath+".nonRootedName", verror.NoRetry, "{3} does not appear to contain an address")
46
47 errInvalidEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an invalid endpoint")
48
49 errIncompatibleEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an incompatible endpoint")
50
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080051 errNotTrusted = verror.Register(pkgPath+".notTrusted", verror.NoRetry, "name {3} not trusted using blessings {4}{:5}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080052
53 errAuthError = verror.Register(pkgPath+".authError", verror.RetryRefetch, "authentication error from server {3}{:4}")
54
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080055 errSystemRetry = verror.Register(pkgPath+".sysErrorRetryConnection", verror.RetryConnection, "{:3:}")
56 errClosingFlow = verror.Register(pkgPath+".errClosingFlow", verror.NoRetry, "{:3:}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080057
Todd Wang34ed4c62014-11-26 15:15:52 -080058 errVomEncoder = verror.Register(pkgPath+".vomEncoder", verror.NoRetry, "failed to create vom encoder {:3}")
59 errVomDecoder = verror.Register(pkgPath+".vomDecoder", verror.NoRetry, "failed to create vom decoder {:3}")
60
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080061 errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
62
Suharsh Sivakumar720b7042014-12-22 17:33:23 -080063 errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
64
65 errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080066
67 errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
68
Benjamin Prosnitz0db77a22015-01-20 14:25:15 -080069 errMismatchedResults = verror.Register(pkgPath+".mismatchedResults", verror.NoRetry, "got {3} results, but want {4}")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -080070
71 errResultDecoding = verror.Register(pkgPath+".resultDecoding", verror.NoRetry, "failed to decode result #{3}{:4}")
72
73 errResponseDecoding = verror.Register(pkgPath+".responseDecoding", verror.NoRetry, "failed to decode response{:3}")
74
75 errRemainingStreamResults = verror.Register(pkgPath+".remaingStreamResults", verror.NoRetry, "stream closed with remaining stream results")
76
77 errNoBlessings = verror.Register(pkgPath+".noBlessings", verror.NoRetry, "server has not presented any blessings")
78
79 errAuthNoPatternMatch = verror.Register(pkgPath+".authNoPatternMatch",
80 verror.NoRetry, "server blessings {3} do not match pattern {4}")
81
82 errDefaultAuthDenied = verror.Register(pkgPath+".defaultAuthDenied", verror.NoRetry, "default authorization precludes talking to server with blessings{:3}")
83
84 errBlessingGrant = verror.Register(pkgPath+".blessingGrantFailed", verror.NoRetry, "failed to grant blessing to server with blessings {3}{:4}")
85
86 errBlessingAdd = verror.Register(pkgPath+".blessingAddFailed", verror.NoRetry, "failed to add blessing granted to server {3}{:4}")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070087)
88
Ryan Brown2726b402014-11-04 17:13:27 -080089// TODO(ribrdb): Flip this to true once everything is updated.
90const enableSecureServerAuth = false
91
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092type client struct {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080093 streamMgr stream.Manager
94 ns naming.Namespace
95 vcOpts []stream.VCOpt // vc opts passed to dial
96 preferredProtocols []string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070097
Jungho Ahn25545d32015-01-26 15:14:14 -080098 // We cache the IP networks on the device since it is not that cheap to read
99 // network interfaces through os syscall.
100 // TODO(jhahn): Add monitoring the network interface changes.
101 ipNets []*net.IPNet
102
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103 // We support concurrent calls to StartCall and Close, so we must protect the
104 // vcMap. Everything else is initialized upon client construction, and safe
105 // to use concurrently.
106 vcMapMu sync.Mutex
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800107 vcMap map[vcMapKey]*vcInfo
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700108
Ankure49a86a2014-11-11 18:52:43 -0800109 dc vc.DischargeClient
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700110}
111
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700112var _ ipc.Client = (*client)(nil)
113var _ ipc.BindOpt = (*client)(nil)
114
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115type vcInfo struct {
116 vc stream.VC
117 remoteEP naming.Endpoint
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700118}
119
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800120type vcMapKey struct {
121 endpoint string
122 encrypted bool
123}
124
Cosmos Nicolaou4e029972014-06-13 14:53:08 -0700125func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700126 c := &client{
Ankure49a86a2014-11-11 18:52:43 -0800127 streamMgr: streamMgr,
128 ns: ns,
Jungho Ahn25545d32015-01-26 15:14:14 -0800129 ipNets: ipNetworks(),
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800130 vcMap: make(map[vcMapKey]*vcInfo),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700131 }
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800132 c.dc = InternalNewDischargeClient(nil, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700133 for _, opt := range opts {
134 // Collect all client opts that are also vc opts.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800135 switch v := opt.(type) {
136 case stream.VCOpt:
137 c.vcOpts = append(c.vcOpts, v)
138 case options.PreferredProtocols:
139 c.preferredProtocols = v
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700140 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700141 }
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800142 c.vcOpts = append(c.vcOpts, c.dc)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800143
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700144 return c, nil
145}
146
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800147func vcEncrypted(vcOpts []stream.VCOpt) bool {
148 encrypted := true
149 for _, o := range vcOpts {
150 switch o {
151 case options.VCSecurityNone:
152 encrypted = false
153 case options.VCSecurityConfidential:
154 encrypted = true
155 }
156 }
157 return encrypted
158}
159
160func (c *client) createFlow(ctx *context.T, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700161 c.vcMapMu.Lock()
162 defer c.vcMapMu.Unlock()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800163 if c.vcMap == nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800164 return nil, verror.Make(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800165 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800166 vcKey := vcMapKey{ep.String(), vcEncrypted(vcOpts)}
167 if vcinfo := c.vcMap[vcKey]; vcinfo != nil {
Bogdan Caprita783f7792014-05-15 09:29:17 -0700168 if flow, err := vcinfo.vc.Connect(); err == nil {
169 return flow, nil
170 }
171 // If the vc fails to establish a new flow, we assume it's
172 // broken, remove it from the map, and proceed to establishing
173 // a new vc.
174 // TODO(caprita): Should we distinguish errors due to vc being
175 // closed from other errors? If not, should we call vc.Close()
176 // before removing the vc from the map?
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800177 delete(c.vcMap, vcKey)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700178 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800179 sm := c.streamMgr
Robin Thellendee439642014-10-20 14:39:17 -0700180 c.vcMapMu.Unlock()
Asim Shankar70494752015-01-23 16:10:23 -0800181 // Include the context when Dial-ing. This is currently done via an
182 // option, and for thread-safety reasons - cannot append directly to
183 // vcOpts.
184 // TODO(ashankar,mattr): Revisit the API in ipc/stream and explicitly
185 // provide a context to Dial and other relevant operations.
186 cpy := make([]stream.VCOpt, len(vcOpts)+1)
187 cpy[copy(cpy, vcOpts)] = vc.DialContext{ctx}
188 vcOpts = cpy
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800189 vc, err := sm.Dial(ep, vcOpts...)
Robin Thellendee439642014-10-20 14:39:17 -0700190 c.vcMapMu.Lock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700191 if err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800192 if strings.Contains(err.Error(), "authentication failed") {
193 return nil, verror.Make(errAuthError, ctx, ep, err)
194 } else {
195 return nil, verror.Make(errSystemRetry, ctx, err)
196 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700197 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800198 if c.vcMap == nil {
199 sm.ShutdownEndpoint(ep)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800200 return nil, verror.Make(errClientCloseAlreadyCalled, ctx)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800201 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800202 if othervc, exists := c.vcMap[vcKey]; exists {
Robin Thellendee439642014-10-20 14:39:17 -0700203 vc = othervc.vc
204 // TODO(ashankar,toddw): Figure out how to close up the VC that
205 // is discarded. vc.Close?
206 } else {
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800207 c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep}
Robin Thellendee439642014-10-20 14:39:17 -0700208 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800209 flow, err := vc.Connect()
210 if err != nil {
211
212 return nil, verror.Make(errAuthError, ctx, ep, err)
213 }
214 return flow, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215}
216
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700217// A randomized exponential backoff. The randomness deters error convoys from forming.
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800218// TODO(cnicolaou): rationalize this and the backoff in ipc.Server. Note
219// that rand is not thread safe and may crash.
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700220func backoff(n int, deadline time.Time) bool {
221 b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second))
222 if b > maxBackoff {
223 b = maxBackoff
224 }
225 r := deadline.Sub(time.Now())
226 if b > r {
227 // We need to leave a little time for the call to start or
228 // we'll just timeout in startCall before we actually do
229 // anything. If we just have a millisecond left, give up.
230 if r <= time.Millisecond {
231 return false
232 }
233 b = r - time.Millisecond
234 }
235 time.Sleep(b)
236 return true
237}
238
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800239func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
240 defer vlog.LogCall()()
241 return c.startCall(ctx, name, method, args, opts)
242}
243
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700244func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
245 for _, o := range opts {
Asim Shankarcc044212014-10-15 23:25:26 -0700246 if r, ok := o.(options.RetryTimeout); ok {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700247 return time.Duration(r), true
248 }
249 }
250 return 0, false
251}
252
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700253func getNoResolveOpt(opts []ipc.CallOpt) bool {
254 for _, o := range opts {
Suharsh Sivakumarb59a96d2015-01-09 16:39:54 -0800255 if _, ok := o.(options.NoResolve); ok {
256 return true
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700257 }
258 }
259 return false
260}
261
Suharsh Sivakumar11316872014-11-25 15:57:00 -0800262func shouldNotFetchDischarges(opts []ipc.CallOpt) bool {
263 for _, o := range opts {
264 if _, ok := o.(vc.NoDischarges); ok {
265 return true
266 }
267 }
268 return false
269}
270
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800271func getVCOpts(opts []ipc.CallOpt) (vcOpts []stream.VCOpt) {
272 for _, o := range opts {
273 if v, ok := o.(stream.VCOpt); ok {
274 vcOpts = append(vcOpts, v)
275 }
276 }
277 return
278}
279
280func getResolveOpts(opts []ipc.CallOpt) (resolveOpts []naming.ResolveOpt) {
281 for _, o := range opts {
282 if r, ok := o.(naming.ResolveOpt); ok {
283 resolveOpts = append(resolveOpts, r)
284 }
285 }
286 return
287}
288
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800289func allowCancel(opts []ipc.CallOpt) bool {
290 for _, o := range opts {
291 if _, ok := o.(inaming.NoCancel); ok {
292 return false
293 }
294 }
295 return true
296}
297
Ankure49a86a2014-11-11 18:52:43 -0800298func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) security.DischargeImpetus {
299 var impetus security.DischargeImpetus
300 if len(serverBlessings) > 0 {
301 impetus.Server = make([]security.BlessingPattern, len(serverBlessings))
302 for i, b := range serverBlessings {
303 impetus.Server[i] = security.BlessingPattern(b)
304 }
305 }
306 impetus.Method = method
307 if len(args) > 0 {
Todd Wangb86b3522015-01-22 13:34:20 -0800308 impetus.Arguments = make([]vdl.AnyRep, len(args))
Ankure49a86a2014-11-11 18:52:43 -0800309 for i, a := range args {
Todd Wangb86b3522015-01-22 13:34:20 -0800310 impetus.Arguments[i] = vdl.AnyRep(a)
Ankure49a86a2014-11-11 18:52:43 -0800311 }
312 }
313 return impetus
314}
315
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700316// startCall ensures StartCall always returns verror.E.
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800317func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
318 if !ctx.Initialized() {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800319 return nil, verror.ExplicitMake(verror.BadArg, i18n.NoLangID, "ipc.Client", "StartCall")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700320 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800321
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800322 ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800323 ctx = verror.ContextWithComponentName(ctx, "ipc.Client")
Matt Rosencrantz321a51d2014-10-30 10:37:56 -0700324
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700325 // Context specified deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700326 deadline, hasDeadline := ctx.Deadline()
327 if !hasDeadline {
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700328 // Default deadline.
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700329 deadline = time.Now().Add(defaultCallTimeout)
330 }
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700331 if r, ok := getRetryTimeoutOpt(opts); ok {
332 // Caller specified deadline.
333 deadline = time.Now().Add(time.Duration(r))
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700334 }
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800335
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700336 var lastErr verror.E
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700337 for retries := 0; ; retries++ {
David Why Use Two When One Will Do Presottob02cf902014-09-12 17:22:56 -0700338 if retries != 0 {
339 if !backoff(retries, deadline) {
340 break
341 }
342 }
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800343 call, action, err := c.tryCall(ctx, name, method, args, opts)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700344 if err == nil {
345 return call, nil
346 }
347 lastErr = err
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800348 shouldRetry := true
349 switch {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800350 case action != verror.RetryConnection && action != verror.RetryRefetch:
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800351 shouldRetry = false
352 case time.Now().After(deadline):
353 shouldRetry = false
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800354 case action == verror.RetryRefetch && getNoResolveOpt(opts):
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800355 // If we're skipping resolution and there are no servers for
356 // this call retrying is not going to help, we can't come up
357 // with new servers if there is no resolution.
358 shouldRetry = false
359 }
Matt Rosencrantzcc922c12014-11-28 20:28:59 -0800360 if !shouldRetry {
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800361 span.Annotatef("Cannot retry after error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700362 break
363 }
Matt Rosencrantzabacd432014-11-24 10:44:31 -0800364 span.Annotatef("Retrying due to error: %s", err)
David Why Use Two When One Will Do Presotto06c60f02014-09-11 14:52:19 -0700365 }
366 return nil, lastErr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700367}
368
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800369type serverStatus struct {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800370 index int
371 suffix string
372 flow stream.Flow
373 err verror.E
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800374}
375
Asim Shankaraae31802015-01-22 11:59:42 -0800376// tryCreateFlow attempts to establish a Flow to "server" (which must be a
377// rooted name), over which a method invocation request could be sent.
Cosmos Nicolaou00a0f802014-11-16 22:44:55 -0800378// TODO(cnicolaou): implement real, configurable load balancing.
Asim Shankaraae31802015-01-22 11:59:42 -0800379func (c *client) tryCreateFlow(ctx *context.T, index int, server string, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800380 status := &serverStatus{index: index}
Asim Shankarf4864f42014-11-25 18:53:05 -0800381 var span vtrace.Span
Asim Shankaraae31802015-01-22 11:59:42 -0800382 ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow")
Asim Shankarf4864f42014-11-25 18:53:05 -0800383 span.Annotatef("address:%v", server)
Asim Shankaraae31802015-01-22 11:59:42 -0800384 defer func() {
385 ch <- status
386 span.Finish()
387 }()
388 address, suffix := naming.SplitAddressName(server)
389 if len(address) == 0 {
390 status.err = verror.Make(errNonRootedName, ctx, server)
391 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800392 }
Asim Shankaraae31802015-01-22 11:59:42 -0800393 ep, err := inaming.NewEndpoint(address)
394 if err != nil {
395 status.err = verror.Make(errInvalidEndpoint, ctx, address)
396 return
397 }
398 if err = version.CheckCompatibility(ep); err != nil {
399 status.err = verror.Make(errIncompatibleEndpoint, ctx, ep)
400 return
401 }
402 if status.flow, status.err = c.createFlow(ctx, ep, vcOpts); status.err != nil {
403 vlog.VI(2).Infof("ipc: connect to %v: %v", server, status.err)
404 return
405 }
406 status.suffix = suffix
407 return
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800408}
409
Asim Shankaraae31802015-01-22 11:59:42 -0800410// tryCall makes a single attempt at a call. It may connect to multiple servers
411// (all that serve "name"), but will invoke the method on at most one of them
412// (the server running on the most preferred protcol and network amongst all
413// the servers that were successfully connected to and authorized).
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800414func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.ActionCode, verror.E) {
Asim Shankaraae31802015-01-22 11:59:42 -0800415 var resolved *naming.MountEntry
Ryan Brown6153c6c2014-12-11 13:10:09 -0800416 var pattern security.BlessingPattern
Asim Shankaraae31802015-01-22 11:59:42 -0800417 var err error
418 if resolved, err = c.ns.Resolve(ctx, name, getResolveOpts(opts)...); err != nil {
David Why Use Two When One Will Do Presotto8de85852015-01-21 11:05:09 -0800419 vlog.Errorf("Resolve: %v", err)
Ryan Brown6153c6c2014-12-11 13:10:09 -0800420 if verror.Is(err, naming.ErrNoSuchName.ID) {
421 return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name)
422 }
423 return nil, verror.NoRetry, verror.Make(verror.NoExist, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700424 } else {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800425 pattern = security.BlessingPattern(resolved.Pattern)
426 if len(resolved.Servers) == 0 {
427 return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name)
Suharsh Sivakumar65e44c22014-12-10 17:15:19 -0800428 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800429 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800430 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800431 return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name, err)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700432 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700433 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800434
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800435 // servers is now orderd by the priority heurestic implemented in
436 // filterAndOrderServers.
Asim Shankaraae31802015-01-22 11:59:42 -0800437 //
438 // Try to connect to all servers in parallel. Provide sufficient
439 // buffering for all of the connections to finish instantaneously. This
440 // is important because we want to process the responses in priority
441 // order; that order is indicated by the order of entries in servers.
442 // So, if two respones come in at the same 'instant', we prefer the
443 // first in the resolved.Servers)
444 attempts := len(resolved.Servers)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800445 responses := make([]*serverStatus, attempts)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800446 ch := make(chan *serverStatus, attempts)
Asim Shankaraae31802015-01-22 11:59:42 -0800447 vcOpts := append(getVCOpts(opts), c.vcOpts...)
Asim Shankaraae31802015-01-22 11:59:42 -0800448 for i, server := range resolved.Names() {
449 go c.tryCreateFlow(ctx, i, server, ch, vcOpts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700450 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800451
452 delay := time.Duration(ipc.NoTimeout)
Todd Wangef05c062014-11-15 09:51:43 -0800453 if dl, ok := ctx.Deadline(); ok {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800454 delay = dl.Sub(time.Now())
455 }
456 timeoutChan := time.After(delay)
457
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800458 for {
Todd Wangef05c062014-11-15 09:51:43 -0800459 // Block for at least one new response from the server, or the timeout.
460 select {
461 case r := <-ch:
462 responses[r.index] = r
463 // Read as many more responses as we can without blocking.
464 LoopNonBlocking:
465 for {
466 select {
467 default:
468 break LoopNonBlocking
469 case r := <-ch:
470 responses[r.index] = r
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800471 }
Todd Wangef05c062014-11-15 09:51:43 -0800472 }
473 case <-timeoutChan:
474 vlog.VI(2).Infof("ipc: timeout on connection to server %v ", name)
Asim Shankaraae31802015-01-22 11:59:42 -0800475 _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou38209d42014-12-09 16:50:38 -0800476 if !verror.Is(err, verror.Timeout.ID) {
477 return nil, verror.NoRetry, verror.Make(verror.Timeout, ctx, err)
478 }
479 return nil, verror.NoRetry, err
Todd Wangef05c062014-11-15 09:51:43 -0800480 }
481
482 // Process new responses, in priority order.
483 numResponses := 0
Suharsh Sivakumarae774a52015-01-09 14:26:32 -0800484 noDischarges := shouldNotFetchDischarges(opts)
Todd Wangef05c062014-11-15 09:51:43 -0800485 for _, r := range responses {
486 if r != nil {
487 numResponses++
488 }
489 if r == nil || r.flow == nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800490 continue
491 }
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800492
493 var doneChan <-chan struct{}
494 if allowCancel(opts) {
495 doneChan = ctx.Done()
496 }
497 r.flow.SetDeadline(doneChan)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800498
499 var (
500 serverB []string
501 grantedB security.Blessings
502 )
503
504 // LocalPrincipal is nil means that the client wanted to avoid
505 // authentication, and thus wanted to skip authorization as well.
Todd Wangef05c062014-11-15 09:51:43 -0800506 if r.flow.LocalPrincipal() != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800507 // Validate caveats on the server's identity for the context associated with this call.
508 var err error
Ryan Brown6153c6c2014-12-11 13:10:09 -0800509 if serverB, grantedB, err = c.authorizeServer(ctx, r.flow, name, method, pattern, opts); err != nil {
Asim Shankaraae31802015-01-22 11:59:42 -0800510 r.err = verror.Make(errNotTrusted, ctx, name, r.flow.RemoteBlessings(), err)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800511 vlog.VI(2).Infof("ipc: err: %s", r.err)
Todd Wangef05c062014-11-15 09:51:43 -0800512 r.flow.Close()
513 r.flow = nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800514 continue
515 }
516 }
517
Todd Wangef05c062014-11-15 09:51:43 -0800518 // This is the 'point of no return'; once the RPC is started (fc.start
519 // below) we can't be sure if it makes it to the server or not so, this
520 // code will never call fc.start more than once to ensure that we provide
521 // 'at-most-once' rpc semantics at this level. Retrying the network
522 // connections (i.e. creating flows) is fine since we can cleanup that
523 // state if we abort a call (i.e. close the flow).
524 //
525 // We must ensure that all flows other than r.flow are closed.
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800526 //
527 // TODO(cnicolaou): all errors below are marked as NoRetry
528 // because we want to provide at-most-once rpc semantics so
529 // we only ever attempt an RPC once. In the future, we'll cache
530 // responses on the server and then we can retry in-process
531 // RPCs.
Todd Wangef05c062014-11-15 09:51:43 -0800532 go cleanupTryCall(r, responses, ch)
Todd Wang34ed4c62014-11-26 15:15:52 -0800533 fc, err := newFlowClient(ctx, serverB, r.flow, c.dc)
534 if err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800535 return nil, verror.NoRetry, err.(verror.E)
Todd Wang34ed4c62014-11-26 15:15:52 -0800536 }
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800537
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800538 if doneChan != nil {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800539 go func() {
540 select {
Matt Rosencrantzfa3082c2015-01-22 21:39:04 -0800541 case <-doneChan:
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800542 vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
Matt Rosencrantz9346b412014-12-18 15:59:19 -0800543 fc.flow.Cancel()
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800544 case <-fc.flow.Closed():
545 }
546 }()
547 }
548
549 timeout := time.Duration(ipc.NoTimeout)
550 if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
551 timeout = deadline.Sub(time.Now())
552 }
Suharsh Sivakumar11316872014-11-25 15:57:00 -0800553 if noDischarges {
554 fc.dc = nil
555 }
Todd Wangef05c062014-11-15 09:51:43 -0800556 if verr := fc.start(r.suffix, method, args, timeout, grantedB); verr != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800557 return nil, verror.NoRetry, verr
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800558 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800559 return fc, verror.NoRetry, nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800560 }
Todd Wangef05c062014-11-15 09:51:43 -0800561 if numResponses == len(responses) {
Asim Shankaraae31802015-01-22 11:59:42 -0800562 return c.failedTryCall(ctx, name, method, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800563 }
564 }
Todd Wangef05c062014-11-15 09:51:43 -0800565}
566
Asim Shankaraae31802015-01-22 11:59:42 -0800567// cleanupTryCall ensures we've waited for every response from the tryCreateFlow
Todd Wangef05c062014-11-15 09:51:43 -0800568// goroutines, and have closed the flow from each one except skip. This is a
569// blocking function; it should be called in its own goroutine.
570func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
571 numPending := 0
572 for _, r := range responses {
573 switch {
574 case r == nil:
575 // The response hasn't arrived yet.
576 numPending++
577 case r == skip || r.flow == nil:
578 // Either we should skip this flow, or we've closed the flow for this
579 // response already; nothing more to do.
580 default:
581 // We received the response, but haven't closed the flow yet.
582 r.flow.Close()
583 }
584 }
585 // Now we just need to wait for the pending responses and close their flows.
586 for i := 0; i < numPending; i++ {
587 if r := <-ch; r.flow != nil {
588 r.flow.Close()
589 }
590 }
591}
592
593// failedTryCall performs asynchronous cleanup for tryCall, and returns an
594// appropriate error from the responses we've already received. All parallel
595// calls in tryCall failed or we timed out if we get here.
Asim Shankaraae31802015-01-22 11:59:42 -0800596func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (ipc.Call, verror.ActionCode, verror.E) {
Todd Wangef05c062014-11-15 09:51:43 -0800597 go cleanupTryCall(nil, responses, ch)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800598 c.ns.FlushCacheEntry(name)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800599 noconn, untrusted := []string{}, []string{}
Asim Shankaraae31802015-01-22 11:59:42 -0800600 for _, r := range responses {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800601 if r != nil && r.err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800602 switch {
603 case verror.Is(r.err, errNotTrusted.ID) || verror.Is(r.err, errAuthError.ID):
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800604 untrusted = append(untrusted, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800605 default:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800606 noconn = append(noconn, "("+r.err.Error()+") ")
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800607 }
Todd Wangef05c062014-11-15 09:51:43 -0800608 }
609 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800610 // TODO(cnicolaou): we get system errors for things like dialing using
611 // the 'ws' protocol which can never succeed even if we retry the connection,
612 // hence we return RetryRefetch in all cases below. In the future, we'll
613 // pick out this error and then we can retry the connection also. This also
614 // plays into the 'at-most-once' rpc semantics change that's needed in order
615 // to retry an in-flight RPC.
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800616 switch {
617 case len(untrusted) > 0 && len(noconn) > 0:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800618 return nil, verror.RetryRefetch, verror.Make(verror.NoServersAndAuth, ctx, append(noconn, untrusted...))
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800619 case len(noconn) > 0:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800620 return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, noconn)
621 case len(untrusted) > 0:
622 return nil, verror.NoRetry, verror.Make(verror.NotTrusted, ctx, untrusted)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800623 default:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800624 return nil, verror.RetryRefetch, verror.Make(verror.Timeout, ctx)
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800625 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700626}
627
Asim Shankar8f05c222014-10-06 22:08:19 -0700628// authorizeServer validates that the server (remote end of flow) has the credentials to serve
629// the RPC name.method for the client (local end of the flow). It returns the blessings at the
630// server that are authorized for this purpose and any blessings that are to be granted to
631// the server (via ipc.Granter implementations in opts.)
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800632func (c *client) authorizeServer(ctx *context.T, flow stream.Flow, name, method string, serverPattern security.BlessingPattern, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err verror.E) {
Asim Shankar220a0152014-10-30 21:21:09 -0700633 if flow.RemoteBlessings() == nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800634 return nil, nil, verror.Make(errNoBlessings, ctx)
Asim Shankarb54d7642014-06-05 13:08:04 -0700635 }
Ankure49a86a2014-11-11 18:52:43 -0800636 ctxt := security.NewContext(&security.ContextParams{
637 LocalPrincipal: flow.LocalPrincipal(),
638 LocalBlessings: flow.LocalBlessings(),
639 RemoteBlessings: flow.RemoteBlessings(),
640 LocalEndpoint: flow.LocalEndpoint(),
641 RemoteEndpoint: flow.RemoteEndpoint(),
642 RemoteDischarges: flow.RemoteDischarges(),
643 Method: method,
Todd Wang9a7f5162014-11-13 13:24:33 -0800644 Suffix: name})
Ryan Brown2726b402014-11-04 17:13:27 -0800645 serverBlessings = flow.RemoteBlessings().ForContext(ctxt)
646 if serverPattern != "" {
647 if !serverPattern.MatchedBy(serverBlessings...) {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800648 return nil, nil, verror.Make(errAuthNoPatternMatch, ctx, serverBlessings, serverPattern)
Ryan Brown2726b402014-11-04 17:13:27 -0800649 }
650 } else if enableSecureServerAuth {
651 if err := (defaultAuthorizer{}).Authorize(ctxt); err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800652 return nil, nil, verror.Make(errDefaultAuthDenied, ctx, serverBlessings)
Ryan Brown2726b402014-11-04 17:13:27 -0800653 }
654 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700655 for _, o := range opts {
656 switch v := o.(type) {
Asim Shankarb54d7642014-06-05 13:08:04 -0700657 case ipc.Granter:
Asim Shankar8f05c222014-10-06 22:08:19 -0700658 if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800659 return nil, nil, verror.Make(errBlessingGrant, ctx, serverBlessings, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700660 } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800661 return nil, nil, verror.Make(errBlessingAdd, ctx, serverBlessings, err)
Asim Shankar8f05c222014-10-06 22:08:19 -0700662 }
Asim Shankarb54d7642014-06-05 13:08:04 -0700663 }
664 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700665 return serverBlessings, grantedBlessings, nil
Asim Shankarb54d7642014-06-05 13:08:04 -0700666}
667
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700668func (c *client) Close() {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700669 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700670 c.vcMapMu.Lock()
671 for _, v := range c.vcMap {
672 c.streamMgr.ShutdownEndpoint(v.remoteEP)
673 }
674 c.vcMap = nil
675 c.vcMapMu.Unlock()
676}
677
678// IPCBindOpt makes client implement BindOpt.
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700679func (c *client) IPCBindOpt() {
680 //nologcall
681}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700682
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700683// flowClient implements the RPC client-side protocol for a single RPC, over a
684// flow that's already connected to the server.
685type flowClient struct {
Todd Wang3425a902015-01-21 18:43:59 -0800686 ctx *context.T // context to annotate with call details
687 dec *vom.Decoder // to decode responses and results from the server
688 enc *vom.Encoder // to encode requests and args to the server
689 server []string // Blessings bound to the server that authorize it to receive the IPC request from the client.
690 flow stream.Flow // the underlying flow
691 response ipc.Response // each decoded response message is kept here
Asim Shankar1707e432014-05-29 19:42:41 -0700692
Ankure49a86a2014-11-11 18:52:43 -0800693 discharges []security.Discharge // discharges used for this request
694 dc vc.DischargeClient // client-global discharge-client
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700695
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800696 blessings security.Blessings // the local blessings for the current RPC.
697
Asim Shankar1707e432014-05-29 19:42:41 -0700698 sendClosedMu sync.Mutex
699 sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800700 finished bool // has Finish() already been called?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700701}
702
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700703var _ ipc.Call = (*flowClient)(nil)
704var _ ipc.Stream = (*flowClient)(nil)
705
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800706func newFlowClient(ctx *context.T, server []string, flow stream.Flow, dc vc.DischargeClient) (*flowClient, error) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800707 fc := &flowClient{
Ankure49a86a2014-11-11 18:52:43 -0800708 ctx: ctx,
Ankure49a86a2014-11-11 18:52:43 -0800709 server: server,
710 flow: flow,
711 dc: dc,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700712 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800713 var err error
Todd Wang3425a902015-01-21 18:43:59 -0800714 if fc.enc, err = vom.NewBinaryEncoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800715 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errVomEncoder, fc.ctx, err))
716 return nil, fc.close(berr)
717 }
Todd Wang3425a902015-01-21 18:43:59 -0800718 if fc.dec, err = vom.NewDecoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800719 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errVomDecoder, fc.ctx, err))
720 return nil, fc.close(berr)
Todd Wang34ed4c62014-11-26 15:15:52 -0800721 }
722 return fc, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700723}
724
725func (fc *flowClient) close(verr verror.E) verror.E {
726 if err := fc.flow.Close(); err != nil && verr == nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800727 verr = verror.Make(errClosingFlow, fc.ctx, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700728 }
729 return verr
730}
731
Asim Shankar8f05c222014-10-06 22:08:19 -0700732func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E {
Ankure49a86a2014-11-11 18:52:43 -0800733 // Fetch any discharges for third-party caveats on the client's blessings
734 // if this client owns a discharge-client.
735 if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
Asim Shankarf4864f42014-11-25 18:53:05 -0800736 fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
Ankure49a86a2014-11-11 18:52:43 -0800737 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800738 // Encode the Blessings information for the client to authorize the flow.
739 var blessingsRequest ipc.BlessingsRequest
740 if fc.flow.LocalPrincipal() != nil {
Jungho Ahn44d8daf2015-01-16 10:39:15 -0800741 fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...)
742 blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800743 }
744 // TODO(suharshs, ataly): Make security.Discharge a vdl type.
Todd Wangb86b3522015-01-22 13:34:20 -0800745 anyDischarges := make([]vdl.AnyRep, len(fc.discharges))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800746 for i, d := range fc.discharges {
747 anyDischarges[i] = d
748 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700749 req := ipc.Request{
Asim Shankar8f05c222014-10-06 22:08:19 -0700750 Suffix: suffix,
751 Method: method,
752 NumPosArgs: uint64(len(args)),
753 Timeout: int64(timeout),
754 GrantedBlessings: security.MarshalBlessings(blessings),
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800755 Blessings: blessingsRequest,
756 Discharges: anyDischarges,
Asim Shankarf4864f42014-11-25 18:53:05 -0800757 TraceRequest: ivtrace.Request(fc.ctx),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700758 }
759 if err := fc.enc.Encode(req); err != nil {
Todd Wangbc4875f2014-12-12 10:30:26 -0800760 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800761 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700762 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800763
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700764 for ix, arg := range args {
765 if err := fc.enc.Encode(arg); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800766 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, ix, err))
767 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700768 }
769 }
770 return nil
771}
772
773func (fc *flowClient) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700774 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700775 if fc.sendClosed {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800776 return verror.Make(verror.Aborted, fc.ctx)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700777 }
778
779 // The empty request header indicates what follows is a streaming arg.
780 if err := fc.enc.Encode(ipc.Request{}); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800781 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, ipc.Request{}, err))
782 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700783 }
784 if err := fc.enc.Encode(item); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800785 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, -1, err))
786 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700787 }
788 return nil
789}
790
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800791func decodeNetError(ctx *context.T, err error) verror.IDAction {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800792 if neterr, ok := err.(net.Error); ok {
793 if neterr.Timeout() || neterr.Temporary() {
794 // If a read is cancelled in the lower levels we see
795 // a timeout error - see readLocked in vc/reader.go
796 if ctx.Err() == context.Canceled {
797 return verror.Cancelled
798 }
799 return verror.Timeout
800 }
801 }
802 return verror.BadProtocol
803}
804
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700805func (fc *flowClient) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700806 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700807 switch {
808 case fc.response.Error != nil:
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800809 // TODO(cnicolaou): this will become a verror2.E when we convert the
810 // server.
811 return verror.Make(verror.BadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700812 case fc.response.EndStreamResults:
813 return io.EOF
814 }
815
816 // Decode the response header and handle errors and EOF.
817 if err := fc.dec.Decode(&fc.response); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800818 berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err))
819 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700820 }
821 if fc.response.Error != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800822 // TODO(cnicolaou): this will become a verror2.E when we convert the
823 // server.
824 return verror.Make(verror.BadProtocol, fc.ctx, fc.response.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700825 }
826 if fc.response.EndStreamResults {
827 // Return EOF to indicate to the caller that there are no more stream
828 // results. Any error sent by the server is kept in fc.response.Error, and
829 // returned to the user in Finish.
830 return io.EOF
831 }
832 // Decode the streaming result.
833 if err := fc.dec.Decode(itemptr); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800834 berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err))
835 // TODO(cnicolaou): should we be caching this?
836 fc.response.Error = berr
837 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838 }
839 return nil
840}
841
842func (fc *flowClient) CloseSend() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700843 defer vlog.LogCall()()
Tilak Sharma0c766112014-05-20 17:47:27 -0700844 return fc.closeSend()
845}
846
847// closeSend ensures CloseSend always returns verror.E.
848func (fc *flowClient) closeSend() verror.E {
Asim Shankar1707e432014-05-29 19:42:41 -0700849 fc.sendClosedMu.Lock()
850 defer fc.sendClosedMu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700851 if fc.sendClosed {
Asim Shankar1707e432014-05-29 19:42:41 -0700852 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700853 }
854 if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700855 // TODO(caprita): Indiscriminately closing the flow below causes
856 // a race as described in:
857 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
858 //
859 // There should be a finer grained way to fix this (for example,
860 // encoding errors should probably still result in closing the
861 // flow); on the flip side, there may exist other instances
862 // where we are closing the flow but should not.
863 //
864 // For now, commenting out the line below removes the flakiness
865 // from our existing unit tests, but this needs to be revisited
866 // and fixed correctly.
867 //
868 // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700869 }
870 fc.sendClosed = true
871 return nil
872}
873
874func (fc *flowClient) Finish(resultptrs ...interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700875 defer vlog.LogCall()()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700876 err := fc.finish(resultptrs...)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800877 vtrace.GetSpan(fc.ctx).Finish()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700878 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700879}
880
881// finish ensures Finish always returns verror.E.
882func (fc *flowClient) finish(resultptrs ...interface{}) verror.E {
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700883 if fc.finished {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800884 err := verror.Make(errClientFinishAlreadyCalled, fc.ctx)
885 return fc.close(verror.Make(verror.BadState, fc.ctx, err))
Ken Ashcraft2b8309a2014-09-09 10:44:43 -0700886 }
887 fc.finished = true
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800888
Todd Wangce3033b2014-05-23 17:04:44 -0700889 // Call closeSend implicitly, if the user hasn't already called it. There are
890 // three cases:
891 // 1) Server is blocked on Recv waiting for the final request message.
892 // 2) Server has already finished processing, the final response message and
893 // out args are queued up on the client, and the flow is closed.
894 // 3) Between 1 and 2: the server isn't blocked on Recv, but the final
895 // response and args aren't queued up yet, and the flow isn't closed.
896 //
897 // We must call closeSend to handle case (1) and unblock the server; otherwise
898 // we'll deadlock with both client and server waiting for each other. We must
899 // ignore the error (if any) to handle case (2). In that case the flow is
900 // closed, meaning writes will fail and reads will succeed, and closeSend will
901 // always return an error. But this isn't a "real" error; the client should
902 // read the rest of the results and succeed.
903 _ = fc.closeSend()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700904 // Decode the response header, if it hasn't already been decoded by Recv.
905 if fc.response.Error == nil && !fc.response.EndStreamResults {
906 if err := fc.dec.Decode(&fc.response); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800907 berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err))
908 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700909 }
910 // The response header must indicate the streaming results have ended.
911 if fc.response.Error == nil && !fc.response.EndStreamResults {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800912 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRemainingStreamResults, fc.ctx))
913 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700914 }
915 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800916 if fc.response.AckBlessings {
917 clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
918 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700919 // Incorporate any VTrace info that was returned.
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800920 ivtrace.Merge(fc.ctx, fc.response.TraceResponse)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700921 if fc.response.Error != nil {
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800922 // TODO(cnicolaou): remove verror.NoAccess with verror version
923 // when ipc.Server is converted.
924 if verror.Is(fc.response.Error, old_verror.NoAccess) && fc.dc != nil {
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700925 // In case the error was caused by a bad discharge, we do not want to get stuck
926 // with retrying again and again with this discharge. As there is no direct way
927 // to detect it, we conservatively flush all discharges we used from the cache.
928 // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
Asim Shankar77befba2015-01-09 12:49:04 -0800929 vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
Ankure49a86a2014-11-11 18:52:43 -0800930 fc.dc.Invalidate(fc.discharges...)
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700931 }
Cosmos Nicolaou112bf1c2014-11-21 15:43:11 -0800932 return fc.close(verror.Convert(verror.Internal, fc.ctx, fc.response.Error))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700933 }
934 if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800935 berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errMismatchedResults, fc.ctx, got, want))
936 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700937 }
938 for ix, r := range resultptrs {
939 if err := fc.dec.Decode(r); err != nil {
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800940 berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResultDecoding, fc.ctx, ix, err))
941 return fc.close(berr)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700942 }
943 }
944 return fc.close(nil)
945}
946
Asim Shankar2d731a92014-09-29 17:46:38 -0700947func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) {
Asim Shankar8f05c222014-10-06 22:08:19 -0700948 return fc.server, fc.flow.RemoteBlessings()
Asim Shankar2d731a92014-09-29 17:46:38 -0700949}