Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 4 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | "io" |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 6 | "math" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 7 | "math/rand" |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 8 | "net" |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 9 | "strings" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 10 | "sync" |
| 11 | "time" |
| 12 | |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 13 | "v.io/core/veyron2/context" |
| 14 | "v.io/core/veyron2/i18n" |
| 15 | "v.io/core/veyron2/ipc" |
| 16 | "v.io/core/veyron2/ipc/stream" |
| 17 | "v.io/core/veyron2/naming" |
| 18 | "v.io/core/veyron2/options" |
| 19 | "v.io/core/veyron2/security" |
Todd Wang | b86b352 | 2015-01-22 13:34:20 -0800 | [diff] [blame] | 20 | "v.io/core/veyron2/vdl" |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 21 | old_verror "v.io/core/veyron2/verror" |
| 22 | verror "v.io/core/veyron2/verror2" |
| 23 | "v.io/core/veyron2/vlog" |
Todd Wang | 3425a90 | 2015-01-21 18:43:59 -0800 | [diff] [blame] | 24 | "v.io/core/veyron2/vom" |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 25 | "v.io/core/veyron2/vtrace" |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 26 | |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 27 | "v.io/core/veyron/runtimes/google/ipc/stream/vc" |
| 28 | "v.io/core/veyron/runtimes/google/ipc/version" |
| 29 | inaming "v.io/core/veyron/runtimes/google/naming" |
| 30 | ivtrace "v.io/core/veyron/runtimes/google/vtrace" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 31 | ) |
| 32 | |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 33 | const pkgPath = "v.io/core/veyron/runtimes/google/ipc" |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 34 | |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 35 | // TODO(cnicolaou): for local errors, automatically assign a new 'id', |
| 36 | // don't use pkgPath etc. Can then move them into being defined on each line |
| 37 | // and not here. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 38 | var ( |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 39 | // Local errs that are used to provide details to the public ones. |
| 40 | errClientCloseAlreadyCalled = verror.Register(pkgPath+".closeAlreadyCalled", verror.NoRetry, |
| 41 | "ipc.Client.Close has already been called") |
| 42 | |
| 43 | errClientFinishAlreadyCalled = verror.Register(pkgPath+".finishAlreadyCalled", verror.NoRetry, "ipc.Call.Finish has already been called") |
| 44 | |
| 45 | errNonRootedName = verror.Register(pkgPath+".nonRootedName", verror.NoRetry, "{3} does not appear to contain an address") |
| 46 | |
| 47 | errInvalidEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an invalid endpoint") |
| 48 | |
| 49 | errIncompatibleEndpoint = verror.Register(pkgPath+".invalidEndpoint", verror.RetryRefetch, "{3} is an incompatible endpoint") |
| 50 | |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 51 | errNotTrusted = verror.Register(pkgPath+".notTrusted", verror.NoRetry, "name {3} not trusted using blessings {4}{:5}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 52 | |
| 53 | errAuthError = verror.Register(pkgPath+".authError", verror.RetryRefetch, "authentication error from server {3}{:4}") |
| 54 | |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 55 | errSystemRetry = verror.Register(pkgPath+".sysErrorRetryConnection", verror.RetryConnection, "{:3:}") |
| 56 | errClosingFlow = verror.Register(pkgPath+".errClosingFlow", verror.NoRetry, "{:3:}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 57 | |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 58 | errVomEncoder = verror.Register(pkgPath+".vomEncoder", verror.NoRetry, "failed to create vom encoder {:3}") |
| 59 | errVomDecoder = verror.Register(pkgPath+".vomDecoder", verror.NoRetry, "failed to create vom decoder {:3}") |
| 60 | |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 61 | errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}") |
| 62 | |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 63 | errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}") |
| 64 | |
| 65 | errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 66 | |
| 67 | errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}") |
| 68 | |
Benjamin Prosnitz | 0db77a2 | 2015-01-20 14:25:15 -0800 | [diff] [blame] | 69 | errMismatchedResults = verror.Register(pkgPath+".mismatchedResults", verror.NoRetry, "got {3} results, but want {4}") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 70 | |
| 71 | errResultDecoding = verror.Register(pkgPath+".resultDecoding", verror.NoRetry, "failed to decode result #{3}{:4}") |
| 72 | |
| 73 | errResponseDecoding = verror.Register(pkgPath+".responseDecoding", verror.NoRetry, "failed to decode response{:3}") |
| 74 | |
| 75 | errRemainingStreamResults = verror.Register(pkgPath+".remaingStreamResults", verror.NoRetry, "stream closed with remaining stream results") |
| 76 | |
| 77 | errNoBlessings = verror.Register(pkgPath+".noBlessings", verror.NoRetry, "server has not presented any blessings") |
| 78 | |
| 79 | errAuthNoPatternMatch = verror.Register(pkgPath+".authNoPatternMatch", |
| 80 | verror.NoRetry, "server blessings {3} do not match pattern {4}") |
| 81 | |
| 82 | errDefaultAuthDenied = verror.Register(pkgPath+".defaultAuthDenied", verror.NoRetry, "default authorization precludes talking to server with blessings{:3}") |
| 83 | |
| 84 | errBlessingGrant = verror.Register(pkgPath+".blessingGrantFailed", verror.NoRetry, "failed to grant blessing to server with blessings {3}{:4}") |
| 85 | |
| 86 | errBlessingAdd = verror.Register(pkgPath+".blessingAddFailed", verror.NoRetry, "failed to add blessing granted to server {3}{:4}") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 87 | ) |
| 88 | |
const (
	// enableSecureServerAuth is currently switched off.
	// TODO(ribrdb): Flip this to true once everything is updated.
	enableSecureServerAuth = false
)
| 91 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 92 | type client struct { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 93 | streamMgr stream.Manager |
| 94 | ns naming.Namespace |
| 95 | vcOpts []stream.VCOpt // vc opts passed to dial |
| 96 | preferredProtocols []string |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 97 | |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame^] | 98 | // We cache the IP networks on the device since it is not that cheap to read |
| 99 | // network interfaces through os syscall. |
| 100 | // TODO(jhahn): Add monitoring the network interface changes. |
| 101 | ipNets []*net.IPNet |
| 102 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 103 | // We support concurrent calls to StartCall and Close, so we must protect the |
| 104 | // vcMap. Everything else is initialized upon client construction, and safe |
| 105 | // to use concurrently. |
| 106 | vcMapMu sync.Mutex |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 107 | vcMap map[vcMapKey]*vcInfo |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 108 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 109 | dc vc.DischargeClient |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 110 | } |
| 111 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 112 | var _ ipc.Client = (*client)(nil) |
| 113 | var _ ipc.BindOpt = (*client)(nil) |
| 114 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 115 | type vcInfo struct { |
| 116 | vc stream.VC |
| 117 | remoteEP naming.Endpoint |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 118 | } |
| 119 | |
// vcMapKey identifies a cached VC in client.vcMap.
type vcMapKey struct {
	// endpoint is the string form of the remote endpoint.
	endpoint string
	// encrypted is the result of vcEncrypted for the options the VC was
	// requested with.
	encrypted bool
}
| 124 | |
Cosmos Nicolaou | 4e02997 | 2014-06-13 14:53:08 -0700 | [diff] [blame] | 125 | func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 126 | c := &client{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 127 | streamMgr: streamMgr, |
| 128 | ns: ns, |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame^] | 129 | ipNets: ipNetworks(), |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 130 | vcMap: make(map[vcMapKey]*vcInfo), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 131 | } |
Suharsh Sivakumar | 1b6683e | 2014-12-30 13:00:38 -0800 | [diff] [blame] | 132 | c.dc = InternalNewDischargeClient(nil, c) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 133 | for _, opt := range opts { |
| 134 | // Collect all client opts that are also vc opts. |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 135 | switch v := opt.(type) { |
| 136 | case stream.VCOpt: |
| 137 | c.vcOpts = append(c.vcOpts, v) |
| 138 | case options.PreferredProtocols: |
| 139 | c.preferredProtocols = v |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 140 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 141 | } |
Suharsh Sivakumar | 1b6683e | 2014-12-30 13:00:38 -0800 | [diff] [blame] | 142 | c.vcOpts = append(c.vcOpts, c.dc) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 143 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 144 | return c, nil |
| 145 | } |
| 146 | |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 147 | func vcEncrypted(vcOpts []stream.VCOpt) bool { |
| 148 | encrypted := true |
| 149 | for _, o := range vcOpts { |
| 150 | switch o { |
| 151 | case options.VCSecurityNone: |
| 152 | encrypted = false |
| 153 | case options.VCSecurityConfidential: |
| 154 | encrypted = true |
| 155 | } |
| 156 | } |
| 157 | return encrypted |
| 158 | } |
| 159 | |
| 160 | func (c *client) createFlow(ctx *context.T, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, verror.E) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 161 | c.vcMapMu.Lock() |
| 162 | defer c.vcMapMu.Unlock() |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 163 | if c.vcMap == nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 164 | return nil, verror.Make(errClientCloseAlreadyCalled, ctx) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 165 | } |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 166 | vcKey := vcMapKey{ep.String(), vcEncrypted(vcOpts)} |
| 167 | if vcinfo := c.vcMap[vcKey]; vcinfo != nil { |
Bogdan Caprita | 783f779 | 2014-05-15 09:29:17 -0700 | [diff] [blame] | 168 | if flow, err := vcinfo.vc.Connect(); err == nil { |
| 169 | return flow, nil |
| 170 | } |
| 171 | // If the vc fails to establish a new flow, we assume it's |
| 172 | // broken, remove it from the map, and proceed to establishing |
| 173 | // a new vc. |
| 174 | // TODO(caprita): Should we distinguish errors due to vc being |
| 175 | // closed from other errors? If not, should we call vc.Close() |
| 176 | // before removing the vc from the map? |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 177 | delete(c.vcMap, vcKey) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 178 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 179 | sm := c.streamMgr |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 180 | c.vcMapMu.Unlock() |
Asim Shankar | 7049475 | 2015-01-23 16:10:23 -0800 | [diff] [blame] | 181 | // Include the context when Dial-ing. This is currently done via an |
| 182 | // option, and for thread-safety reasons - cannot append directly to |
| 183 | // vcOpts. |
| 184 | // TODO(ashankar,mattr): Revisit the API in ipc/stream and explicitly |
| 185 | // provide a context to Dial and other relevant operations. |
| 186 | cpy := make([]stream.VCOpt, len(vcOpts)+1) |
| 187 | cpy[copy(cpy, vcOpts)] = vc.DialContext{ctx} |
| 188 | vcOpts = cpy |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 189 | vc, err := sm.Dial(ep, vcOpts...) |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 190 | c.vcMapMu.Lock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 191 | if err != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 192 | if strings.Contains(err.Error(), "authentication failed") { |
| 193 | return nil, verror.Make(errAuthError, ctx, ep, err) |
| 194 | } else { |
| 195 | return nil, verror.Make(errSystemRetry, ctx, err) |
| 196 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 197 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 198 | if c.vcMap == nil { |
| 199 | sm.ShutdownEndpoint(ep) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 200 | return nil, verror.Make(errClientCloseAlreadyCalled, ctx) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 201 | } |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 202 | if othervc, exists := c.vcMap[vcKey]; exists { |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 203 | vc = othervc.vc |
| 204 | // TODO(ashankar,toddw): Figure out how to close up the VC that |
| 205 | // is discarded. vc.Close? |
| 206 | } else { |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 207 | c.vcMap[vcKey] = &vcInfo{vc: vc, remoteEP: ep} |
Robin Thellend | ee43964 | 2014-10-20 14:39:17 -0700 | [diff] [blame] | 208 | } |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 209 | flow, err := vc.Connect() |
| 210 | if err != nil { |
| 211 | |
| 212 | return nil, verror.Make(errAuthError, ctx, ep, err) |
| 213 | } |
| 214 | return flow, nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 215 | } |
| 216 | |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 217 | // A randomized exponential backoff. The randomness deters error convoys from forming. |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 218 | // TODO(cnicolaou): rationalize this and the backoff in ipc.Server. Note |
| 219 | // that rand is not thread safe and may crash. |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 220 | func backoff(n int, deadline time.Time) bool { |
| 221 | b := time.Duration(math.Pow(1.5+(rand.Float64()/2.0), float64(n)) * float64(time.Second)) |
| 222 | if b > maxBackoff { |
| 223 | b = maxBackoff |
| 224 | } |
| 225 | r := deadline.Sub(time.Now()) |
| 226 | if b > r { |
| 227 | // We need to leave a little time for the call to start or |
| 228 | // we'll just timeout in startCall before we actually do |
| 229 | // anything. If we just have a millisecond left, give up. |
| 230 | if r <= time.Millisecond { |
| 231 | return false |
| 232 | } |
| 233 | b = r - time.Millisecond |
| 234 | } |
| 235 | time.Sleep(b) |
| 236 | return true |
| 237 | } |
| 238 | |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 239 | func (c *client) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) { |
| 240 | defer vlog.LogCall()() |
| 241 | return c.startCall(ctx, name, method, args, opts) |
| 242 | } |
| 243 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 244 | func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) { |
| 245 | for _, o := range opts { |
Asim Shankar | cc04421 | 2014-10-15 23:25:26 -0700 | [diff] [blame] | 246 | if r, ok := o.(options.RetryTimeout); ok { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 247 | return time.Duration(r), true |
| 248 | } |
| 249 | } |
| 250 | return 0, false |
| 251 | } |
| 252 | |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 253 | func getNoResolveOpt(opts []ipc.CallOpt) bool { |
| 254 | for _, o := range opts { |
Suharsh Sivakumar | b59a96d | 2015-01-09 16:39:54 -0800 | [diff] [blame] | 255 | if _, ok := o.(options.NoResolve); ok { |
| 256 | return true |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 257 | } |
| 258 | } |
| 259 | return false |
| 260 | } |
| 261 | |
Suharsh Sivakumar | 1131687 | 2014-11-25 15:57:00 -0800 | [diff] [blame] | 262 | func shouldNotFetchDischarges(opts []ipc.CallOpt) bool { |
| 263 | for _, o := range opts { |
| 264 | if _, ok := o.(vc.NoDischarges); ok { |
| 265 | return true |
| 266 | } |
| 267 | } |
| 268 | return false |
| 269 | } |
| 270 | |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 271 | func getVCOpts(opts []ipc.CallOpt) (vcOpts []stream.VCOpt) { |
| 272 | for _, o := range opts { |
| 273 | if v, ok := o.(stream.VCOpt); ok { |
| 274 | vcOpts = append(vcOpts, v) |
| 275 | } |
| 276 | } |
| 277 | return |
| 278 | } |
| 279 | |
| 280 | func getResolveOpts(opts []ipc.CallOpt) (resolveOpts []naming.ResolveOpt) { |
| 281 | for _, o := range opts { |
| 282 | if r, ok := o.(naming.ResolveOpt); ok { |
| 283 | resolveOpts = append(resolveOpts, r) |
| 284 | } |
| 285 | } |
| 286 | return |
| 287 | } |
| 288 | |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 289 | func allowCancel(opts []ipc.CallOpt) bool { |
| 290 | for _, o := range opts { |
| 291 | if _, ok := o.(inaming.NoCancel); ok { |
| 292 | return false |
| 293 | } |
| 294 | } |
| 295 | return true |
| 296 | } |
| 297 | |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 298 | func mkDischargeImpetus(serverBlessings []string, method string, args []interface{}) security.DischargeImpetus { |
| 299 | var impetus security.DischargeImpetus |
| 300 | if len(serverBlessings) > 0 { |
| 301 | impetus.Server = make([]security.BlessingPattern, len(serverBlessings)) |
| 302 | for i, b := range serverBlessings { |
| 303 | impetus.Server[i] = security.BlessingPattern(b) |
| 304 | } |
| 305 | } |
| 306 | impetus.Method = method |
| 307 | if len(args) > 0 { |
Todd Wang | b86b352 | 2015-01-22 13:34:20 -0800 | [diff] [blame] | 308 | impetus.Arguments = make([]vdl.AnyRep, len(args)) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 309 | for i, a := range args { |
Todd Wang | b86b352 | 2015-01-22 13:34:20 -0800 | [diff] [blame] | 310 | impetus.Arguments[i] = vdl.AnyRep(a) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 311 | } |
| 312 | } |
| 313 | return impetus |
| 314 | } |
| 315 | |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 316 | // startCall ensures StartCall always returns verror.E. |
Matt Rosencrantz | 4f8ac60 | 2014-12-29 14:42:48 -0800 | [diff] [blame] | 317 | func (c *client) startCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) { |
| 318 | if !ctx.Initialized() { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 319 | return nil, verror.ExplicitMake(verror.BadArg, i18n.NoLangID, "ipc.Client", "StartCall") |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 320 | } |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 321 | |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 322 | ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method)) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 323 | ctx = verror.ContextWithComponentName(ctx, "ipc.Client") |
Matt Rosencrantz | 321a51d | 2014-10-30 10:37:56 -0700 | [diff] [blame] | 324 | |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 325 | // Context specified deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 326 | deadline, hasDeadline := ctx.Deadline() |
| 327 | if !hasDeadline { |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 328 | // Default deadline. |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 329 | deadline = time.Now().Add(defaultCallTimeout) |
| 330 | } |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 331 | if r, ok := getRetryTimeoutOpt(opts); ok { |
| 332 | // Caller specified deadline. |
| 333 | deadline = time.Now().Add(time.Duration(r)) |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 334 | } |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 335 | |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 336 | var lastErr verror.E |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 337 | for retries := 0; ; retries++ { |
David Why Use Two When One Will Do Presotto | b02cf90 | 2014-09-12 17:22:56 -0700 | [diff] [blame] | 338 | if retries != 0 { |
| 339 | if !backoff(retries, deadline) { |
| 340 | break |
| 341 | } |
| 342 | } |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 343 | call, action, err := c.tryCall(ctx, name, method, args, opts) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 344 | if err == nil { |
| 345 | return call, nil |
| 346 | } |
| 347 | lastErr = err |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 348 | shouldRetry := true |
| 349 | switch { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 350 | case action != verror.RetryConnection && action != verror.RetryRefetch: |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 351 | shouldRetry = false |
| 352 | case time.Now().After(deadline): |
| 353 | shouldRetry = false |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 354 | case action == verror.RetryRefetch && getNoResolveOpt(opts): |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 355 | // If we're skipping resolution and there are no servers for |
| 356 | // this call retrying is not going to help, we can't come up |
| 357 | // with new servers if there is no resolution. |
| 358 | shouldRetry = false |
| 359 | } |
Matt Rosencrantz | cc922c1 | 2014-11-28 20:28:59 -0800 | [diff] [blame] | 360 | if !shouldRetry { |
Matt Rosencrantz | abacd43 | 2014-11-24 10:44:31 -0800 | [diff] [blame] | 361 | span.Annotatef("Cannot retry after error: %s", err) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 362 | break |
| 363 | } |
Matt Rosencrantz | abacd43 | 2014-11-24 10:44:31 -0800 | [diff] [blame] | 364 | span.Annotatef("Retrying due to error: %s", err) |
David Why Use Two When One Will Do Presotto | 06c60f0 | 2014-09-11 14:52:19 -0700 | [diff] [blame] | 365 | } |
| 366 | return nil, lastErr |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 367 | } |
| 368 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 369 | type serverStatus struct { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 370 | index int |
| 371 | suffix string |
| 372 | flow stream.Flow |
| 373 | err verror.E |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 374 | } |
| 375 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 376 | // tryCreateFlow attempts to establish a Flow to "server" (which must be a |
| 377 | // rooted name), over which a method invocation request could be sent. |
Cosmos Nicolaou | 00a0f80 | 2014-11-16 22:44:55 -0800 | [diff] [blame] | 378 | // TODO(cnicolaou): implement real, configurable load balancing. |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 379 | func (c *client) tryCreateFlow(ctx *context.T, index int, server string, ch chan<- *serverStatus, vcOpts []stream.VCOpt) { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 380 | status := &serverStatus{index: index} |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 381 | var span vtrace.Span |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 382 | ctx, span = vtrace.SetNewSpan(ctx, "<client>tryCreateFlow") |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 383 | span.Annotatef("address:%v", server) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 384 | defer func() { |
| 385 | ch <- status |
| 386 | span.Finish() |
| 387 | }() |
| 388 | address, suffix := naming.SplitAddressName(server) |
| 389 | if len(address) == 0 { |
| 390 | status.err = verror.Make(errNonRootedName, ctx, server) |
| 391 | return |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 392 | } |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 393 | ep, err := inaming.NewEndpoint(address) |
| 394 | if err != nil { |
| 395 | status.err = verror.Make(errInvalidEndpoint, ctx, address) |
| 396 | return |
| 397 | } |
| 398 | if err = version.CheckCompatibility(ep); err != nil { |
| 399 | status.err = verror.Make(errIncompatibleEndpoint, ctx, ep) |
| 400 | return |
| 401 | } |
| 402 | if status.flow, status.err = c.createFlow(ctx, ep, vcOpts); status.err != nil { |
| 403 | vlog.VI(2).Infof("ipc: connect to %v: %v", server, status.err) |
| 404 | return |
| 405 | } |
| 406 | status.suffix = suffix |
| 407 | return |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 408 | } |
| 409 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 410 | // tryCall makes a single attempt at a call. It may connect to multiple servers |
| 411 | // (all that serve "name"), but will invoke the method on at most one of them |
| 412 | // (the server running on the most preferred protocol and network amongst all |
| 413 | // the servers that were successfully connected to and authorized). |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 414 | func (c *client) tryCall(ctx *context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.ActionCode, verror.E) { |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 415 | var resolved *naming.MountEntry |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 416 | var pattern security.BlessingPattern |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 417 | var err error |
| 418 | if resolved, err = c.ns.Resolve(ctx, name, getResolveOpts(opts)...); err != nil { |
David Why Use Two When One Will Do Presotto | 8de8585 | 2015-01-21 11:05:09 -0800 | [diff] [blame] | 419 | vlog.Errorf("Resolve: %v", err) |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 420 | if verror.Is(err, naming.ErrNoSuchName.ID) { |
| 421 | return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name) |
| 422 | } |
| 423 | return nil, verror.NoRetry, verror.Make(verror.NoExist, ctx, name, err) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 424 | } else { |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 425 | pattern = security.BlessingPattern(resolved.Pattern) |
| 426 | if len(resolved.Servers) == 0 { |
| 427 | return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name) |
Suharsh Sivakumar | 65e44c2 | 2014-12-10 17:15:19 -0800 | [diff] [blame] | 428 | } |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 429 | // An empty set of protocols means all protocols... |
Jungho Ahn | 25545d3 | 2015-01-26 15:14:14 -0800 | [diff] [blame^] | 430 | if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil { |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 431 | return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, name, err) |
David Why Use Two When One Will Do Presotto | 3da1c79 | 2014-10-03 11:15:53 -0700 | [diff] [blame] | 432 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 433 | } |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 434 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 435 | // servers is now ordered by the priority heuristic implemented in |
| 436 | // filterAndOrderServers. |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 437 | // |
| 438 | // Try to connect to all servers in parallel. Provide sufficient |
| 439 | // buffering for all of the connections to finish instantaneously. This |
| 440 | // is important because we want to process the responses in priority |
| 441 | // order; that order is indicated by the order of entries in servers. |
| 442 | // So, if two responses come in at the same 'instant', we prefer the |
| 443 | // first in resolved.Servers. |
| 444 | attempts := len(resolved.Servers) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 445 | responses := make([]*serverStatus, attempts) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 446 | ch := make(chan *serverStatus, attempts) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 447 | vcOpts := append(getVCOpts(opts), c.vcOpts...) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 448 | for i, server := range resolved.Names() { |
| 449 | go c.tryCreateFlow(ctx, i, server, ch, vcOpts) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 450 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 451 | |
| 452 | delay := time.Duration(ipc.NoTimeout) |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 453 | if dl, ok := ctx.Deadline(); ok { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 454 | delay = dl.Sub(time.Now()) |
| 455 | } |
| 456 | timeoutChan := time.After(delay) |
| 457 | |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 458 | for { |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 459 | // Block for at least one new response from the server, or the timeout. |
| 460 | select { |
| 461 | case r := <-ch: |
| 462 | responses[r.index] = r |
| 463 | // Read as many more responses as we can without blocking. |
| 464 | LoopNonBlocking: |
| 465 | for { |
| 466 | select { |
| 467 | default: |
| 468 | break LoopNonBlocking |
| 469 | case r := <-ch: |
| 470 | responses[r.index] = r |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 471 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 472 | } |
| 473 | case <-timeoutChan: |
| 474 | vlog.VI(2).Infof("ipc: timeout on connection to server %v ", name) |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 475 | _, _, err := c.failedTryCall(ctx, name, method, responses, ch) |
Cosmos Nicolaou | 38209d4 | 2014-12-09 16:50:38 -0800 | [diff] [blame] | 476 | if !verror.Is(err, verror.Timeout.ID) { |
| 477 | return nil, verror.NoRetry, verror.Make(verror.Timeout, ctx, err) |
| 478 | } |
| 479 | return nil, verror.NoRetry, err |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 480 | } |
| 481 | |
| 482 | // Process new responses, in priority order. |
| 483 | numResponses := 0 |
Suharsh Sivakumar | ae774a5 | 2015-01-09 14:26:32 -0800 | [diff] [blame] | 484 | noDischarges := shouldNotFetchDischarges(opts) |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 485 | for _, r := range responses { |
| 486 | if r != nil { |
| 487 | numResponses++ |
| 488 | } |
| 489 | if r == nil || r.flow == nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 490 | continue |
| 491 | } |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 492 | |
| 493 | var doneChan <-chan struct{} |
| 494 | if allowCancel(opts) { |
| 495 | doneChan = ctx.Done() |
| 496 | } |
| 497 | r.flow.SetDeadline(doneChan) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 498 | |
| 499 | var ( |
| 500 | serverB []string |
| 501 | grantedB security.Blessings |
| 502 | ) |
| 503 | |
| 504 | // LocalPrincipal is nil means that the client wanted to avoid |
| 505 | // authentication, and thus wanted to skip authorization as well. |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 506 | if r.flow.LocalPrincipal() != nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 507 | // Validate caveats on the server's identity for the context associated with this call. |
| 508 | var err error |
Ryan Brown | 6153c6c | 2014-12-11 13:10:09 -0800 | [diff] [blame] | 509 | if serverB, grantedB, err = c.authorizeServer(ctx, r.flow, name, method, pattern, opts); err != nil { |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 510 | r.err = verror.Make(errNotTrusted, ctx, name, r.flow.RemoteBlessings(), err) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 511 | vlog.VI(2).Infof("ipc: err: %s", r.err) |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 512 | r.flow.Close() |
| 513 | r.flow = nil |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 514 | continue |
| 515 | } |
| 516 | } |
| 517 | |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 518 | // This is the 'point of no return'; once the RPC is started (fc.start |
| 519 | // below) we can't be sure if it makes it to the server or not so, this |
| 520 | // code will never call fc.start more than once to ensure that we provide |
| 521 | // 'at-most-once' rpc semantics at this level. Retrying the network |
| 522 | // connections (i.e. creating flows) is fine since we can cleanup that |
| 523 | // state if we abort a call (i.e. close the flow). |
| 524 | // |
| 525 | // We must ensure that all flows other than r.flow are closed. |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 526 | // |
| 527 | // TODO(cnicolaou): all errors below are marked as NoRetry |
| 528 | // because we want to provide at-most-once rpc semantics so |
| 529 | // we only ever attempt an RPC once. In the future, we'll cache |
| 530 | // responses on the server and then we can retry in-process |
| 531 | // RPCs. |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 532 | go cleanupTryCall(r, responses, ch) |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 533 | fc, err := newFlowClient(ctx, serverB, r.flow, c.dc) |
| 534 | if err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 535 | return nil, verror.NoRetry, err.(verror.E) |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 536 | } |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 537 | |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 538 | if doneChan != nil { |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 539 | go func() { |
| 540 | select { |
Matt Rosencrantz | fa3082c | 2015-01-22 21:39:04 -0800 | [diff] [blame] | 541 | case <-doneChan: |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 542 | vtrace.GetSpan(fc.ctx).Annotate("Cancelled") |
Matt Rosencrantz | 9346b41 | 2014-12-18 15:59:19 -0800 | [diff] [blame] | 543 | fc.flow.Cancel() |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 544 | case <-fc.flow.Closed(): |
| 545 | } |
| 546 | }() |
| 547 | } |
| 548 | |
| 549 | timeout := time.Duration(ipc.NoTimeout) |
| 550 | if deadline, hasDeadline := ctx.Deadline(); hasDeadline { |
| 551 | timeout = deadline.Sub(time.Now()) |
| 552 | } |
Suharsh Sivakumar | 1131687 | 2014-11-25 15:57:00 -0800 | [diff] [blame] | 553 | if noDischarges { |
| 554 | fc.dc = nil |
| 555 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 556 | if verr := fc.start(r.suffix, method, args, timeout, grantedB); verr != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 557 | return nil, verror.NoRetry, verr |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 558 | } |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 559 | return fc, verror.NoRetry, nil |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 560 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 561 | if numResponses == len(responses) { |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 562 | return c.failedTryCall(ctx, name, method, responses, ch) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 563 | } |
| 564 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 565 | } |
| 566 | |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 567 | // cleanupTryCall ensures we've waited for every response from the tryCreateFlow |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 568 | // goroutines, and have closed the flow from each one except skip. This is a |
| 569 | // blocking function; it should be called in its own goroutine. |
| 570 | func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) { |
| 571 | numPending := 0 |
| 572 | for _, r := range responses { |
| 573 | switch { |
| 574 | case r == nil: |
| 575 | // The response hasn't arrived yet. |
| 576 | numPending++ |
| 577 | case r == skip || r.flow == nil: |
| 578 | // Either we should skip this flow, or we've closed the flow for this |
| 579 | // response already; nothing more to do. |
| 580 | default: |
| 581 | // We received the response, but haven't closed the flow yet. |
| 582 | r.flow.Close() |
| 583 | } |
| 584 | } |
| 585 | // Now we just need to wait for the pending responses and close their flows. |
| 586 | for i := 0; i < numPending; i++ { |
| 587 | if r := <-ch; r.flow != nil { |
| 588 | r.flow.Close() |
| 589 | } |
| 590 | } |
| 591 | } |
| 592 | |
| 593 | // failedTryCall performs asynchronous cleanup for tryCall, and returns an |
| 594 | // appropriate error from the responses we've already received. All parallel |
| 595 | // calls in tryCall failed or we timed out if we get here. |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 596 | func (c *client) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (ipc.Call, verror.ActionCode, verror.E) { |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 597 | go cleanupTryCall(nil, responses, ch) |
Cosmos Nicolaou | 4e8da64 | 2014-11-13 08:32:05 -0800 | [diff] [blame] | 598 | c.ns.FlushCacheEntry(name) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 599 | noconn, untrusted := []string{}, []string{} |
Asim Shankar | aae3180 | 2015-01-22 11:59:42 -0800 | [diff] [blame] | 600 | for _, r := range responses { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 601 | if r != nil && r.err != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 602 | switch { |
| 603 | case verror.Is(r.err, errNotTrusted.ID) || verror.Is(r.err, errAuthError.ID): |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 604 | untrusted = append(untrusted, "("+r.err.Error()+") ") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 605 | default: |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 606 | noconn = append(noconn, "("+r.err.Error()+") ") |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 607 | } |
Todd Wang | ef05c06 | 2014-11-15 09:51:43 -0800 | [diff] [blame] | 608 | } |
| 609 | } |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 610 | // TODO(cnicolaou): we get system errors for things like dialing using |
| 611 | // the 'ws' protocol which can never succeed even if we retry the connection, |
| 612 | // hence we return RetryRefetch in all cases below. In the future, we'll |
| 613 | // pick out this error and then we can retry the connection also. This also |
| 614 | // plays into the 'at-most-once' rpc semantics change that's needed in order |
| 615 | // to retry an in-flight RPC. |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 616 | switch { |
| 617 | case len(untrusted) > 0 && len(noconn) > 0: |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 618 | return nil, verror.RetryRefetch, verror.Make(verror.NoServersAndAuth, ctx, append(noconn, untrusted...)) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 619 | case len(noconn) > 0: |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 620 | return nil, verror.RetryRefetch, verror.Make(verror.NoServers, ctx, noconn) |
| 621 | case len(untrusted) > 0: |
| 622 | return nil, verror.NoRetry, verror.Make(verror.NotTrusted, ctx, untrusted) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 623 | default: |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 624 | return nil, verror.RetryRefetch, verror.Make(verror.Timeout, ctx) |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 625 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 626 | } |
| 627 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 628 | // authorizeServer validates that the server (remote end of flow) has the credentials to serve |
| 629 | // the RPC name.method for the client (local end of the flow). It returns the blessings at the |
| 630 | // server that are authorized for this purpose and any blessings that are to be granted to |
| 631 | // the server (via ipc.Granter implementations in opts.) |
Matt Rosencrantz | 4f8ac60 | 2014-12-29 14:42:48 -0800 | [diff] [blame] | 632 | func (c *client) authorizeServer(ctx *context.T, flow stream.Flow, name, method string, serverPattern security.BlessingPattern, opts []ipc.CallOpt) (serverBlessings []string, grantedBlessings security.Blessings, err verror.E) { |
Asim Shankar | 220a015 | 2014-10-30 21:21:09 -0700 | [diff] [blame] | 633 | if flow.RemoteBlessings() == nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 634 | return nil, nil, verror.Make(errNoBlessings, ctx) |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 635 | } |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 636 | ctxt := security.NewContext(&security.ContextParams{ |
| 637 | LocalPrincipal: flow.LocalPrincipal(), |
| 638 | LocalBlessings: flow.LocalBlessings(), |
| 639 | RemoteBlessings: flow.RemoteBlessings(), |
| 640 | LocalEndpoint: flow.LocalEndpoint(), |
| 641 | RemoteEndpoint: flow.RemoteEndpoint(), |
| 642 | RemoteDischarges: flow.RemoteDischarges(), |
| 643 | Method: method, |
Todd Wang | 9a7f516 | 2014-11-13 13:24:33 -0800 | [diff] [blame] | 644 | Suffix: name}) |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 645 | serverBlessings = flow.RemoteBlessings().ForContext(ctxt) |
| 646 | if serverPattern != "" { |
| 647 | if !serverPattern.MatchedBy(serverBlessings...) { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 648 | return nil, nil, verror.Make(errAuthNoPatternMatch, ctx, serverBlessings, serverPattern) |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 649 | } |
| 650 | } else if enableSecureServerAuth { |
| 651 | if err := (defaultAuthorizer{}).Authorize(ctxt); err != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 652 | return nil, nil, verror.Make(errDefaultAuthDenied, ctx, serverBlessings) |
Ryan Brown | 2726b40 | 2014-11-04 17:13:27 -0800 | [diff] [blame] | 653 | } |
| 654 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 655 | for _, o := range opts { |
| 656 | switch v := o.(type) { |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 657 | case ipc.Granter: |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 658 | if b, err := v.Grant(flow.RemoteBlessings()); err != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 659 | return nil, nil, verror.Make(errBlessingGrant, ctx, serverBlessings, err) |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 660 | } else if grantedBlessings, err = security.UnionOfBlessings(grantedBlessings, b); err != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 661 | return nil, nil, verror.Make(errBlessingAdd, ctx, serverBlessings, err) |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 662 | } |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 663 | } |
| 664 | } |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 665 | return serverBlessings, grantedBlessings, nil |
Asim Shankar | b54d764 | 2014-06-05 13:08:04 -0700 | [diff] [blame] | 666 | } |
| 667 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 668 | func (c *client) Close() { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 669 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 670 | c.vcMapMu.Lock() |
| 671 | for _, v := range c.vcMap { |
| 672 | c.streamMgr.ShutdownEndpoint(v.remoteEP) |
| 673 | } |
| 674 | c.vcMap = nil |
| 675 | c.vcMapMu.Unlock() |
| 676 | } |
| 677 | |
// IPCBindOpt makes client implement BindOpt. The method has no behavior of
// its own; it exists only so that *client has this method.
func (c *client) IPCBindOpt() {
	// NOTE(review): the marker below is presumably read by tooling that checks
	// for vlog.LogCall usage — keep it verbatim; confirm against that tooling.
	//nologcall
}
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 682 | |
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
//
// One flowClient is created per call by newFlowClient. The request is sent by
// start, streaming items are exchanged with Send and Recv, and the call is
// completed with CloseSend and Finish.
type flowClient struct {
	ctx      *context.T   // context to annotate with call details
	dec      *vom.Decoder // to decode responses and results from the server
	enc      *vom.Encoder // to encode requests and args to the server
	server   []string     // Blessings bound to the server that authorize it to receive the IPC request from the client.
	flow     stream.Flow  // the underlying flow
	response ipc.Response // each decoded response message is kept here

	discharges []security.Discharge // discharges used for this request
	dc         vc.DischargeClient   // client-global discharge-client; nil disables discharge fetching in start

	blessings security.Blessings // the local blessings for the current RPC.

	sendClosedMu sync.Mutex
	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
	finished     bool // has Finish() already been called?
}
| 702 | |
Benjamin Prosnitz | fdfbf7b | 2014-10-08 09:47:21 -0700 | [diff] [blame] | 703 | var _ ipc.Call = (*flowClient)(nil) |
| 704 | var _ ipc.Stream = (*flowClient)(nil) |
| 705 | |
Matt Rosencrantz | 4f8ac60 | 2014-12-29 14:42:48 -0800 | [diff] [blame] | 706 | func newFlowClient(ctx *context.T, server []string, flow stream.Flow, dc vc.DischargeClient) (*flowClient, error) { |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 707 | fc := &flowClient{ |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 708 | ctx: ctx, |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 709 | server: server, |
| 710 | flow: flow, |
| 711 | dc: dc, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 712 | } |
Todd Wang | f519f8f | 2015-01-21 10:07:41 -0800 | [diff] [blame] | 713 | var err error |
Todd Wang | 3425a90 | 2015-01-21 18:43:59 -0800 | [diff] [blame] | 714 | if fc.enc, err = vom.NewBinaryEncoder(flow); err != nil { |
Todd Wang | f519f8f | 2015-01-21 10:07:41 -0800 | [diff] [blame] | 715 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errVomEncoder, fc.ctx, err)) |
| 716 | return nil, fc.close(berr) |
| 717 | } |
Todd Wang | 3425a90 | 2015-01-21 18:43:59 -0800 | [diff] [blame] | 718 | if fc.dec, err = vom.NewDecoder(flow); err != nil { |
Todd Wang | f519f8f | 2015-01-21 10:07:41 -0800 | [diff] [blame] | 719 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errVomDecoder, fc.ctx, err)) |
| 720 | return nil, fc.close(berr) |
Todd Wang | 34ed4c6 | 2014-11-26 15:15:52 -0800 | [diff] [blame] | 721 | } |
| 722 | return fc, nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 723 | } |
| 724 | |
| 725 | func (fc *flowClient) close(verr verror.E) verror.E { |
| 726 | if err := fc.flow.Close(); err != nil && verr == nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 727 | verr = verror.Make(errClosingFlow, fc.ctx, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 728 | } |
| 729 | return verr |
| 730 | } |
| 731 | |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 732 | func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessings security.Blessings) verror.E { |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 733 | // Fetch any discharges for third-party caveats on the client's blessings |
| 734 | // if this client owns a discharge-client. |
| 735 | if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil { |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 736 | fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args)) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 737 | } |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 738 | // Encode the Blessings information for the client to authorize the flow. |
| 739 | var blessingsRequest ipc.BlessingsRequest |
| 740 | if fc.flow.LocalPrincipal() != nil { |
Jungho Ahn | 44d8daf | 2015-01-16 10:39:15 -0800 | [diff] [blame] | 741 | fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...) |
| 742 | blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings) |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 743 | } |
| 744 | // TODO(suharshs, ataly): Make security.Discharge a vdl type. |
Todd Wang | b86b352 | 2015-01-22 13:34:20 -0800 | [diff] [blame] | 745 | anyDischarges := make([]vdl.AnyRep, len(fc.discharges)) |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 746 | for i, d := range fc.discharges { |
| 747 | anyDischarges[i] = d |
| 748 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 749 | req := ipc.Request{ |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 750 | Suffix: suffix, |
| 751 | Method: method, |
| 752 | NumPosArgs: uint64(len(args)), |
| 753 | Timeout: int64(timeout), |
| 754 | GrantedBlessings: security.MarshalBlessings(blessings), |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 755 | Blessings: blessingsRequest, |
| 756 | Discharges: anyDischarges, |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 757 | TraceRequest: ivtrace.Request(fc.ctx), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 758 | } |
| 759 | if err := fc.enc.Encode(req); err != nil { |
Todd Wang | bc4875f | 2014-12-12 10:30:26 -0800 | [diff] [blame] | 760 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err)) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 761 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 762 | } |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 763 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 764 | for ix, arg := range args { |
| 765 | if err := fc.enc.Encode(arg); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 766 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, ix, err)) |
| 767 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 768 | } |
| 769 | } |
| 770 | return nil |
| 771 | } |
| 772 | |
| 773 | func (fc *flowClient) Send(item interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 774 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 775 | if fc.sendClosed { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 776 | return verror.Make(verror.Aborted, fc.ctx) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 777 | } |
| 778 | |
| 779 | // The empty request header indicates what follows is a streaming arg. |
| 780 | if err := fc.enc.Encode(ipc.Request{}); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 781 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, ipc.Request{}, err)) |
| 782 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 783 | } |
| 784 | if err := fc.enc.Encode(item); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 785 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, -1, err)) |
| 786 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 787 | } |
| 788 | return nil |
| 789 | } |
| 790 | |
Matt Rosencrantz | 4f8ac60 | 2014-12-29 14:42:48 -0800 | [diff] [blame] | 791 | func decodeNetError(ctx *context.T, err error) verror.IDAction { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 792 | if neterr, ok := err.(net.Error); ok { |
| 793 | if neterr.Timeout() || neterr.Temporary() { |
| 794 | // If a read is cancelled in the lower levels we see |
| 795 | // a timeout error - see readLocked in vc/reader.go |
| 796 | if ctx.Err() == context.Canceled { |
| 797 | return verror.Cancelled |
| 798 | } |
| 799 | return verror.Timeout |
| 800 | } |
| 801 | } |
| 802 | return verror.BadProtocol |
| 803 | } |
| 804 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 805 | func (fc *flowClient) Recv(itemptr interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 806 | defer vlog.LogCall()() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 807 | switch { |
| 808 | case fc.response.Error != nil: |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 809 | // TODO(cnicolaou): this will become a verror2.E when we convert the |
| 810 | // server. |
| 811 | return verror.Make(verror.BadProtocol, fc.ctx, fc.response.Error) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 812 | case fc.response.EndStreamResults: |
| 813 | return io.EOF |
| 814 | } |
| 815 | |
| 816 | // Decode the response header and handle errors and EOF. |
| 817 | if err := fc.dec.Decode(&fc.response); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 818 | berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err)) |
| 819 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 820 | } |
| 821 | if fc.response.Error != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 822 | // TODO(cnicolaou): this will become a verror2.E when we convert the |
| 823 | // server. |
| 824 | return verror.Make(verror.BadProtocol, fc.ctx, fc.response.Error) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 825 | } |
| 826 | if fc.response.EndStreamResults { |
| 827 | // Return EOF to indicate to the caller that there are no more stream |
| 828 | // results. Any error sent by the server is kept in fc.response.Error, and |
| 829 | // returned to the user in Finish. |
| 830 | return io.EOF |
| 831 | } |
| 832 | // Decode the streaming result. |
| 833 | if err := fc.dec.Decode(itemptr); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 834 | berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err)) |
| 835 | // TODO(cnicolaou): should we be caching this? |
| 836 | fc.response.Error = berr |
| 837 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 838 | } |
| 839 | return nil |
| 840 | } |
| 841 | |
| 842 | func (fc *flowClient) CloseSend() error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 843 | defer vlog.LogCall()() |
Tilak Sharma | 0c76611 | 2014-05-20 17:47:27 -0700 | [diff] [blame] | 844 | return fc.closeSend() |
| 845 | } |
| 846 | |
| 847 | // closeSend ensures CloseSend always returns verror.E. |
| 848 | func (fc *flowClient) closeSend() verror.E { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 849 | fc.sendClosedMu.Lock() |
| 850 | defer fc.sendClosedMu.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 851 | if fc.sendClosed { |
Asim Shankar | 1707e43 | 2014-05-29 19:42:41 -0700 | [diff] [blame] | 852 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 853 | } |
| 854 | if err := fc.enc.Encode(ipc.Request{EndStreamArgs: true}); err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 855 | // TODO(caprita): Indiscriminately closing the flow below causes |
| 856 | // a race as described in: |
| 857 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 858 | // |
| 859 | // There should be a finer grained way to fix this (for example, |
| 860 | // encoding errors should probably still result in closing the |
| 861 | // flow); on the flip side, there may exist other instances |
| 862 | // where we are closing the flow but should not. |
| 863 | // |
| 864 | // For now, commenting out the line below removes the flakiness |
| 865 | // from our existing unit tests, but this needs to be revisited |
| 866 | // and fixed correctly. |
| 867 | // |
| 868 | // return fc.close(verror.BadProtocolf("ipc: end stream args encoding failed: %v", err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 869 | } |
| 870 | fc.sendClosed = true |
| 871 | return nil |
| 872 | } |
| 873 | |
| 874 | func (fc *flowClient) Finish(resultptrs ...interface{}) error { |
Mehrdad Afshari | cd9852b | 2014-09-26 11:07:35 -0700 | [diff] [blame] | 875 | defer vlog.LogCall()() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 876 | err := fc.finish(resultptrs...) |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 877 | vtrace.GetSpan(fc.ctx).Finish() |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 878 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 879 | } |
| 880 | |
| 881 | // finish ensures Finish always returns verror.E. |
| 882 | func (fc *flowClient) finish(resultptrs ...interface{}) verror.E { |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 883 | if fc.finished { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 884 | err := verror.Make(errClientFinishAlreadyCalled, fc.ctx) |
| 885 | return fc.close(verror.Make(verror.BadState, fc.ctx, err)) |
Ken Ashcraft | 2b8309a | 2014-09-09 10:44:43 -0700 | [diff] [blame] | 886 | } |
| 887 | fc.finished = true |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 888 | |
Todd Wang | ce3033b | 2014-05-23 17:04:44 -0700 | [diff] [blame] | 889 | // Call closeSend implicitly, if the user hasn't already called it. There are |
| 890 | // three cases: |
| 891 | // 1) Server is blocked on Recv waiting for the final request message. |
| 892 | // 2) Server has already finished processing, the final response message and |
| 893 | // out args are queued up on the client, and the flow is closed. |
| 894 | // 3) Between 1 and 2: the server isn't blocked on Recv, but the final |
| 895 | // response and args aren't queued up yet, and the flow isn't closed. |
| 896 | // |
| 897 | // We must call closeSend to handle case (1) and unblock the server; otherwise |
| 898 | // we'll deadlock with both client and server waiting for each other. We must |
| 899 | // ignore the error (if any) to handle case (2). In that case the flow is |
| 900 | // closed, meaning writes will fail and reads will succeed, and closeSend will |
| 901 | // always return an error. But this isn't a "real" error; the client should |
| 902 | // read the rest of the results and succeed. |
| 903 | _ = fc.closeSend() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 904 | // Decode the response header, if it hasn't already been decoded by Recv. |
| 905 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
| 906 | if err := fc.dec.Decode(&fc.response); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 907 | berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResponseDecoding, fc.ctx, err)) |
| 908 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 909 | } |
| 910 | // The response header must indicate the streaming results have ended. |
| 911 | if fc.response.Error == nil && !fc.response.EndStreamResults { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 912 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRemainingStreamResults, fc.ctx)) |
| 913 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 914 | } |
| 915 | } |
Suharsh Sivakumar | 720b704 | 2014-12-22 17:33:23 -0800 | [diff] [blame] | 916 | if fc.response.AckBlessings { |
| 917 | clientAckBlessings(fc.flow.VCDataCache(), fc.blessings) |
| 918 | } |
Matt Rosencrantz | 9fe6082 | 2014-09-12 10:09:53 -0700 | [diff] [blame] | 919 | // Incorporate any VTrace info that was returned. |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 920 | ivtrace.Merge(fc.ctx, fc.response.TraceResponse) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 921 | if fc.response.Error != nil { |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 922 | // TODO(cnicolaou): remove verror.NoAccess with verror version |
| 923 | // when ipc.Server is converted. |
| 924 | if verror.Is(fc.response.Error, old_verror.NoAccess) && fc.dc != nil { |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 925 | // In case the error was caused by a bad discharge, we do not want to get stuck |
| 926 | // with retrying again and again with this discharge. As there is no direct way |
| 927 | // to detect it, we conservatively flush all discharges we used from the cache. |
| 928 | // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly? |
Asim Shankar | 77befba | 2015-01-09 12:49:04 -0800 | [diff] [blame] | 929 | vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error) |
Ankur | e49a86a | 2014-11-11 18:52:43 -0800 | [diff] [blame] | 930 | fc.dc.Invalidate(fc.discharges...) |
Andres Erbsen | b7f95f3 | 2014-07-07 12:07:56 -0700 | [diff] [blame] | 931 | } |
Cosmos Nicolaou | 112bf1c | 2014-11-21 15:43:11 -0800 | [diff] [blame] | 932 | return fc.close(verror.Convert(verror.Internal, fc.ctx, fc.response.Error)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 933 | } |
| 934 | if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 935 | berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errMismatchedResults, fc.ctx, got, want)) |
| 936 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 937 | } |
| 938 | for ix, r := range resultptrs { |
| 939 | if err := fc.dec.Decode(r); err != nil { |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 940 | berr := verror.Make(decodeNetError(fc.ctx, err), fc.ctx, verror.Make(errResultDecoding, fc.ctx, ix, err)) |
| 941 | return fc.close(berr) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 942 | } |
| 943 | } |
| 944 | return fc.close(nil) |
| 945 | } |
| 946 | |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 947 | func (fc *flowClient) RemoteBlessings() ([]string, security.Blessings) { |
Asim Shankar | 8f05c22 | 2014-10-06 22:08:19 -0700 | [diff] [blame] | 948 | return fc.server, fc.flow.RemoteBlessings() |
Asim Shankar | 2d731a9 | 2014-09-29 17:46:38 -0700 | [diff] [blame] | 949 | } |