Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 4 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | "io" |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 6 | "math" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 7 | "math/rand" |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 8 | "regexp" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 9 | "strings" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 10 | "sync" |
| 11 | "time" |
| 12 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 13 | "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 14 | "veyron.io/veyron/veyron/runtimes/google/ipc/version" |
| 15 | inaming "veyron.io/veyron/veyron/runtimes/google/naming" |
| 16 | "veyron.io/veyron/veyron/runtimes/google/vtrace" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 17 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 18 | "veyron.io/veyron/veyron2/context" |
| 19 | "veyron.io/veyron/veyron2/ipc" |
| 20 | "veyron.io/veyron/veyron2/ipc/stream" |
| 21 | "veyron.io/veyron/veyron2/naming" |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 22 | "veyron.io/veyron/veyron2/options" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 23 | "veyron.io/veyron/veyron2/security" |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 24 | "veyron.io/veyron/veyron2/vdl/vdlutil" |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 25 | "veyron.io/veyron/veyron2/verror" |
| 26 | "veyron.io/veyron/veyron2/vlog" |
| 27 | "veyron.io/veyron/veyron2/vom" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 28 | ) |
| 29 | |
| 30 | var ( |
Tilak Sharma | 492e8e9 | 2014-09-18 10:58:14 -0700 | [diff] [blame] | 31 | errNoServers = verror.NoExistf("ipc: no servers") |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 32 | errNoAccess = verror.NoAccessf("ipc: client unwilling to access to server") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 33 | errFlowClosed = verror.Abortedf("ipc: flow closed") |
| 34 | errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results") |
| 35 | errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name") |
| 36 | ) |
| 37 | |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 38 | var serverPatternRegexp = regexp.MustCompile("^\\[([^\\]]+)\\](.*)") |
| 39 | |
| 40 | // TODO(ribrdb): Flip this to true once everything is updated. |
| 41 | const enableSecureServerAuth = false |
| 42 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 43 | type client struct { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 44 | streamMgr stream.Manager |
| 45 | ns naming.Namespace |
| 46 | vcOpts []stream.VCOpt // vc opts passed to dial |
| 47 | preferredProtocols []string |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 48 | |
| 49 | // We support concurrent calls to StartCall and Close, so we must protect the |
| 50 | // vcMap. Everything else is initialized upon client construction, and safe |
| 51 | // to use concurrently. |
| 52 | vcMapMu sync.Mutex |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 53 | // TODO(ashankar): The key should be a function of the blessings shared with the server? |
| 54 | vcMap map[string]*vcInfo // map key is endpoint.String |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 55 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 56 | dc vc.DischargeClient |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 57 | } |
| 58 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 59 | var _ ipc.Client = (*client)(nil) |
| 60 | var _ ipc.BindOpt = (*client)(nil) |
| 61 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 62 | type vcInfo struct { |
| 63 | vc stream.VC |
| 64 | remoteEP naming.Endpoint |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 65 | } |
| 66 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 67 | func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 68 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 69 | c := &client{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 70 | streamMgr: streamMgr, |
| 71 | ns: ns, |
| 72 | vcMap: make(map[string]*vcInfo), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 73 | } |
| 74 | for _, opt := range opts { |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 75 | if dc, ok := opt.(vc.DischargeClient); ok { |
| 76 | c.dc = dc |
| 77 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 78 | // Collect all client opts that are also vc opts. |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 79 | switch v := opt.(type) { |
| 80 | case stream.VCOpt: |
| 81 | c.vcOpts = append(c.vcOpts, v) |
| 82 | case options.PreferredProtocols: |
| 83 | c.preferredProtocols = v |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 84 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 85 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 86 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 87 | return c, nil |
| 88 | } |
| 89 | |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 90 | func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 91 | c.vcMapMu.Lock() |
| 92 | defer c.vcMapMu.Unlock() |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 93 | if c.vcMap == nil { |
| 94 | return nil, fmt.Errorf("client has been closed") |
| 95 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 96 | if vcinfo := c.vcMap[ep.String()]; vcinfo != nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 97 | if flow, err := vcinfo.vc.Connect(); err == nil { |
| 98 | return flow, nil |
| 99 | } |
| 100 | // If the vc fails to establish a new flow, we assume it's |
| 101 | // broken, remove it from the map, and proceed to establishing |
| 102 | // a new vc. |
| 103 | // TODO(caprita): Should we distinguish errors due to vc being |
| 104 | // closed from other errors? If not, should we call vc.Close() |
| 105 | // before removing the vc from the map? |
| 106 | delete(c.vcMap, ep.String()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 107 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 108 | sm := c.streamMgr |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 109 | c.vcMapMu.Unlock() |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 110 | vc, err := sm.Dial(ep, c.vcOpts...) |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 111 | c.vcMapMu.Lock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 112 | if err != nil { |
| 113 | return nil, err |
| 114 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 115 | if c.vcMap == nil { |
| 116 | sm.ShutdownEndpoint(ep) |
| 117 | return nil, fmt.Errorf("client has been closed") |
| 118 | } |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 119 | if othervc, exists := c.vcMap[ep.String()]; exists { |
| 120 | vc = othervc.vc |
| 121 | // TODO(ashankar,toddw): Figure out how to close up the VC that |
| 122 | // is discarded. vc.Close? |
| 123 | } else { |
| 124 | c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep} |
| 125 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 126 | return vc.Connect() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 127 | } |
| 128 | |
| 129 | // connectFlow parses an endpoint and a suffix out of the server and establishes |
| 130 | // a flow to the endpoint, returning the parsed suffix. |
| 131 | // The server name passed in should be a rooted name, of the form "/ep/suffix" or |
| 132 | // "/ep//suffix", or just "/ep". |
| 133 | func (c *client) connectFlow(server string) (stream.Flow, string, error) { |
| 134 | address, suffix := naming.SplitAddressName(server) |
| 135 | if len(address) == 0 { |
| 136 | return nil, "", errNonRootedName |
| 137 | } |
| 138 | ep, err := inaming.NewEndpoint(address) |
| 139 | if err != nil { |
| 140 | return nil, "", err |
| 141 | } |
| 142 | if err = version.CheckCompatibility(ep); err != nil { |
| 143 | return nil, "", err |
| 144 | } |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 145 | flow, err := c.createFlow(ep) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 146 | if err != nil { |
| 147 | return nil, "", err |
| 148 | } |
| 149 | return flow, suffix, nil |
| 150 | } |
| 151 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 152 | // A randomized exponential backoff. The randomness deters error convoys from forming. |
| 153 | func backoff(n int, deadline time.Time) bool { |
| 154 | b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second)) |
| 155 | if b > maxBackoff { |
| 156 | b = maxBackoff |
| 157 | } |
| 158 | r := deadline.Sub(time.Now()) |
| 159 | if b > r { |
| 160 | // We need to leave a little time for the call to start or |
| 161 | // we'll just timeout in startCall before we actually do |
| 162 | // anything. If we just have a millisecond left, give up. |
| 163 | if r <= time.Millisecond { |
| 164 | return false |
| 165 | } |
| 166 | b = r - time.Millisecond |
| 167 | } |
| 168 | time.Sleep(b) |
| 169 | return true |
| 170 | } |
| 171 | |
| 172 | // TODO(p): replace these checks with m3b's retry bit when it exists. This is currently a colossal hack. |
| 173 | func retriable(err error) bool { |
| 174 | e := err.Error() |
| 175 | // Authentication errors are permanent. |
| 176 | if strings.Contains(e, "authorized") { |
| 177 | return false |
| 178 | } |
| 179 | // Resolution errors are retriable. |
| 180 | if strings.Contains(e, "ipc: Resolve") { |
| 181 | return true |
| 182 | } |
| 183 | // Kernel level errors are retriable. |
| 184 | if strings.Contains(e, "errno") { |
| 185 | return true |
| 186 | } |
Matt Rosencrantz | 0c4032e | 2014-09-23 12:43:24 -0700 | [diff] [blame] | 187 | // Connection refused is retriable. |
| 188 | if strings.Contains(e, "connection refused") { |
| 189 | return true |
| 190 | } |
| 191 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 192 | return false |
| 193 | } |
| 194 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 195 | func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) { |
| 196 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 197 | if r, ok := o.(options.RetryTimeout); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 198 | return time.Duration(r), true |
| 199 | } |
| 200 | } |
| 201 | return 0, false |
| 202 | } |
| 203 | |
Asim Shankar | ddc0c22 | 2014-07-29 15:47:00 -0700 | [diff] [blame] | 204 | func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 205 | defer vlog.LogCall()() |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 206 | return c.startCall(ctx, name, method, args, opts) |
| 207 | } |
| 208 | |
| 209 | func getNoResolveOpt(opts []ipc.CallOpt) bool { |
| 210 | for _, o := range opts { |
| 211 | if r, ok := o.(options.NoResolve); ok { |
| 212 | return bool(r) |
| 213 | } |
| 214 | } |
| 215 | return false |
| 216 | } |
| 217 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 218 | func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) security.DischargeImpetus { |
| 219 | var impetus security.DischargeImpetus |
| 220 | if len(serverBlessings) > 0 { |
| 221 | impetus.Server = make([]security.BlessingPattern, len(serverBlessings)) |
| 222 | for i, b := range serverBlessings { |
| 223 | impetus.Server[i] = security.BlessingPattern(b) |
| 224 | } |
| 225 | } |
| 226 | impetus.Method = method |
| 227 | if len(args) > 0 { |
| 228 | impetus.Arguments = make([]vdlutil.Any, len(args)) |
| 229 | for i, a := range args { |
| 230 | impetus.Arguments[i] = vdlutil.Any(a) |
| 231 | } |
| 232 | } |
| 233 | return impetus |
| 234 | } |
| 235 | |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 236 | // startCall ensures StartCall always returns verror.E. |
| 237 | func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) { |
| 238 | if ctx == nil { |
| 239 | return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method) |
| 240 | } |
| 241 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 242 | // Context specified deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 243 | deadline, hasDeadline := ctx.Deadline() |
| 244 | if !hasDeadline { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 245 | // Default deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 246 | deadline = time.Now().Add(defaultCallTimeout) |
| 247 | } |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 248 | if r, ok := getRetryTimeoutOpt(opts); ok { |
| 249 | // Caller specified deadline. |
| 250 | deadline = time.Now().Add(time.Duration(r)) |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 251 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 252 | var lastErr verror.E |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 253 | for retries := 0; ; retries++ { |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 254 | if retries != 0 { |
| 255 | if !backoff(retries, deadline) { |
| 256 | break |
| 257 | } |
| 258 | } |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 259 | call, err := c.tryCall(ctx, name, method, args, opts) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 260 | if err == nil { |
| 261 | return call, nil |
| 262 | } |
| 263 | lastErr = err |
Nicolas LaCasse | 27f7041 | 2014-10-03 16:51:55 -0700 | [diff] [blame] | 264 | if time.Now().After(deadline) || !retriable(err) { |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 265 | break |
| 266 | } |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 267 | } |
| 268 | return nil, lastErr |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 269 | } |
| 270 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 271 | type serverStatus struct { |
| 272 | index int |
| 273 | suffix string |
| 274 | flow stream.Flow |
| 275 | processed bool |
| 276 | err verror.E |
| 277 | } |
| 278 | |
| 279 | func (c *client) tryServer(index int, server string, ch chan<- *serverStatus, done <-chan struct{}) { |
| 280 | select { |
| 281 | case <-done: |
| 282 | return |
| 283 | default: |
| 284 | } |
| 285 | status := &serverStatus{index: index} |
| 286 | flow, suffix, err := c.connectFlow(server) |
| 287 | if err != nil { |
| 288 | vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err) |
| 289 | status.err = verror.NoExistf("ipc: %q: %s", server, err) |
| 290 | ch <- status |
| 291 | return |
| 292 | } |
| 293 | status.suffix = suffix |
| 294 | status.flow = flow |
| 295 | select { |
| 296 | case <-done: |
| 297 | flow.Close() |
| 298 | default: |
| 299 | ch <- status |
| 300 | } |
| 301 | } |
| 302 | |
| 303 | // tryCall makes a single attempt at a call, against possibly multiple servers. |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 304 | func (c *client) tryCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) { |
Matt Rosencrantz | 3197d6c | 2014-11-06 09:53:22 -0800 | [diff] [blame] | 305 | ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>\"%s\".%s", name, method)) |
Ryan Brown | bc2c87c | 2014-11-17 18:55:25 +0000 | [diff] [blame^] | 306 | _, serverPattern, name := splitObjectName(name) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 307 | // Resolve name unless told not to. |
| 308 | var servers []string |
| 309 | if getNoResolveOpt(opts) { |
| 310 | servers = []string{name} |
| 311 | } else { |
Ryan Brown | bc2c87c | 2014-11-17 18:55:25 +0000 | [diff] [blame^] | 312 | if resolved, err := c.ns.Resolve(ctx, name); err != nil { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 313 | return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 314 | } else { |
| 315 | // An empty set of protocols means all protocols... |
| 316 | ordered, err := filterAndOrderServers(resolved, c.preferredProtocols) |
| 317 | if len(ordered) == 0 { |
| 318 | return nil, verror.NoExistf("ipc: %q: %s", name, err) |
| 319 | } |
| 320 | servers = ordered |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 321 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 322 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 323 | // servers is now orderd by the priority heurestic implemented in |
| 324 | // filterAndOrderServers. |
| 325 | attempts := len(servers) |
| 326 | if attempts == 0 { |
| 327 | return nil, errNoServers |
| 328 | } |
Cosmos Nicolaou | 9388ae4 | 2014-11-10 10:57:15 -0800 | [diff] [blame] | 329 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 330 | // Try to connect to all servers in parallel. |
| 331 | responses := make([]*serverStatus, attempts) |
| 332 | |
| 333 | // Provide sufficient buffering for all of the connections to finish |
| 334 | // instantaneously. This is important because we want to process |
| 335 | // the responses in priority order; that order is indicated by the |
| 336 | // order of entries in servers. So, if two respones come in at the |
| 337 | // same 'instant', we prefer the first in the slice. |
| 338 | ch := make(chan *serverStatus, attempts) |
| 339 | |
| 340 | // Read as many responses as we can before we would block. |
| 341 | gatherResponses := func() { |
| 342 | for { |
| 343 | select { |
| 344 | default: |
| 345 | return |
| 346 | case s := <-ch: |
| 347 | responses[s.index] = s |
Suharsh Sivakumar | cd743f7 | 2014-10-27 10:03:42 -0700 | [diff] [blame] | 348 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 349 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 350 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 351 | |
| 352 | delay := time.Duration(ipc.NoTimeout) |
| 353 | if dl, set := ctx.Deadline(); set { |
| 354 | delay = dl.Sub(time.Now()) |
| 355 | } |
| 356 | timeoutChan := time.After(delay) |
| 357 | |
| 358 | // We'll close this channel when an RPC has been started and we've |
| 359 | // irrevocably selected a server. |
| 360 | done := make(chan struct{}) |
| 361 | // Try all of the servers in parallel. |
| 362 | for i, server := range servers { |
| 363 | go c.tryServer(i, server, ch, done) |
| 364 | } |
| 365 | |
| 366 | select { |
| 367 | case <-timeoutChan: |
| 368 | // All calls failed if we get here. |
| 369 | close(done) |
David Why Use Two When One Will Do Presotto | f3f39ae | 2014-08-27 11:13:27 -0700 | [diff] [blame] | 370 | c.ns.FlushCacheEntry(name) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 371 | return nil, verror.NoExistf("ipc: couldn't connect to server %v", name) |
| 372 | case s := <-ch: |
| 373 | responses[s.index] = s |
| 374 | gatherResponses() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 375 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 376 | |
| 377 | accessErrs := []error{} |
| 378 | connErrs := []error{} |
| 379 | for { |
| 380 | |
| 381 | for _, r := range responses { |
| 382 | if r == nil || r.err != nil { |
| 383 | if r != nil && r.err != nil && !r.processed { |
| 384 | connErrs = append(connErrs, r.err) |
| 385 | r.processed = true |
| 386 | } |
| 387 | continue |
| 388 | } |
| 389 | |
| 390 | flow := r.flow |
| 391 | suffix := r.suffix |
| 392 | flow.SetDeadline(ctx.Done()) |
| 393 | |
| 394 | var ( |
| 395 | serverB []string |
| 396 | grantedB security.Blessings |
| 397 | ) |
| 398 | |
| 399 | // LocalPrincipal is nil means that the client wanted to avoid |
| 400 | // authentication, and thus wanted to skip authorization as well. |
| 401 | if flow.LocalPrincipal() != nil { |
| 402 | // Validate caveats on the server's identity for the context associated with this call. |
| 403 | var err error |
| 404 | if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil { |
| 405 | vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err) |
| 406 | if !r.processed { |
| 407 | accessErrs = append(accessErrs, err) |
| 408 | r.err = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err) |
| 409 | r.processed = true |
| 410 | } |
| 411 | flow.Close() |
| 412 | continue |
| 413 | } |
| 414 | } |
| 415 | |
| 416 | // This is the 'point of no return', so we tell the tryServer |
| 417 | // goroutines to not bother sending us any more flows. |
| 418 | // Once the RPC is started (fc.start below) we can't be sure |
| 419 | // if it makes it to the server or not so, this code will |
| 420 | // never call fc.start more than once to ensure that we |
| 421 | // provide 'at-most-once' rpc semantics at this level. Retrying |
| 422 | // the network connections (i.e. creating flows) is fine since |
| 423 | // we can cleanup that state if we abort a call (i.e. close the |
| 424 | // flow). |
| 425 | close(done) |
| 426 | |
| 427 | fc := newFlowClient(ctx, serverB, flow, c.dc) |
| 428 | |
| 429 | if doneChan := ctx.Done(); doneChan != nil { |
| 430 | go func() { |
| 431 | select { |
| 432 | case <-ctx.Done(): |
| 433 | fc.Cancel() |
| 434 | case <-fc.flow.Closed(): |
| 435 | } |
| 436 | }() |
| 437 | } |
| 438 | |
| 439 | timeout := time.Duration(ipc.NoTimeout) |
| 440 | if deadline, hasDeadline := ctx.Deadline(); hasDeadline { |
| 441 | timeout = deadline.Sub(time.Now()) |
| 442 | } |
| 443 | if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil { |
| 444 | return nil, verr |
| 445 | } |
| 446 | return fc, nil |
| 447 | } |
| 448 | |
| 449 | // Quit if we've seen an error from all parallel connection attempts |
| 450 | handled := 0 |
| 451 | for _, r := range responses { |
| 452 | if r != nil && r.err != nil { |
| 453 | handled++ |
| 454 | } |
| 455 | } |
| 456 | if handled == len(responses) { |
| 457 | break |
| 458 | } |
| 459 | |
| 460 | select { |
| 461 | case <-timeoutChan: |
| 462 | // All remaining calls failed if we get here. |
| 463 | vlog.VI(2).Infof("ipc: couldn't connect to server %v", name) |
| 464 | goto quit |
| 465 | case s := <-ch: |
| 466 | responses[s.index] = s |
| 467 | gatherResponses() |
| 468 | } |
| 469 | } |
| 470 | quit: |
| 471 | close(done) |
| 472 | c.ns.FlushCacheEntry(name) |
| 473 | // TODO(cnicolaou): introduce a third error code here for mixed |
| 474 | // conn/access errors. |
| 475 | return nil, verror.NoExistf("ipc: client failed to invoke %q.%q: on %v", name, method, servers, append(connErrs, accessErrs...)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 476 | } |
| 477 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 478 | // authorizeServer validates that the server (remote end of flow) has the credentials to serve |
| 479 | // the RPC name.method for the client (local end of the flow). It returns the blessings at the |
| 480 | // server that are authorized for this purpose and any blessings that are to be granted to |
| 481 | // the server (via ipc.Granter implementations in opts.) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 482 | func (c *client) authorizeServer(flow stream.Flow, name, method string, serverPattern security.BlessingPattern, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err error) { |
Asim Shankar | 220a015 | 2014-10-30 21:21:09 -0700 | [diff] [blame] | 483 | if flow.RemoteBlessings() == nil { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 484 | return nil, nil, fmt.Errorf("server has not presented any blessings") |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 485 | } |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 486 | ctxt := security.NewContext(&security.ContextParams{ |
| 487 | LocalPrincipal: flow.LocalPrincipal(), |
| 488 | LocalBlessings: flow.LocalBlessings(), |
| 489 | RemoteBlessings: flow.RemoteBlessings(), |
| 490 | LocalEndpoint: flow.LocalEndpoint(), |
| 491 | RemoteEndpoint: flow.RemoteEndpoint(), |
| 492 | RemoteDischarges: flow.RemoteDischarges(), |
| 493 | Method: method, |
| 494 | Name: name}) |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 495 | serverBlessings = flow.RemoteBlessings().ForContext(ctxt) |
| 496 | if serverPattern != "" { |
| 497 | if !serverPattern.MatchedBy(serverBlessings...) { |
| 498 | return nil, nil, fmt.Errorf("server %v does not match the provided pattern %q", serverBlessings, serverPattern) |
| 499 | } |
| 500 | } else if enableSecureServerAuth { |
| 501 | if err := (defaultAuthorizer{}).Authorize(ctxt); err != nil { |
| 502 | return nil, nil, fmt.Errorf("default authorization precludes talking to server %v", serverBlessings) |
| 503 | } |
| 504 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 505 | for _, o := range opts { |
| 506 | switch v := o.(type) { |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 507 | case ipc.Granter: |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 508 | if b, err := v.Grant(flow.RemoteBlessings()); err != nil { |
| 509 | return nil, nil, fmt.Errorf("failed to grant blessing to server %v: %v", serverBlessings, err) |
| 510 | } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil { |
| 511 | return nil, nil, fmt.Errorf("failed to add blessing granted to server %v: %v", serverBlessings, err) |
| 512 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 513 | } |
| 514 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 515 | return serverBlessings, grantedBlessings, nil |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 516 | } |
| 517 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 518 | func (c *client) Close() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 519 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 520 | c.vcMapMu.Lock() |
| 521 | for _, v := range c.vcMap { |
| 522 | c.streamMgr.ShutdownEndpoint(v.remoteEP) |
| 523 | } |
| 524 | c.vcMap = nil |
| 525 | c.vcMapMu.Unlock() |
| 526 | } |
| 527 | |
| 528 | // IPCBindOpt makes client implement BindOpt. |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 529 | func (c *client) IPCBindOpt() { |
| 530 | //nologcall |
| 531 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 532 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 533 | // flowClient implements the RPC client-side protocol for a single RPC, over a |
| 534 | // flow that's already connected to the server. |
| 535 | type flowClient struct { |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 536 | ctx context.T // context to annotate with call details |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 537 | dec *vom.Decoder // to decode responses and results from the server |
| 538 | enc *vom.Encoder // to encode requests and args to the server |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 539 | server []string // Blessings bound to the server that authorize it to receive the IPC request from the client. |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 540 | flow stream.Flow // the underlying flow |
| 541 | response ipc.Response // each decoded response message is kept here |
| 542 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 543 | discharges []security.Discharge // discharges used for this request |
| 544 | dc vc.DischargeClient // client-global discharge-client |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 545 | |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 546 | sendClosedMu sync.Mutex |
| 547 | sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu) |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 548 | |
| 549 | finished bool // has Finish() already been called? |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 550 | } |
| 551 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 552 | var _ ipc.Call = (*flowClient)(nil) |
| 553 | var _ ipc.Stream = (*flowClient)(nil) |
| 554 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 555 | func newFlowClient(ctx context.T, server []string, flow stream.Flow, dc vc.DischargeClient) *flowClient { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 556 | return &flowClient{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 557 | ctx: ctx, |
| 558 | dec: vom.NewDecoder(flow), |
| 559 | enc: vom.NewEncoder(flow), |
| 560 | server: server, |
| 561 | flow: flow, |
| 562 | dc: dc, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 563 | } |
| 564 | } |
| 565 | |
| 566 | func (fc *flowClient) close(verr verror.E) verror.E { |
| 567 | if err := fc.flow.Close(); err != nil && verr == nil { |
| 568 | verr = verror.Internalf("ipc: flow close failed: %v", err) |
| 569 | } |
| 570 | return verr |
| 571 | } |
| 572 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 573 | func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E { |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 574 | // Fetch any discharges for third-party caveats on the client's blessings |
| 575 | // if this client owns a discharge-client. |
| 576 | if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil { |
| 577 | fc.discharges = fc.dc.PrepareDischarges(self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args)) |
| 578 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 579 | req := ipc.Request{ |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 580 | Suffix: suffix, |
| 581 | Method: method, |
| 582 | NumPosArgs: uint64(len(args)), |
| 583 | Timeout: int64(timeout), |
| 584 | GrantedBlessings: security.MarshalBlessings(blessings), |
| 585 | NumDischarges: uint64(len(fc.discharges)), |
| 586 | TraceRequest: vtrace.Request(fc.ctx), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 587 | } |
| 588 | if err := fc.enc.Encode(req); err != nil { |
| 589 | return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err)) |
| 590 | } |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 591 | for _, d := range fc.discharges { |
| 592 | if err := fc.enc.Encode(d); err != nil { |
Ankur | f044a8d | 2014-09-05 17:05:24 -0700 | [diff] [blame] | 593 | return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.ID(), err)) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 594 | } |
| 595 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 596 | for ix, arg := range args { |
| 597 | if err := fc.enc.Encode(arg); err != nil { |
| 598 | return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err)) |
| 599 | } |
| 600 | } |
| 601 | return nil |
| 602 | } |
| 603 | |
| 604 | func (fc *flowClient) Send(item interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 605 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 606 | if fc.sendClosed { |
| 607 | return errFlowClosed |
| 608 | } |
| 609 | |
| 610 | // The empty request header indicates what follows is a streaming arg. |
| 611 | if err := fc.enc.Encode(ipc.Request{}); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 612 | return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 613 | } |
| 614 | if err := fc.enc.Encode(item); err != nil { |
| 615 | return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err)) |
| 616 | } |
| 617 | return nil |
| 618 | } |
| 619 | |
| 620 | func (fc *flowClient) Recv(itemptr interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 621 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 622 | switch { |
| 623 | case fc.response.Error != nil: |
| 624 | return fc.response.Error |
| 625 | case fc.response.EndStreamResults: |
| 626 | return io.EOF |
| 627 | } |
| 628 | |
| 629 | // Decode the response header and handle errors and EOF. |
| 630 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 631 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 632 | } |
| 633 | if fc.response.Error != nil { |
| 634 | return fc.response.Error |
| 635 | } |
| 636 | if fc.response.EndStreamResults { |
| 637 | // Return EOF to indicate to the caller that there are no more stream |
| 638 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 639 | // returned to the user in Finish. |
| 640 | return io.EOF |
| 641 | } |
| 642 | // Decode the streaming result. |
| 643 | if err := fc.dec.Decode(itemptr); err != nil { |
| 644 | return fc.close(verror.BadProtocolf("ipc: streaming result decoding failed: %v", err)) |
| 645 | } |
| 646 | return nil |
| 647 | } |
| 648 | |
| 649 | func (fc *flowClient) CloseSend() error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 650 | defer vlog.LogCall()() |
Tilak Sharma | 0c76611 | 2014-05-20 17:47:27 -0700 | [diff] [blame] | 651 | return fc.closeSend() |
| 652 | } |
| 653 | |
| 654 | // closeSend ensures CloseSend always returns verror.E. |
| 655 | func (fc *flowClient) closeSend() verror.E { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 656 | fc.sendClosedMu.Lock() |
| 657 | defer fc.sendClosedMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 658 | if fc.sendClosed { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 659 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 660 | } |
| 661 | if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 662 | // TODO(caprita): Indiscriminately closing the flow below causes |
| 663 | // a race as described in: |
| 664 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 665 | // |
| 666 | // There should be a finer grained way to fix this (for example, |
| 667 | // encoding errors should probably still result in closing the |
| 668 | // flow); on the flip side, there may exist other instances |
| 669 | // where we are closing the flow but should not. |
| 670 | // |
| 671 | // For now, commenting out the line below removes the flakiness |
| 672 | // from our existing unit tests, but this needs to be revisited |
| 673 | // and fixed correctly. |
| 674 | // |
| 675 | // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 676 | } |
| 677 | fc.sendClosed = true |
| 678 | return nil |
| 679 | } |
| 680 | |
| 681 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 682 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 683 | err := fc.finish(resultptrs...) |
Matt Rosencrantz | 1fa3277 | 2014-10-28 11:31:46 -0700 | [diff] [blame] | 684 | vtrace.FromContext(fc.ctx).Finish() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 685 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 686 | } |
| 687 | |
| 688 | // finish ensures Finish always returns verror.E. |
| 689 | func (fc *flowClient) finish(resultptrs ...interface{}) verror.E { |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 690 | if fc.finished { |
| 691 | return fc.close(verror.BadProtocolf("ipc: multiple calls to Finish not allowed")) |
| 692 | } |
| 693 | fc.finished = true |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 694 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 695 | // three cases: |
| 696 | // 1) Server is blocked on Recv waiting for the final request message. |
| 697 | // 2) Server has already finished processing, the final response message and |
| 698 | // out args are queued up on the client, and the flow is closed. |
| 699 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 700 | // response and args aren't queued up yet, and the flow isn't closed. |
| 701 | // |
| 702 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 703 | // we'll deadlock with both client and server waiting for each other. We must |
| 704 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 705 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 706 | // always return an error. But this isn't a "real" error; the client should |
| 707 | // read the rest of the results and succeed. |
| 708 | _ = fc.closeSend() |
| 709 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 710 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 711 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 712 | if err := fc.dec.Decode(&fc.response); err != nil { |
Benjamin Prosnitz | 3db40a1 | 2014-06-09 10:10:59 -0700 | [diff] [blame] | 713 | return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 714 | } |
| 715 | // The response header must indicate the streaming results have ended. |
| 716 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 717 | return fc.close(errRemainingStreamResults) |
| 718 | } |
| 719 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 720 | |
| 721 | // Incorporate any VTrace info that was returned. |
| 722 | vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse) |
| 723 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 724 | if fc.response.Error != nil { |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 725 | if verror.Is(fc.response.Error, verror.NoAccess) && fc.dc != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 726 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 727 | // with retrying again and again with this discharge. As there is no direct way |
| 728 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 729 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Ankur | 57444f3 | 2014-08-13 11:03:39 -0700 | [diff] [blame] | 730 | vlog.VI(3).Infof("Discarging %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 731 | fc.dc.Invalidate(fc.discharges...) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 732 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 733 | return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error)) |
| 734 | } |
| 735 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
Cosmos Nicolaou | 9c9918d | 2014-09-23 08:45:56 -0700 | [diff] [blame] | 736 | return fc.close(verror.BadProtocolf("ipc: server sent %d results, client expected %d (%#v)", got, want, resultptrs)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 737 | } |
| 738 | for ix, r := range resultptrs { |
| 739 | if err := fc.dec.Decode(r); err != nil { |
| 740 | return fc.close(verror.BadProtocolf("ipc: result #%d decoding failed: %v", ix, err)) |
| 741 | } |
| 742 | } |
| 743 | return fc.close(nil) |
| 744 | } |
| 745 | |
| 746 | func (fc *flowClient) Cancel() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 747 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 748 | vtrace.FromContext(fc.ctx).Annotate("Cancelled") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 749 | fc.flow.Cancel() |
| 750 | } |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 751 | |
| 752 | func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 753 | return fc.server, fc.flow.RemoteBlessings() |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 754 | } |
Asim Shankar | 220a015 | 2014-10-30 21:21:09 -0700 | [diff] [blame] | 755 | |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 756 | func splitObjectName(name string) (mtPattern, serverPattern security.BlessingPattern, objectName string) { |
| 757 | objectName = name |
| 758 | match := serverPatternRegexp.FindSubmatch([]byte(name)) |
| 759 | if match != nil { |
| 760 | objectName = string(match[2]) |
| 761 | if naming.Rooted(objectName) { |
| 762 | mtPattern = security.BlessingPattern(match[1]) |
| 763 | } else { |
| 764 | serverPattern = security.BlessingPattern(match[1]) |
| 765 | return |
| 766 | } |
| 767 | } |
| 768 | if !naming.Rooted(objectName) { |
| 769 | return |
| 770 | } |
| 771 | |
| 772 | address, relative := naming.SplitAddressName(objectName) |
| 773 | match = serverPatternRegexp.FindSubmatch([]byte(relative)) |
| 774 | if match != nil { |
| 775 | serverPattern = security.BlessingPattern(match[1]) |
| 776 | objectName = naming.JoinAddressName(address, string(match[2])) |
| 777 | } |
| 778 | return |
| 779 | } |