Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 4 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | "io" |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 6 | "math" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 7 | "math/rand" |
| 8 | "strings" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 9 | "sync" |
| 10 | "time" |
| 11 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 12 | "veyron.io/veyron/veyron/runtimes/google/ipc/version" |
| 13 | inaming "veyron.io/veyron/veyron/runtimes/google/naming" |
Asim Shankar | 5c576c4 | 2014-10-01 12:19:12 -0700 | [diff] [blame] | 14 | isecurity "veyron.io/veyron/veyron/runtimes/google/security" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 15 | "veyron.io/veyron/veyron/runtimes/google/vtrace" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 16 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 17 | "veyron.io/veyron/veyron2/context" |
| 18 | "veyron.io/veyron/veyron2/ipc" |
| 19 | "veyron.io/veyron/veyron2/ipc/stream" |
| 20 | "veyron.io/veyron/veyron2/naming" |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 21 | "veyron.io/veyron/veyron2/options" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 22 | "veyron.io/veyron/veyron2/security" |
| 23 | "veyron.io/veyron/veyron2/verror" |
| 24 | "veyron.io/veyron/veyron2/vlog" |
| 25 | "veyron.io/veyron/veyron2/vom" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 26 | ) |
| 27 | |
| 28 | var ( |
Tilak Sharma | 492e8e9 | 2014-09-18 10:58:14 -0700 | [diff] [blame] | 29 | errNoServers = verror.NoExistf("ipc: no servers") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 30 | errFlowClosed = verror.Abortedf("ipc: flow closed") |
| 31 | errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results") |
| 32 | errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name") |
| 33 | ) |
| 34 | |
| 35 | type client struct { |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 36 | streamMgr stream.Manager |
| 37 | ns naming.Namespace |
| 38 | vcOpts []stream.VCOpt // vc opts passed to dial |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 39 | |
| 40 | // We support concurrent calls to StartCall and Close, so we must protect the |
| 41 | // vcMap. Everything else is initialized upon client construction, and safe |
| 42 | // to use concurrently. |
| 43 | vcMapMu sync.Mutex |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 44 | // TODO(ashankar): The key should be a function of the blessings shared with the server? |
| 45 | vcMap map[string]*vcInfo // map key is endpoint.String |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 46 | |
| 47 | dischargeCache dischargeCache |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 48 | } |
| 49 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 50 | var _ ipc.Client = (*client)(nil) |
| 51 | var _ ipc.BindOpt = (*client)(nil) |
| 52 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 53 | type vcInfo struct { |
| 54 | vc stream.VC |
| 55 | remoteEP naming.Endpoint |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 56 | } |
| 57 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 58 | func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 59 | c := &client{ |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 60 | streamMgr: streamMgr, |
| 61 | ns: ns, |
| 62 | vcMap: make(map[string]*vcInfo), |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 63 | dischargeCache: dischargeCache{cache: make(map[string]security.Discharge)}, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 64 | } |
| 65 | for _, opt := range opts { |
| 66 | // Collect all client opts that are also vc opts. |
| 67 | if vcOpt, ok := opt.(stream.VCOpt); ok { |
| 68 | c.vcOpts = append(c.vcOpts, vcOpt) |
| 69 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 70 | } |
| 71 | return c, nil |
| 72 | } |
| 73 | |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 74 | func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 75 | c.vcMapMu.Lock() |
| 76 | defer c.vcMapMu.Unlock() |
| 77 | if vcinfo := c.vcMap[ep.String()]; vcinfo != nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 78 | if flow, err := vcinfo.vc.Connect(); err == nil { |
| 79 | return flow, nil |
| 80 | } |
| 81 | // If the vc fails to establish a new flow, we assume it's |
| 82 | // broken, remove it from the map, and proceed to establishing |
| 83 | // a new vc. |
| 84 | // TODO(caprita): Should we distinguish errors due to vc being |
| 85 | // closed from other errors? If not, should we call vc.Close() |
| 86 | // before removing the vc from the map? |
| 87 | delete(c.vcMap, ep.String()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 88 | } |
| 89 | vc, err := c.streamMgr.Dial(ep, c.vcOpts...) |
| 90 | if err != nil { |
| 91 | return nil, err |
| 92 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 93 | c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep} |
| 94 | return vc.Connect() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 95 | } |
| 96 | |
| 97 | // connectFlow parses an endpoint and a suffix out of the server and establishes |
| 98 | // a flow to the endpoint, returning the parsed suffix. |
| 99 | // The server name passed in should be a rooted name, of the form "/ep/suffix" or |
| 100 | // "/ep//suffix", or just "/ep". |
| 101 | func (c *client) connectFlow(server string) (stream.Flow, string, error) { |
| 102 | address, suffix := naming.SplitAddressName(server) |
| 103 | if len(address) == 0 { |
| 104 | return nil, "", errNonRootedName |
| 105 | } |
| 106 | ep, err := inaming.NewEndpoint(address) |
| 107 | if err != nil { |
| 108 | return nil, "", err |
| 109 | } |
| 110 | if err = version.CheckCompatibility(ep); err != nil { |
| 111 | return nil, "", err |
| 112 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 113 | flow, err := c.createFlow(ep) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 114 | if err != nil { |
| 115 | return nil, "", err |
| 116 | } |
| 117 | return flow, suffix, nil |
| 118 | } |
| 119 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 120 | // A randomized exponential backoff. The randomness deters error convoys from forming. |
| 121 | func backoff(n int, deadline time.Time) bool { |
| 122 | b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second)) |
| 123 | if b > maxBackoff { |
| 124 | b = maxBackoff |
| 125 | } |
| 126 | r := deadline.Sub(time.Now()) |
| 127 | if b > r { |
| 128 | // We need to leave a little time for the call to start or |
| 129 | // we'll just timeout in startCall before we actually do |
| 130 | // anything. If we just have a millisecond left, give up. |
| 131 | if r <= time.Millisecond { |
| 132 | return false |
| 133 | } |
| 134 | b = r - time.Millisecond |
| 135 | } |
| 136 | time.Sleep(b) |
| 137 | return true |
| 138 | } |
| 139 | |
| 140 | // TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack. |
| 141 | func retriable(err error) bool { |
| 142 | e := err.Error() |
| 143 | // Authentication errors are permanent. |
| 144 | if strings.Contains(e, "authorized") { |
| 145 | return false |
| 146 | } |
| 147 | // Resolution errors are retriable. |
| 148 | if strings.Contains(e, "ipc: Resolve") { |
| 149 | return true |
| 150 | } |
| 151 | // Kernel level errors are retriable. |
| 152 | if strings.Contains(e, "errno") { |
| 153 | return true |
| 154 | } |
Matt Rosencrantz | 0c4032e | 2014-09-23 12:43:24 -0700 | [diff] [blame] | 155 | // Connection refused is retriable. |
| 156 | if strings.Contains(e, "connection refused") { |
| 157 | return true |
| 158 | } |
| 159 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 160 | return false |
| 161 | } |
| 162 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 163 | func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) { |
| 164 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 165 | if r, ok := o.(options.RetryTimeout); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 166 | return time.Duration(r), true |
| 167 | } |
| 168 | } |
| 169 | return 0, false |
| 170 | } |
| 171 | |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 172 | func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 173 | defer vlog.LogCall()() |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 174 | // Context specified deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 175 | deadline, hasDeadline := ctx.Deadline() |
| 176 | if !hasDeadline { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 177 | // Default deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 178 | deadline = time.Now().Add(defaultCallTimeout) |
| 179 | } |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 180 | if r, ok := getRetryTimeoutOpt(opts); ok { |
| 181 | // Caller specified deadline. |
| 182 | deadline = time.Now().Add(time.Duration(r)) |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 183 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 184 | var lastErr verror.E |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 185 | for retries := 0; ; retries++ { |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 186 | if retries != 0 { |
| 187 | if !backoff(retries, deadline) { |
| 188 | break |
| 189 | } |
| 190 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 191 | call, err := c.startCall(ctx, name, method, args, opts...) |
| 192 | if err == nil { |
| 193 | return call, nil |
| 194 | } |
| 195 | lastErr = err |
Nicolas LaCasse | 27f7041 | 2014-10-03 16:51:55 -0700 | [diff] [blame] | 196 | if time.Now().After(deadline) || !retriable(err) { |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 197 | break |
| 198 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 199 | } |
| 200 | return nil, lastErr |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 201 | } |
| 202 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 203 | func getNoResolveOpt(opts []ipc.CallOpt) bool { |
| 204 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 205 | if r, ok := o.(options.NoResolve); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 206 | return bool(r) |
| 207 | } |
| 208 | } |
| 209 | return false |
| 210 | } |
| 211 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 212 | // startCall ensures StartCall always returns verror.E. |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 213 | func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) { |
Matt Rosencrantz | 7e68d5a | 2014-06-11 15:28:51 +0000 | [diff] [blame] | 214 | if ctx == nil { |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 215 | return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method) |
Matt Rosencrantz | 7e68d5a | 2014-06-11 15:28:51 +0000 | [diff] [blame] | 216 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 217 | ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method)) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 218 | // Resolve name unless told not to. |
| 219 | var servers []string |
| 220 | if getNoResolveOpt(opts) { |
| 221 | servers = []string{name} |
| 222 | } else { |
| 223 | var err error |
| 224 | if servers, err = c.ns.Resolve(ctx, name); err != nil { |
| 225 | return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err) |
| 226 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 227 | } |
| 228 | // Try all servers, and if none of them are authorized for the call then return the error of the last server |
| 229 | // that was tried. |
| 230 | var lastErr verror.E |
| 231 | for _, server := range servers { |
| 232 | flow, suffix, err := c.connectFlow(server) |
| 233 | if err != nil { |
Tilak Sharma | 492e8e9 | 2014-09-18 10:58:14 -0700 | [diff] [blame] | 234 | lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 235 | vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err) |
| 236 | continue // Try the next server. |
| 237 | } |
Matt Rosencrantz | 8689793 | 2014-10-02 09:34:34 -0700 | [diff] [blame] | 238 | flow.SetDeadline(ctx.Done()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 239 | |
| 240 | // Validate caveats on the server's identity for the context associated with this call. |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 241 | serverB, grantedB, err := c.authorizeServer(flow, name, suffix, method, opts) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 242 | if err != nil { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 243 | lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err) |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 244 | flow.Close() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 245 | continue |
| 246 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 247 | // Fetch any discharges for third-party caveats on the client's blessings. |
| 248 | var discharges []security.Discharge |
| 249 | if self := flow.LocalBlessings(); self != nil { |
| 250 | if tpcavs := self.ThirdPartyCaveats(); len(tpcavs) > 0 { |
| 251 | discharges = c.prepareDischarges(ctx, tpcavs, mkDischargeImpetus(serverB, method, args), opts) |
| 252 | } |
| 253 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 254 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 255 | lastErr = nil |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 256 | fc := newFlowClient(ctx, serverB, flow, &c.dischargeCache, discharges) |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 257 | |
Matt Rosencrantz | a9036db | 2014-09-26 12:52:58 -0700 | [diff] [blame] | 258 | if doneChan := ctx.Done(); doneChan != nil { |
| 259 | go func() { |
Matt Rosencrantz | 8689793 | 2014-10-02 09:34:34 -0700 | [diff] [blame] | 260 | select { |
| 261 | case <-ctx.Done(): |
| 262 | fc.Cancel() |
| 263 | case <-fc.flow.Closed(): |
| 264 | } |
Matt Rosencrantz | a9036db | 2014-09-26 12:52:58 -0700 | [diff] [blame] | 265 | }() |
| 266 | } |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 267 | |
Matt Rosencrantz | 8689793 | 2014-10-02 09:34:34 -0700 | [diff] [blame] | 268 | timeout := time.Duration(ipc.NoTimeout) |
| 269 | if deadline, hasDeadline := ctx.Deadline(); hasDeadline { |
| 270 | timeout = deadline.Sub(time.Now()) |
| 271 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 272 | if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 273 | return nil, verr |
| 274 | } |
| 275 | return fc, nil |
| 276 | } |
| 277 | if lastErr != nil { |
David Why Use Two When One Will Do Presotto | f3f39ae | 2014-08-27 11:13:27 -0700 | [diff] [blame] | 278 | // If there was any problem starting the call, flush the cache entry under the |
| 279 | // assumption that it was caused by stale data. |
| 280 | c.ns.FlushCacheEntry(name) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 281 | return nil, lastErr |
| 282 | } |
| 283 | return nil, errNoServers |
| 284 | } |
| 285 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 286 | // authorizeServer validates that the server (remote end of flow) has the credentials to serve |
| 287 | // the RPC name.method for the client (local end of the flow). It returns the blessings at the |
| 288 | // server that are authorized for this purpose and any blessings that are to be granted to |
| 289 | // the server (via ipc.Granter implementations in opts.) |
| 290 | func (c *client) authorizeServer(flow stream.Flow, name, suffix, method string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) { |
| 291 | if flow.RemoteID() == nil && flow.RemoteBlessings() == nil { |
| 292 | return nil, nil, fmt.Errorf("server has not presented any blessings") |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 293 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 294 | authctx := isecurity.NewContext(isecurity.ContextArgs{ |
| 295 | LocalID: flow.LocalID(), |
| 296 | RemoteID: flow.RemoteID(), |
| 297 | Debug: "ClientAuthorizingServer", |
| 298 | LocalPrincipal: flow.LocalPrincipal(), |
| 299 | LocalBlessings: flow.LocalBlessings(), |
| 300 | RemoteBlessings: flow.RemoteBlessings(), |
| 301 | /* TODO(ashankar,ataly): Uncomment this! This is disabled till the hack to skip third-party caveat |
| 302 | validation on a server's blessings are disabled. Commenting out the next three lines affects more |
| 303 | than third-party caveats, so yeah, have to remove this soon! |
| 304 | Method: method, |
| 305 | Name: name, |
| 306 | Suffix: suffix, */ |
| 307 | LocalEndpoint: flow.LocalEndpoint(), |
| 308 | RemoteEndpoint: flow.RemoteEndpoint(), |
| 309 | }) |
| 310 | if serverID := flow.RemoteID(); flow.RemoteBlessings() == nil && serverID != nil { |
| 311 | serverID, err = serverID.Authorize(authctx) |
| 312 | if err != nil { |
| 313 | return nil, nil, err |
| 314 | } |
| 315 | serverBlessings = serverID.Names() |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 316 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 317 | if server := flow.RemoteBlessings(); server != nil { |
| 318 | serverBlessings = server.ForContext(authctx) |
| 319 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 320 | for _, o := range opts { |
| 321 | switch v := o.(type) { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 322 | case options.RemoteID: |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 323 | if !security.BlessingPattern(v).MatchedBy(serverBlessings...) { |
| 324 | return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, v) |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 325 | } |
| 326 | case ipc.Granter: |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 327 | if b, err := v.Grant(flow.RemoteBlessings()); err != nil { |
| 328 | return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err) |
| 329 | } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil { |
| 330 | return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err) |
| 331 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 332 | } |
| 333 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 334 | return serverBlessings, grantedBlessings, nil |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 335 | } |
| 336 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 337 | func (c *client) Close() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 338 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 339 | c.vcMapMu.Lock() |
| 340 | for _, v := range c.vcMap { |
| 341 | c.streamMgr.ShutdownEndpoint(v.remoteEP) |
| 342 | } |
| 343 | c.vcMap = nil |
| 344 | c.vcMapMu.Unlock() |
| 345 | } |
| 346 | |
| 347 | // IPCBindOpt makes client implement BindOpt. |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 348 | func (c *client) IPCBindOpt() { |
| 349 | //nologcall |
| 350 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 351 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 352 | // flowClient implements the RPC client-side protocol for a single RPC, over a |
| 353 | // flow that's already connected to the server. |
| 354 | type flowClient struct { |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 355 | ctx context.T // context to annotate with call details |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 356 | dec *vom.Decoder // to decode responses and results from the server |
| 357 | enc *vom.Encoder // to encode requests and args to the server |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 358 | server []string // Blessings bound to the server that authorize it to receive the IPC request from the client. |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 359 | flow stream.Flow // the underlying flow |
| 360 | response ipc.Response // each decoded response message is kept here |
| 361 | |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 362 | discharges []security.Discharge // discharges used for this request |
| 363 | dischargeCache *dischargeCache // client-global discharge cache reference type |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 364 | |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 365 | sendClosedMu sync.Mutex |
| 366 | sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu) |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 367 | |
| 368 | finished bool // has Finish() already been called? |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 369 | } |
| 370 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 371 | var _ ipc.Call = (*flowClient)(nil) |
| 372 | var _ ipc.Stream = (*flowClient)(nil) |
| 373 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 374 | func newFlowClient(ctx context.T, server []string, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 375 | return &flowClient{ |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 376 | ctx: ctx, |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 377 | dec: vom.NewDecoder(flow), |
| 378 | enc: vom.NewEncoder(flow), |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 379 | server: server, |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 380 | flow: flow, |
| 381 | discharges: discharges, |
| 382 | dischargeCache: dischargeCache, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 383 | } |
| 384 | } |
| 385 | |
| 386 | func (fc *flowClient) close(verr verror.E) verror.E { |
| 387 | if err := fc.flow.Close(); err != nil && verr == nil { |
| 388 | verr = verror.Internalf("ipc: flow close failed: %v", err) |
| 389 | } |
| 390 | return verr |
| 391 | } |
| 392 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 393 | func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 394 | req := ipc.Request{ |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 395 | Suffix: suffix, |
| 396 | Method: method, |
| 397 | NumPosArgs: uint64(len(args)), |
| 398 | Timeout: int64(timeout), |
| 399 | GrantedBlessings: security.MarshalBlessings(blessings), |
| 400 | NumDischarges: uint64(len(fc.discharges)), |
| 401 | TraceRequest: vtrace.Request(fc.ctx), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 402 | } |
| 403 | if err := fc.enc.Encode(req); err != nil { |
| 404 | return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err)) |
| 405 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 406 | for _, d := range fc.discharges { |
| 407 | if err := fc.enc.Encode(d); err != nil { |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 408 | return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err)) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 409 | } |
| 410 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 411 | for ix, arg := range args { |
| 412 | if err := fc.enc.Encode(arg); err != nil { |
| 413 | return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err)) |
| 414 | } |
| 415 | } |
| 416 | return nil |
| 417 | } |
| 418 | |
| 419 | func (fc *flowClient) Send(item interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 420 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 421 | if fc.sendClosed { |
| 422 | return errFlowClosed |
| 423 | } |
| 424 | |
| 425 | // The empty request header indicates what follows is a streaming arg. |
| 426 | if err := fc.enc.Encode(ipc.Request{}); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 427 | return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 428 | } |
| 429 | if err := fc.enc.Encode(item); err != nil { |
| 430 | return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err)) |
| 431 | } |
| 432 | return nil |
| 433 | } |
| 434 | |
| 435 | func (fc *flowClient) Recv(itemptr interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 436 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 437 | switch { |
| 438 | case fc.response.Error != nil: |
| 439 | return fc.response.Error |
| 440 | case fc.response.EndStreamResults: |
| 441 | return io.EOF |
| 442 | } |
| 443 | |
| 444 | // Decode the response header and handle errors and EOF. |
| 445 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 446 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 447 | } |
| 448 | if fc.response.Error != nil { |
| 449 | return fc.response.Error |
| 450 | } |
| 451 | if fc.response.EndStreamResults { |
| 452 | // Return EOF to indicate to the caller that there are no more stream |
| 453 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 454 | // returned to the user in Finish. |
| 455 | return io.EOF |
| 456 | } |
| 457 | // Decode the streaming result. |
| 458 | if err := fc.dec.Decode(itemptr); err != nil { |
| 459 | return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err)) |
| 460 | } |
| 461 | return nil |
| 462 | } |
| 463 | |
| 464 | func (fc *flowClient) CloseSend() error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 465 | defer vlog.LogCall()() |
Tilak Sharma | 0c76611 | 2014-05-20 17:47:27 -0700 | [diff] [blame] | 466 | return fc.closeSend() |
| 467 | } |
| 468 | |
| 469 | // closeSend ensures CloseSend always returns verror.E. |
| 470 | func (fc *flowClient) closeSend() verror.E { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 471 | fc.sendClosedMu.Lock() |
| 472 | defer fc.sendClosedMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 473 | if fc.sendClosed { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 474 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 475 | } |
| 476 | if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 477 | // TODO(caprita): Indiscriminately closing the flow below causes |
| 478 | // a race as described in: |
| 479 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 480 | // |
| 481 | // There should be a finer grained way to fix this (for example, |
| 482 | // encoding errors should probably still result in closing the |
| 483 | // flow); on the flip side, there may exist other instances |
| 484 | // where we are closing the flow but should not. |
| 485 | // |
| 486 | // For now, commenting out the line below removes the flakiness |
| 487 | // from our existing unit tests, but this needs to be revisited |
| 488 | // and fixed correctly. |
| 489 | // |
| 490 | // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 491 | } |
| 492 | fc.sendClosed = true |
| 493 | return nil |
| 494 | } |
| 495 | |
| 496 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 497 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 498 | err := fc.finish(resultptrs...) |
| 499 | vtrace.FromContext(fc.ctx).Annotate("Finished") |
| 500 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 501 | } |
| 502 | |
| 503 | // finish ensures Finish always returns verror.E. |
| 504 | func (fc *flowClient) finish(resultptrs ...interface{}) verror.E { |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 505 | if fc.finished { |
| 506 | return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed")) |
| 507 | } |
| 508 | fc.finished = true |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 509 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 510 | // three cases: |
| 511 | // 1) Server is blocked on Recv waiting for the final request message. |
| 512 | // 2) Server has already finished processing, the final response message and |
| 513 | // out args are queued up on the client, and the flow is closed. |
| 514 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 515 | // response and args aren't queued up yet, and the flow isn't closed. |
| 516 | // |
| 517 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 518 | // we'll deadlock with both client and server waiting for each other. We must |
| 519 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 520 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 521 | // always return an error. But this isn't a "real" error; the client should |
| 522 | // read the rest of the results and succeed. |
| 523 | _ = fc.closeSend() |
| 524 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 525 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 526 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 527 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 528 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 529 | } |
| 530 | // The response header must indicate the streaming results have ended. |
| 531 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 532 | return fc.close(errRemainingStreamResults) |
| 533 | } |
| 534 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 535 | |
| 536 | // Incorporate any VTrace info that was returned. |
| 537 | vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse) |
| 538 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 539 | if fc.response.Error != nil { |
Tilak Sharma | 492e8e9 | 2014-09-18 10:58:14 -0700 | [diff] [blame] | 540 | if verror.Is(fc.response.Error, verror.NoAccess) && fc.dischargeCache != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 541 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 542 | // with retrying again and again with this discharge. As there is no direct way |
| 543 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 544 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Ankur | 57444f3 | 2014-08-13 11:03:39 -0700 | [diff] [blame] | 545 | vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 546 | fc.dischargeCache.Invalidate(fc.discharges...) |
| 547 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 548 | return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error)) |
| 549 | } |
| 550 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
Cosmos Nicolaou | 9c9918d | 2014-09-23 08:45:56 -0700 | [diff] [blame] | 551 | return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 552 | } |
| 553 | for ix, r := range resultptrs { |
| 554 | if err := fc.dec.Decode(r); err != nil { |
| 555 | return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err)) |
| 556 | } |
| 557 | } |
| 558 | return fc.close(nil) |
| 559 | } |
| 560 | |
| 561 | func (fc *flowClient) Cancel() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 562 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 563 | vtrace.FromContext(fc.ctx).Annotate("Cancelled") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 564 | fc.flow.Cancel() |
| 565 | } |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 566 | |
| 567 | func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 568 | return fc.server, fc.flow.RemoteBlessings() |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 569 | } |