Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 4 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | "io" |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 6 | "math" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 7 | "math/rand" |
| 8 | "strings" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 9 | "sync" |
| 10 | "time" |
| 11 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 12 | "veyron.io/veyron/veyron/runtimes/google/ipc/version" |
| 13 | inaming "veyron.io/veyron/veyron/runtimes/google/naming" |
Asim Shankar | 5c576c4 | 2014-10-01 12:19:12 -0700 | [diff] [blame] | 14 | isecurity "veyron.io/veyron/veyron/runtimes/google/security" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 15 | "veyron.io/veyron/veyron/runtimes/google/vtrace" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 16 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 17 | "veyron.io/veyron/veyron2/context" |
| 18 | "veyron.io/veyron/veyron2/ipc" |
| 19 | "veyron.io/veyron/veyron2/ipc/stream" |
| 20 | "veyron.io/veyron/veyron2/naming" |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 21 | "veyron.io/veyron/veyron2/options" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 22 | "veyron.io/veyron/veyron2/security" |
| 23 | "veyron.io/veyron/veyron2/verror" |
| 24 | "veyron.io/veyron/veyron2/vlog" |
| 25 | "veyron.io/veyron/veyron2/vom" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 26 | ) |
| 27 | |
// Pre-built errors returned by the client code in this file.
var (
	// errNoServers is returned by startCall when there were no servers to try.
	errNoServers = verror.NoExistf("ipc: no servers")
	// errFlowClosed is returned by Send once the send side has been closed.
	errFlowClosed = verror.Abortedf("ipc: flow closed")
	// errRemainingStreamResults: per its message, reported when Finish is
	// called while streaming results remain (use site not visible in this chunk).
	errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
	// errNonRootedName is returned by connectFlow when the server name has no
	// address component.
	errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
)
| 34 | |
// client is this package's implementation of ipc.Client. It keeps one VC per
// remote endpoint in vcMap and opens a new flow on it for every call.
type client struct {
	streamMgr stream.Manager   // dials VCs (createFlow) and shuts down endpoints (Close)
	ns        naming.Namespace // resolves object names to server names in startCall
	vcOpts    []stream.VCOpt   // vc opts passed to dial

	// We support concurrent calls to StartCall and Close, so we must protect the
	// vcMap. Everything else is initialized upon client construction, and safe
	// to use concurrently.
	vcMapMu sync.Mutex
	// TODO(ashankar): The key should be a function of the blessings shared with the server?
	vcMap map[string]*vcInfo // map key is endpoint.String; set to nil by Close

	// dischargeCache is handed (by pointer) to every flowClient created in startCall.
	dischargeCache dischargeCache
}
| 49 | |
// Compile-time checks that *client satisfies ipc.Client and ipc.BindOpt.
var _ ipc.Client = (*client)(nil)
var _ ipc.BindOpt = (*client)(nil)
| 52 | |
// vcInfo is the value stored in client.vcMap: a dialed VC together with the
// endpoint it was dialed to.
type vcInfo struct {
	vc       stream.VC       // VC on which new flows are created via Connect
	remoteEP naming.Endpoint // endpoint passed to ShutdownEndpoint when the client is closed
}
| 57 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 58 | func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 59 | c := &client{ |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 60 | streamMgr: streamMgr, |
| 61 | ns: ns, |
| 62 | vcMap: make(map[string]*vcInfo), |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 63 | dischargeCache: dischargeCache{cache: make(map[string]security.Discharge)}, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 64 | } |
| 65 | for _, opt := range opts { |
| 66 | // Collect all client opts that are also vc opts. |
| 67 | if vcOpt, ok := opt.(stream.VCOpt); ok { |
| 68 | c.vcOpts = append(c.vcOpts, vcOpt) |
| 69 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 70 | } |
| 71 | return c, nil |
| 72 | } |
| 73 | |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 74 | func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 75 | c.vcMapMu.Lock() |
| 76 | defer c.vcMapMu.Unlock() |
| 77 | if vcinfo := c.vcMap[ep.String()]; vcinfo != nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 78 | if flow, err := vcinfo.vc.Connect(); err == nil { |
| 79 | return flow, nil |
| 80 | } |
| 81 | // If the vc fails to establish a new flow, we assume it's |
| 82 | // broken, remove it from the map, and proceed to establishing |
| 83 | // a new vc. |
| 84 | // TODO(caprita): Should we distinguish errors due to vc being |
| 85 | // closed from other errors? If not, should we call vc.Close() |
| 86 | // before removing the vc from the map? |
| 87 | delete(c.vcMap, ep.String()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 88 | } |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 89 | c.vcMapMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 90 | vc, err := c.streamMgr.Dial(ep, c.vcOpts...) |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 91 | c.vcMapMu.Lock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 92 | if err != nil { |
| 93 | return nil, err |
| 94 | } |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 95 | if othervc, exists := c.vcMap[ep.String()]; exists { |
| 96 | vc = othervc.vc |
| 97 | // TODO(ashankar,toddw): Figure out how to close up the VC that |
| 98 | // is discarded. vc.Close? |
| 99 | } else { |
| 100 | c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep} |
| 101 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 102 | return vc.Connect() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 103 | } |
| 104 | |
| 105 | // connectFlow parses an endpoint and a suffix out of the server and establishes |
| 106 | // a flow to the endpoint, returning the parsed suffix. |
| 107 | // The server name passed in should be a rooted name, of the form "/ep/suffix" or |
| 108 | // "/ep//suffix", or just "/ep". |
| 109 | func (c *client) connectFlow(server string) (stream.Flow, string, error) { |
| 110 | address, suffix := naming.SplitAddressName(server) |
| 111 | if len(address) == 0 { |
| 112 | return nil, "", errNonRootedName |
| 113 | } |
| 114 | ep, err := inaming.NewEndpoint(address) |
| 115 | if err != nil { |
| 116 | return nil, "", err |
| 117 | } |
| 118 | if err = version.CheckCompatibility(ep); err != nil { |
| 119 | return nil, "", err |
| 120 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 121 | flow, err := c.createFlow(ep) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 122 | if err != nil { |
| 123 | return nil, "", err |
| 124 | } |
| 125 | return flow, suffix, nil |
| 126 | } |
| 127 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 128 | // A randomized exponential backoff. The randomness deters error convoys from forming. |
| 129 | func backoff(n int, deadline time.Time) bool { |
| 130 | b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second)) |
| 131 | if b > maxBackoff { |
| 132 | b = maxBackoff |
| 133 | } |
| 134 | r := deadline.Sub(time.Now()) |
| 135 | if b > r { |
| 136 | // We need to leave a little time for the call to start or |
| 137 | // we'll just timeout in startCall before we actually do |
| 138 | // anything. If we just have a millisecond left, give up. |
| 139 | if r <= time.Millisecond { |
| 140 | return false |
| 141 | } |
| 142 | b = r - time.Millisecond |
| 143 | } |
| 144 | time.Sleep(b) |
| 145 | return true |
| 146 | } |
| 147 | |
// retriable reports whether a failed call attempt is worth retrying, judged
// purely by substrings of the error message. The checks are ordered: a
// message containing "authorized" is never retriable, even if it also matches
// one of the retriable patterns.
//
// TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack.
func retriable(err error) bool {
	msg := err.Error()
	switch {
	case strings.Contains(msg, "authorized"):
		// Authentication errors are permanent.
		return false
	case strings.Contains(msg, "ipc: Resolve"), // Resolution errors are retriable.
		strings.Contains(msg, "errno"),              // Kernel level errors are retriable.
		strings.Contains(msg, "connection refused"): // Connection refused is retriable.
		return true
	}
	return false
}
| 170 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 171 | func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) { |
| 172 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 173 | if r, ok := o.(options.RetryTimeout); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 174 | return time.Duration(r), true |
| 175 | } |
| 176 | } |
| 177 | return 0, false |
| 178 | } |
| 179 | |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 180 | func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 181 | defer vlog.LogCall()() |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 182 | // Context specified deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 183 | deadline, hasDeadline := ctx.Deadline() |
| 184 | if !hasDeadline { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 185 | // Default deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 186 | deadline = time.Now().Add(defaultCallTimeout) |
| 187 | } |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 188 | if r, ok := getRetryTimeoutOpt(opts); ok { |
| 189 | // Caller specified deadline. |
| 190 | deadline = time.Now().Add(time.Duration(r)) |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 191 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 192 | var lastErr verror.E |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 193 | for retries := 0; ; retries++ { |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 194 | if retries != 0 { |
| 195 | if !backoff(retries, deadline) { |
| 196 | break |
| 197 | } |
| 198 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 199 | call, err := c.startCall(ctx, name, method, args, opts...) |
| 200 | if err == nil { |
| 201 | return call, nil |
| 202 | } |
| 203 | lastErr = err |
Nicolas LaCasse | 27f7041 | 2014-10-03 16:51:55 -0700 | [diff] [blame] | 204 | if time.Now().After(deadline) || !retriable(err) { |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 205 | break |
| 206 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 207 | } |
| 208 | return nil, lastErr |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 209 | } |
| 210 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 211 | func getNoResolveOpt(opts []ipc.CallOpt) bool { |
| 212 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 213 | if r, ok := o.(options.NoResolve); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 214 | return bool(r) |
| 215 | } |
| 216 | } |
| 217 | return false |
| 218 | } |
| 219 | |
// startCall ensures StartCall always returns verror.E.
//
// It makes a single attempt at starting the call: it resolves name to a list
// of servers (unless options.NoResolve is set, in which case name itself is
// the only server), then walks the list until it gets a flow to a server that
// it is willing to talk to. Servers that cannot be connected to or that fail
// authorization are skipped. Once a flow is accepted the request is sent on
// it and the result of that send is returned; a failure at that point does
// NOT fall through to the next server.
//
// Errors: BadArg for a nil ctx, NoExist when resolution fails, the error of
// the last server tried when every server was skipped, and errNoServers when
// the server list was empty.
func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
	if ctx == nil {
		return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
	}
	// From here on ctx carries a new vtrace span for this call.
	ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
	// Resolve name unless told not to.
	var servers []string
	if getNoResolveOpt(opts) {
		servers = []string{name}
	} else {
		var err error
		if servers, err = c.ns.Resolve(ctx, name); err != nil {
			return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
		}
	}
	// Try all servers, and if none of them are authorized for the call then return the error of the last server
	// that was tried.
	var lastErr verror.E
	for _, server := range servers {
		flow, suffix, err := c.connectFlow(server)
		if err != nil {
			lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
			vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
			continue // Try the next server.
		}
		flow.SetDeadline(ctx.Done())

		// These stay at their zero values when authorization is skipped below.
		var serverB []string
		var grantedB security.Blessings
		var discharges []security.Discharge

		// LocalPrincipal is nil means that the client wanted to avoid authentication,
		// and thus wanted to skip authorization as well.
		// TODO(suharshs,ataly,ashankar): Remove flow.LocalID() after the old security model is dead.
		if flow.LocalPrincipal() != nil || flow.LocalID() != nil {
			// Validate caveats on the server's identity for the context associated with this call.
			if serverB, grantedB, err = c.authorizeServer(flow, name, suffix, method, opts); err != nil {
				lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
				// The flow is of no use for this call; close it before moving on.
				flow.Close()
				continue
			}
			// Fetch any discharges for third-party caveats on the client's blessings.
			if self := flow.LocalBlessings(); self != nil {
				if tpcavs := self.ThirdPartyCaveats(); len(tpcavs) > 0 {
					discharges = c.prepareDischarges(ctx, tpcavs, mkDischargeImpetus(serverB, method, args), opts)
				}
			}
		}

		// A usable server was found; errors recorded for earlier servers no
		// longer matter.
		lastErr = nil
		fc := newFlowClient(ctx, serverB, flow, &c.dischargeCache, discharges)

		// If the context can be done, cancel the call when it is. The
		// goroutine also exits once the flow is closed, so it does not
		// outlive the call.
		if doneChan := ctx.Done(); doneChan != nil {
			go func() {
				select {
				case <-ctx.Done():
					fc.Cancel()
				case <-fc.flow.Closed():
				}
			}()
		}

		// Send the time remaining until the context deadline (or NoTimeout)
		// along with the request.
		timeout := time.Duration(ipc.NoTimeout)
		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
			timeout = deadline.Sub(time.Now())
		}
		if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
			return nil, verr
		}
		return fc, nil
	}
	if lastErr != nil {
		// If there was any problem starting the call, flush the cache entry under the
		// assumption that it was caused by stale data.
		c.ns.FlushCacheEntry(name)
		return nil, lastErr
	}
	return nil, errNoServers
}
| 300 | |
// authorizeServer validates that the server (remote end of flow) has the credentials to serve
// the RPC name.method for the client (local end of the flow). It returns the blessings at the
// server that are authorized for this purpose and any blessings that are to be granted to
// the server (via ipc.Granter implementations in opts.)
//
// It returns an error when the server presented neither an identity nor
// blessings, when authorizing the server's identity fails, when an
// options.RemoteID pattern in opts is not matched by the server's blessings,
// or when a Granter fails or its blessing cannot be combined with the ones
// granted so far.
func (c *client) authorizeServer(flow stream.Flow, name, suffix, method string, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) {
	if flow.RemoteID() == nil && flow.RemoteBlessings() == nil {
		return nil, nil, fmt.Errorf("server has not presented any blessings")
	}
	// Security context describing both ends of the flow, against which the
	// server's credentials are evaluated below.
	authctx := isecurity.NewContext(isecurity.ContextArgs{
		LocalID:         flow.LocalID(),
		RemoteID:        flow.RemoteID(),
		Debug:           "ClientAuthorizingServer",
		LocalPrincipal:  flow.LocalPrincipal(),
		LocalBlessings:  flow.LocalBlessings(),
		RemoteBlessings: flow.RemoteBlessings(),
		/* TODO(ashankar,ataly): Uncomment this! This is disabled till the hack to skip third-party caveat
		   validation on a server's blessings are disabled. Commenting out the next three lines affects more
		   than third-party caveats, so yeah, have to remove this soon!
		Method: method,
		Name:   name,
		Suffix: suffix, */
		LocalEndpoint:  flow.LocalEndpoint(),
		RemoteEndpoint: flow.RemoteEndpoint(),
	})
	// RemoteID is consulted only when the server presented no blessings.
	if serverID := flow.RemoteID(); flow.RemoteBlessings() == nil && serverID != nil {
		serverID, err = serverID.Authorize(authctx)
		if err != nil {
			return nil, nil, err
		}
		serverBlessings = serverID.Names()
	}
	// When blessings are present they determine the server's names.
	if server := flow.RemoteBlessings(); server != nil {
		serverBlessings = server.ForContext(authctx)
	}
	for _, o := range opts {
		switch v := o.(type) {
		case options.RemoteID:
			// The caller restricted which servers are acceptable.
			if !security.BlessingPattern(v).MatchedBy(serverBlessings...) {
				return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, v)
			}
		case ipc.Granter:
			// Accumulate the union of everything the granters hand out. The
			// err declared by := here shadows the named result; every
			// failure path returns explicitly.
			if b, err := v.Grant(flow.RemoteBlessings()); err != nil {
				return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err)
			} else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil {
				return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err)
			}
		}
	}
	return serverBlessings, grantedBlessings, nil
}
| 351 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 352 | func (c *client) Close() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 353 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 354 | c.vcMapMu.Lock() |
| 355 | for _, v := range c.vcMap { |
| 356 | c.streamMgr.ShutdownEndpoint(v.remoteEP) |
| 357 | } |
| 358 | c.vcMap = nil |
| 359 | c.vcMapMu.Unlock() |
| 360 | } |
| 361 | |
// IPCBindOpt makes client implement BindOpt.
// It is a marker method with an intentionally empty body.
func (c *client) IPCBindOpt() {
	//nologcall
}
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 366 | |
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
// It is created by newFlowClient, which wires dec and enc to the flow.
type flowClient struct {
	ctx      context.T    // context to annotate with call details
	dec      *vom.Decoder // to decode responses and results from the server
	enc      *vom.Encoder // to encode requests and args to the server
	server   []string     // Blessings bound to the server that authorize it to receive the IPC request from the client.
	flow     stream.Flow  // the underlying flow
	response ipc.Response // each decoded response message is kept here

	discharges     []security.Discharge // discharges used for this request
	dischargeCache *dischargeCache      // client-global discharge cache reference type

	sendClosedMu sync.Mutex
	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)

	finished bool // has Finish() already been called? Set by finish.
}
| 385 | |
// Compile-time checks that *flowClient satisfies ipc.Call and ipc.Stream.
var _ ipc.Call = (*flowClient)(nil)
var _ ipc.Stream = (*flowClient)(nil)
| 388 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 389 | func newFlowClient(ctx context.T, server []string, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 390 | return &flowClient{ |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 391 | ctx: ctx, |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 392 | dec: vom.NewDecoder(flow), |
| 393 | enc: vom.NewEncoder(flow), |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 394 | server: server, |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 395 | flow: flow, |
| 396 | discharges: discharges, |
| 397 | dischargeCache: dischargeCache, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 398 | } |
| 399 | } |
| 400 | |
| 401 | func (fc *flowClient) close(verr verror.E) verror.E { |
| 402 | if err := fc.flow.Close(); err != nil && verr == nil { |
| 403 | verr = verror.Internalf("ipc: flow close failed: %v", err) |
| 404 | } |
| 405 | return verr |
| 406 | } |
| 407 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 408 | func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 409 | req := ipc.Request{ |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 410 | Suffix: suffix, |
| 411 | Method: method, |
| 412 | NumPosArgs: uint64(len(args)), |
| 413 | Timeout: int64(timeout), |
| 414 | GrantedBlessings: security.MarshalBlessings(blessings), |
| 415 | NumDischarges: uint64(len(fc.discharges)), |
| 416 | TraceRequest: vtrace.Request(fc.ctx), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 417 | } |
| 418 | if err := fc.enc.Encode(req); err != nil { |
| 419 | return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err)) |
| 420 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 421 | for _, d := range fc.discharges { |
| 422 | if err := fc.enc.Encode(d); err != nil { |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 423 | return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err)) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 424 | } |
| 425 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 426 | for ix, arg := range args { |
| 427 | if err := fc.enc.Encode(arg); err != nil { |
| 428 | return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err)) |
| 429 | } |
| 430 | } |
| 431 | return nil |
| 432 | } |
| 433 | |
| 434 | func (fc *flowClient) Send(item interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 435 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 436 | if fc.sendClosed { |
| 437 | return errFlowClosed |
| 438 | } |
| 439 | |
| 440 | // The empty request header indicates what follows is a streaming arg. |
| 441 | if err := fc.enc.Encode(ipc.Request{}); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 442 | return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 443 | } |
| 444 | if err := fc.enc.Encode(item); err != nil { |
| 445 | return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err)) |
| 446 | } |
| 447 | return nil |
| 448 | } |
| 449 | |
| 450 | func (fc *flowClient) Recv(itemptr interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 451 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 452 | switch { |
| 453 | case fc.response.Error != nil: |
| 454 | return fc.response.Error |
| 455 | case fc.response.EndStreamResults: |
| 456 | return io.EOF |
| 457 | } |
| 458 | |
| 459 | // Decode the response header and handle errors and EOF. |
| 460 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 461 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 462 | } |
| 463 | if fc.response.Error != nil { |
| 464 | return fc.response.Error |
| 465 | } |
| 466 | if fc.response.EndStreamResults { |
| 467 | // Return EOF to indicate to the caller that there are no more stream |
| 468 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 469 | // returned to the user in Finish. |
| 470 | return io.EOF |
| 471 | } |
| 472 | // Decode the streaming result. |
| 473 | if err := fc.dec.Decode(itemptr); err != nil { |
| 474 | return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err)) |
| 475 | } |
| 476 | return nil |
| 477 | } |
| 478 | |
// CloseSend closes the send side of the call. It logs the call and delegates
// to closeSend, returning its result.
func (fc *flowClient) CloseSend() error {
	defer vlog.LogCall()()
	return fc.closeSend()
}
| 483 | |
// closeSend ensures CloseSend always returns verror.E.
//
// It writes a request with EndStreamArgs set and marks the send side closed,
// all under sendClosedMu. Calling it again after the send side is closed is a
// no-op. As the code stands it always returns nil: a failure to encode the
// end-of-stream marker is deliberately ignored (see the TODO below).
func (fc *flowClient) closeSend() verror.E {
	fc.sendClosedMu.Lock()
	defer fc.sendClosedMu.Unlock()
	if fc.sendClosed {
		return nil
	}
	if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil {
		// TODO(caprita): Indiscriminately closing the flow below causes
		// a race as described in:
		// https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
		//
		// There should be a finer grained way to fix this (for example,
		// encoding errors should probably still result in closing the
		// flow); on the flip side, there may exist other instances
		// where we are closing the flow but should not.
		//
		// For now, commenting out the line below removes the flakiness
		// from our existing unit tests, but this needs to be revisited
		// and fixed correctly.
		//
		// return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err))
	}
	// Marked closed even when the encode above failed.
	fc.sendClosed = true
	return nil
}
| 510 | |
| 511 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 512 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 513 | err := fc.finish(resultptrs...) |
Matt Rosencrantz | 1fa3277 | 2014-10-28 11:31:46 -0700 | [diff] [blame^] | 514 | vtrace.FromContext(fc.ctx).Finish() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 515 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 516 | } |
| 517 | |
| 518 | // finish ensures Finish always returns verror.E. |
| 519 | func (fc *flowClient) finish(resultptrs ...interface{}) verror.E { |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 520 | if fc.finished { |
| 521 | return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed")) |
| 522 | } |
| 523 | fc.finished = true |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 524 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 525 | // three cases: |
| 526 | // 1) Server is blocked on Recv waiting for the final request message. |
| 527 | // 2) Server has already finished processing, the final response message and |
| 528 | // out args are queued up on the client, and the flow is closed. |
| 529 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 530 | // response and args aren't queued up yet, and the flow isn't closed. |
| 531 | // |
| 532 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 533 | // we'll deadlock with both client and server waiting for each other. We must |
| 534 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 535 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 536 | // always return an error. But this isn't a "real" error; the client should |
| 537 | // read the rest of the results and succeed. |
| 538 | _ = fc.closeSend() |
| 539 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 540 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 541 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 542 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 543 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 544 | } |
| 545 | // The response header must indicate the streaming results have ended. |
| 546 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 547 | return fc.close(errRemainingStreamResults) |
| 548 | } |
| 549 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 550 | |
| 551 | // Incorporate any VTrace info that was returned. |
| 552 | vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse) |
| 553 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 554 | if fc.response.Error != nil { |
Tilak Sharma | 492e8e9 | 2014-09-18 10:58:14 -0700 | [diff] [blame] | 555 | if verror.Is(fc.response.Error, verror.NoAccess) && fc.dischargeCache != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 556 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 557 | // with retrying again and again with this discharge. As there is no direct way |
| 558 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 559 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Ankur | 57444f3 | 2014-08-13 11:03:39 -0700 | [diff] [blame] | 560 | vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 561 | fc.dischargeCache.Invalidate(fc.discharges...) |
| 562 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 563 | return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error)) |
| 564 | } |
| 565 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
Cosmos Nicolaou | 9c9918d | 2014-09-23 08:45:56 -0700 | [diff] [blame] | 566 | return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 567 | } |
| 568 | for ix, r := range resultptrs { |
| 569 | if err := fc.dec.Decode(r); err != nil { |
| 570 | return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err)) |
| 571 | } |
| 572 | } |
| 573 | return fc.close(nil) |
| 574 | } |
| 575 | |
| 576 | func (fc *flowClient) Cancel() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 577 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 578 | vtrace.FromContext(fc.ctx).Annotate("Cancelled") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 579 | fc.flow.Cancel() |
| 580 | } |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 581 | |
| 582 | func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 583 | return fc.server, fc.flow.RemoteBlessings() |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 584 | } |