blob: 6fbdd4efef8c1bf7c52ceaba8aa002944009b9ae [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron2/config"
13 "veyron.io/veyron/veyron2/context"
14 "veyron.io/veyron/veyron2/ipc"
15 "veyron.io/veyron/veyron2/ipc/stream"
16 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070017 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070018 "veyron.io/veyron/veyron2/security"
Asim Shankar68885192014-11-26 12:48:35 -080019 "veyron.io/veyron/veyron2/services/security/access"
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080020 old_verror "veyron.io/veyron/veyron2/verror"
21 verror "veyron.io/veyron/veyron2/verror2"
Jiri Simsa519c5072014-09-17 21:37:57 -070022 "veyron.io/veyron/veyron2/vlog"
23 "veyron.io/veyron/veyron2/vom"
Todd Wang34ed4c62014-11-26 15:15:52 -080024 "veyron.io/veyron/veyron2/vom2"
Jiri Simsa519c5072014-09-17 21:37:57 -070025 "veyron.io/veyron/veyron2/vtrace"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070026
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070027 "veyron.io/veyron/veyron/lib/netstate"
Bogdan Capritae7376312014-11-10 13:13:17 -080028 "veyron.io/veyron/veyron/lib/stats"
29 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070030 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
31 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070032 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080033
34 // TODO(cnicolaou): finish verror -> verror2 transition, in particular
35 // for communicating from server to client.
36 // TODO(cnicolaou): remove the vom1 code now that vom2 is in place.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070037)
38
39var (
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080040 // TODO(cnicolaou): this should be BadState in verror2.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080041 errServerStopped = old_verror.Abortedf("ipc: server is stopped")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070042)
43
// server is the implementation of ipc.Server provided by this package.
//
// The embedded mutex is held by the methods in this file whenever they read
// or modify listeners, stopped, disp or names.
type server struct {
	sync.Mutex
	ctx           context.T                  // context used by the server to make internal RPCs.
	streamMgr     stream.Manager             // stream manager to listen for new flows.
	publisher     publisher.Publisher        // publisher to publish mounttable mounts.
	listenerOpts  []stream.ListenerOpt       // listener opts passed to Listen.
	listeners     map[stream.Listener]struct{} // listeners created by Listen.
	dhcpListeners map[*dhcpListener]struct{} // dhcpListeners created by Listen.

	disp               ipc.Dispatcher // dispatcher to serve RPCs
	dispReserved       ipc.Dispatcher // dispatcher for reserved methods
	active             sync.WaitGroup // active goroutines we've spawned.
	stopped            bool           // whether the server has been stopped.
	stoppedChan        chan struct{}  // closed when the server has been stopped.
	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
	// ns is the namespace used by resolveToEndpoint to resolve names; when it
	// is nil the supplied address is used as is.
	ns naming.Namespace
	// servesMountTable is set from the options.ServesMountTable server option
	// and is passed to the publisher and recorded in created endpoints.
	servesMountTable bool
	// TODO(cnicolaou): remove this when the publisher tracks published names
	// and can return an appropriate error for RemoveName on a name that
	// wasn't 'Added' for this server.
	names map[string]struct{}
	// TODO(cnicolaou): add roaming stats to ipcStats
	stats      *ipcStats      // stats for this server.
	traceStore *ivtrace.Store // store for vtrace traces.
}
69
// Compile-time assertion that *server implements ipc.Server.
var _ ipc.Server = (*server)(nil)
71
// dhcpListener holds the state needed to republish a server's endpoints when
// network address settings change.
//
// NOTE(review): in the visible part of this file the code that creates and
// consumes dhcpListeners (dhcpLoop, applyChange and the roaming branch of
// Listen) is commented out, and that commented-out code still refers to a
// field named ep rather than eps.
type dhcpListener struct {
	sync.Mutex
	publisher *config.Publisher   // publisher used to fork the stream
	name      string              // name of the publisher stream
	eps       []*inaming.Endpoint // endpoint returned after listening
	pubAddrs  []ipc.Address       // addresses to publish
	pubPort   string              // port to use with the publish addresses
	ch        chan config.Setting // channel to receive settings over
}
81
// PreferredServerResolveProtocols is a server option used to sort and filter
// the endpoints when resolving the proxy name from a mounttable.
type PreferredServerResolveProtocols []string

// IPCServerOpt is a no-op method; presumably it is the marker that lets
// PreferredServerResolveProtocols be passed as an ipc.ServerOpt (it is
// matched as one in InternalNewServer) -- confirm against ipc.ServerOpt.
func (PreferredServerResolveProtocols) IPCServerOpt() {}
87
Matt Rosencrantz3e76f282014-11-10 09:38:57 -080088func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, store *ivtrace.Store, opts ...ipc.ServerOpt) (ipc.Server, error) {
89 ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
Bogdan Capritae7376312014-11-10 13:13:17 -080090 statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070091 s := &server{
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080092 ctx: ctx,
93 streamMgr: streamMgr,
94 publisher: publisher.New(ctx, ns, publishPeriod),
95 listeners: make(map[stream.Listener]struct{}),
96 dhcpListeners: make(map[*dhcpListener]struct{}),
97 stoppedChan: make(chan struct{}),
98 ns: ns,
99 stats: newIPCStats(statsPrefix),
100 traceStore: store,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700101 }
Bogdan Capritae7376312014-11-10 13:13:17 -0800102 var (
103 principal security.Principal
104 blessings security.Blessings
105 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700106 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700107 switch opt := opt.(type) {
108 case stream.ListenerOpt:
109 // Collect all ServerOpts that are also ListenerOpts.
110 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800111 switch opt := opt.(type) {
112 case vc.LocalPrincipal:
113 principal = opt.Principal
114 case options.ServerBlessings:
115 blessings = opt.Blessings
116 }
Asim Shankarcc044212014-10-15 23:25:26 -0700117 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700118 s.servesMountTable = bool(opt)
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700119 case options.ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800120 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800121 case PreferredServerResolveProtocols:
122 s.preferredProtocols = []string(opt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700123 }
124 }
Bogdan Capritae7376312014-11-10 13:13:17 -0800125 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
126 if blessings != nil {
127 // TODO(caprita): revist printing the blessings with %s, and
128 // instead expose them as a list.
129 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", blessings))
130 } else if principal != nil { // principal should have been passed in, but just in case.
131 stats.NewStringFunc(blessingsStatsName, func() string {
132 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
133 })
134 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700135 return s, nil
136}
137
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700138func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700139 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700140 s.Lock()
141 defer s.Unlock()
142 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800143 return nil, s.newBadState("ipc.Server.Stop already called")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700144 }
145 return s.publisher.Published(), nil
146}
147
Robin Thellend92b65a42014-12-17 14:30:16 -0800148// resolveToEndpoint resolves an object name or address to an endpoint.
149func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankardee311d2014-08-01 17:41:31 -0700150 var names []string
151 if s.ns != nil {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800152 var entry *naming.MountEntry
Asim Shankardee311d2014-08-01 17:41:31 -0700153 var err error
Ryan Brown6153c6c2014-12-11 13:10:09 -0800154 if entry, err = s.ns.ResolveX(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700155 return "", err
156 }
Ryan Brown6153c6c2014-12-11 13:10:09 -0800157 names = naming.ToStringSlice(entry)
Asim Shankardee311d2014-08-01 17:41:31 -0700158 } else {
159 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700160 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800161 // An empty set of protocols means all protocols...
162 ordered, err := filterAndOrderServers(names, s.preferredProtocols)
163 if err != nil {
164 return "", err
165 }
166 for _, n := range ordered {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800168 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700169 continue
170 }
171 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700172 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700173 }
174 }
Asim Shankardee311d2014-08-01 17:41:31 -0700175 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700176}
177
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800178func addrFromIP(ip net.IP) ipc.Address {
179 return &netstate.AddrIfc{
180 Addr: &net.IPAddr{IP: ip},
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800181 }
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800182}
183
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800184/*
// getIPRoamingAddrs finds an appropriate set of addresses to publish
186// externally and also determines if it's sensible to allow roaming.
187// It returns the host address of the first suitable address that
188// can be used and the port number that can be used with all addresses.
189// The host is required to allow the caller to construct an endpoint
190// that can be returned to the caller of Listen.
191func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
192 host, port, err = net.SplitHostPort(iep.Address)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800193 if err != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800194 return nil, "", "", false, err
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800195 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800196 ip := net.ParseIP(host)
197 if ip == nil {
198 return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
199 }
200 if ip.IsUnspecified() && chooser != nil {
201 // Need to find a usable IP address since the call to listen
202 // didn't specify one.
203 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
204 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
205 phost := a[0].Address().String()
206 iep.Address = net.JoinHostPort(phost, port)
207 return a, phost, port, true, nil
208 }
209 }
210 return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
211 }
212 // Listen used a fixed IP address, which we take to mean that
213 // roaming is not desired.
214 return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
215}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800216*/
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800217
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800218// getPossbileAddrs returns an appropriate set of addresses that could be used
219// to contact the supplied protocol, host, port parameters using the supplied
220// chooser function. It returns an indication of whether the supplied address
221// was fully specified or not, returning false if the address was fully
222// specified, and true if it was not.
223func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
224 ip := net.ParseIP(host)
225 if ip == nil {
226 return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
227 }
228 if ip.IsUnspecified() {
229 if chooser != nil {
230 // Need to find a usable IP address since the call to listen
231 // didn't specify one.
232 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800233 a, err := chooser(protocol, addrs)
234 if err == nil && len(a) > 0 {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800235 return a, true, nil
236 }
237 }
238 }
239 // We don't have a chooser, so we just return the address the
240 // underlying system has chosen.
241 return []ipc.Address{addrFromIP(ip)}, true, nil
242 }
243 return []ipc.Address{addrFromIP(ip)}, false, nil
244}
245
246// createEndpoints creates appropriate inaming.Endpoint instances for
247// all of the externally accessible networrk addresses that can be used
248// to reach this server.
249func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
250 iep, ok := lep.(*inaming.Endpoint)
251 if !ok {
252 return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
253 }
254 if !strings.HasPrefix(iep.Protocol, "tcp") &&
255 !strings.HasPrefix(iep.Protocol, "ws") {
256 // If not tcp or ws, just return the endpoint we were given.
257 return []*inaming.Endpoint{iep}, false, nil
258 }
259
260 host, port, err := net.SplitHostPort(iep.Address)
261 if err != nil {
262 return nil, false, err
263 }
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800264 addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800265 if err != nil {
266 return nil, false, err
267 }
268 ieps := make([]*inaming.Endpoint, 0, len(addrs))
269 for _, addr := range addrs {
270 n, err := inaming.NewEndpoint(lep.String())
271 if err != nil {
272 return nil, false, err
273 }
274 n.IsMountTable = s.servesMountTable
275 //n.Protocol = addr.Address().Network()
276 n.Address = net.JoinHostPort(addr.Address().String(), port)
277 ieps = append(ieps, n)
278 }
279 return ieps, unspecified, nil
280}
281
282/*
283// configureEPAndRoaming configures the endpoint by filling in its Address
284// portion with the appropriately selected network address, it also
285// returns an indication of whether this endpoint is appropriate for
286// roaming and the set of addresses that should be published.
287func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800288 iep, ok := ep.(*inaming.Endpoint)
289 if !ok {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800290 return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800291 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800292 if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
293 !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
294 return false, nil, iep, nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800295 }
296 pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
297 if err != nil {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800298 return false, nil, iep, err
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800299 }
300 iep.Address = net.JoinHostPort(pubHost, pubPort)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800301 return roaming, pubAddrs, iep, nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700302}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800303*/
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700304
// listenError is an error that pairs a verror.E with a set of per-address
// errors keyed by (Protocol, Address). Its ErrorID, Action, Params,
// HasMessage and Stack methods delegate to the wrapped verror.E.
type listenError struct {
	// err is the wrapped error. It is set by newBadState and newBadArg but
	// left nil by newError.
	err verror.E
	// errors holds the per-address errors. It is allocated by newError and
	// left nil by newBadState and newBadArg.
	errors map[struct{ Protocol, Address string }]error
}
309
310func newError() *listenError {
311 return &listenError{errors: make(map[struct{ Protocol, Address string }]error)}
312}
313
314func ErrorDetails(le *listenError) map[struct{ Protocol, Address string }]error {
315 return le.errors
316}
317
318// Implements error
319func (le *listenError) Error() string {
320 s := le.err.Error()
321 for k, v := range le.errors {
322 s += fmt.Sprintf("(%s,%s:%s) ", k.Protocol, k.Address, v)
323 }
324 return strings.TrimRight(s, " ")
325}
326
327func (le *listenError) ErrorID() old_verror.ID {
328 return le.err.ErrorID()
329}
330
331func (le *listenError) Action() verror.ActionCode {
332 return le.err.Action()
333}
334
335func (le *listenError) Params() []interface{} {
336 return le.err.Params()
337}
338
339func (le *listenError) HasMessage() bool {
340 return le.err.HasMessage()
341}
342
343func (le *listenError) Stack() verror.PCs {
344 return le.err.Stack()
345}
346
347func (s *server) newBadState(m string) *listenError {
348 return &listenError{err: verror.Make(verror.BadState, s.ctx, m)}
349}
350
351func (s *server) newBadArg(m string) *listenError {
352 return &listenError{err: verror.Make(verror.BadArg, s.ctx, m)}
353}
354
355func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700356 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700357 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800358
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700359 // Shortcut if the server is stopped, to avoid needlessly creating a
360 // listener.
361 if s.stopped {
362 s.Unlock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800363 return nil, s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700364 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700365
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800366 useProxy := len(listenSpec.Proxy) > 0
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800367
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800368 // Start the proxy as early as possible.
369 if useProxy {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800370 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800371 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800372 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800373 s.proxyListenLoop(listenSpec.Proxy)
374 s.active.Done()
375 }()
376 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800377 s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700378
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800379 var ieps []*inaming.Endpoint
380
381 type lnInfo struct {
382 ln stream.Listener
383 ep naming.Endpoint
384 }
385 linfo := []lnInfo{}
386 closeAll := func(lni []lnInfo) {
387 for _, li := range lni {
388 li.ln.Close()
389 }
390 }
391
392 roaming := false
393 for _, addr := range listenSpec.Addrs {
394 if len(addr.Address) > 0 {
395 // Listen if we have a local address to listen on. Some situations
396 // just need a proxy (e.g. a browser extension).
397 tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
398 if err != nil {
399 closeAll(linfo)
400 vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
401 return nil, err
402 }
403 linfo = append(linfo, lnInfo{tmpln, lep})
404 tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
405 if err != nil {
406 closeAll(linfo)
407 return nil, err
408 }
409 ieps = append(ieps, tmpieps...)
410 if tmpRoaming {
411 roaming = true
412 }
413 }
414 }
415
416 // TODO(cnicolaou): write a test for all of these error cases.
417 if len(ieps) == 0 {
418 if useProxy {
419 return nil, nil
420 }
421 // no proxy.
422 if len(listenSpec.Addrs) > 0 {
423 return nil, fmt.Errorf("no endpoints")
424 }
425 return nil, fmt.Errorf("no proxy and no addresses requested")
426 }
427
428 // TODO(cnicolaou): return all of the eps and their errors....
429 s.Lock()
430 defer s.Unlock()
431 if s.stopped {
432 closeAll(linfo)
433 return nil, errServerStopped
434 }
435
436 if roaming && listenSpec.StreamPublisher != nil {
437 // TODO(cnicolaou): renable roaming in a followup CL.
438 /*
439 var dhcpl *dhcpListener
440 streamName := listenSpec.StreamName
441 ch := make(chan config.Setting)
442 if _, err := publisher.ForkStream(streamName, ch); err != nil {
443 return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
444 }
445 dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
446 // We have a goroutine to listen for dhcp changes.
447 s.active.Add(1)
448 go func() {
449 s.dhcpLoop(dhcpl)
450 s.active.Done()
451 }()
452 s.dhcpListeners[dhcpl] = struct{}{}
453 */
454 }
455
456 for _, li := range linfo {
457 s.listeners[li.ln] = struct{}{}
458 // We have a goroutine per listener to accept new flows.
459 // Each flow is served from its own goroutine.
460 s.active.Add(1)
461 go func(ln stream.Listener, ep naming.Endpoint) {
462 s.listenLoop(ln, ep)
463 s.active.Done()
464 }(li.ln, li.ep)
465 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800466 eps := make([]naming.Endpoint, len(ieps))
467 for i, iep := range ieps {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800468 s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800469 eps[i] = iep
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800470 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800471 return eps, nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700472}
473
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800474func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800475 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800476 if err != nil {
477 return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
478 }
479 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
480 if err != nil {
481 return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
482 }
483 iep, ok := ep.(*inaming.Endpoint)
484 if !ok {
485 ln.Close()
486 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
487 }
488 s.Lock()
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800489 s.listeners[ln] = struct{}{}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800490 s.Unlock()
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800491 s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800492 return iep, ln, nil
493}
494
495func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700496 const (
497 min = 5 * time.Millisecond
498 max = 5 * time.Minute
499 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800500
501 iep, ln, err := s.reconnectAndPublishProxy(proxy)
502 if err != nil {
503 vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
504 }
505 // the initial connection maybe have failed, but we enter the retry
506 // loop anyway so that we will continue to try and connect to the
507 // proxy.
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800508 s.Lock()
509 if s.stopped {
510 s.Unlock()
511 return
512 }
513 s.Unlock()
514
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700515 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800516 if ln != nil && iep != nil {
517 s.listenLoop(ln, iep)
518 // The listener is done, so:
519 // (1) Unpublish its name
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800520 s.publisher.RemoveServer(naming.JoinAddressName(iep.String(), ""))
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800521 }
522
523 s.Lock()
524 if s.stopped {
525 s.Unlock()
526 return
527 }
528 s.Unlock()
529
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700530 // (2) Reconnect to the proxy unless the server has been stopped
531 backoff := min
532 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800533 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700534 select {
535 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700536 if backoff = backoff * 2; backoff > max {
537 backoff = max
538 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700539 case <-s.stoppedChan:
540 return
541 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800542 // (3) reconnect, publish new address
543 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
544 vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
545 } else {
546 vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
547 break
548 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700549 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700550 }
551}
552
553func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800554 defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800555 var calls sync.WaitGroup
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700556 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800557 calls.Wait()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700558 s.Lock()
559 delete(s.listeners, ln)
560 s.Unlock()
561 }()
562 for {
563 flow, err := ln.Accept()
564 if err != nil {
Todd Wang03fee962014-12-08 19:33:10 -0800565 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ep, err)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700566 return
567 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800568 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700569 go func(flow stream.Flow) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800570 defer calls.Done()
571 fs, err := newFlowServer(flow, s)
572 if err != nil {
Todd Wang03fee962014-12-08 19:33:10 -0800573 vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
Todd Wang34ed4c62014-11-26 15:15:52 -0800574 return
575 }
576 if err := fs.serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800577 // TODO(caprita): Logging errors here is too spammy. For example, "not
578 // authorized" errors shouldn't be logged as server errors.
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800579 if err != io.EOF {
580 vlog.Errorf("Flow serve on %v failed: %v", ep, err)
581 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700582 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700583 }(flow)
584 }
585}
586
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800587/*
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700588func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
589 dhcpl.Lock()
590 defer dhcpl.Unlock()
591 for _, a := range addrs {
592 if ip := netstate.AsIP(a); ip != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800593 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800594 fn(dhcpl.ep.String())
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700595 }
596 }
597}
598
599func (s *server) dhcpLoop(dhcpl *dhcpListener) {
600 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
601 vlog.VI(2).Infof("ipc: dhcp loop")
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800602
603 ep := *dhcpl.ep
604 // Publish all of the addresses
605 for _, pubAddr := range dhcpl.pubAddrs {
606 ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800607 s.publisher.AddServer(naming.JoinAddressName(ep.String(), ""), s.servesMountTable)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800608 }
609
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700610 for setting := range dhcpl.ch {
611 if setting == nil {
612 return
613 }
614 switch v := setting.Value().(type) {
615 case bool:
616 return
617 case []net.Addr:
618 s.Lock()
619 if s.stopped {
620 s.Unlock()
621 return
622 }
623 publisher := s.publisher
624 s.Unlock()
625 switch setting.Name() {
626 case ipc.NewAddrsSetting:
627 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700628 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700629 case ipc.RmAddrsSetting:
630 vlog.Infof("Removed some addresses: %q", v)
631 s.applyChange(dhcpl, v, publisher.RemoveServer)
632 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700633 }
634 }
635}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800636*/
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700637
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800638func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800639 if obj == nil {
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800640 // The ReflectInvoker inside the LeafDispatcher will panic
641 // if called for a nil value.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800642 return s.newBadArg("nil object")
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800643 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800644 return s.ServeDispatcher(name, ipc.LeafDispatcher(obj, authorizer))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800645}
646
647func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700648 s.Lock()
649 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800650 ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
651
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700652 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800653 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700654 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800655 if disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800656 return s.newBadArg("nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700657 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800658 if s.disp != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800659 return s.newBadState("ipc.Server.Serve/ServeDispatcher already called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700660 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800661 s.disp = disp
662 s.names = make(map[string]struct{})
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700663 if len(name) > 0 {
664 s.publisher.AddName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800665 s.names[name] = struct{}{}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700666 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700667 return nil
668}
669
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800670func (s *server) AddName(name string) error {
671 s.Lock()
672 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800673 ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800674 if len(name) == 0 {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800675 return s.newBadArg("name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800676 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800677 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800678 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800679 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800680 if s.disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800681 return s.newBadState("adding a name before calling Serve or ServeDispatcher is not allowed")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800682 }
683 s.publisher.AddName(name)
684 // TODO(cnicolaou): remove this map when the publisher's RemoveName
685 // method returns an error.
686 s.names[name] = struct{}{}
687 return nil
688}
689
690func (s *server) RemoveName(name string) error {
691 s.Lock()
692 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800693 ivtrace.FromContext(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800694 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800695 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800696 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800697 if s.disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800698 return s.newBadState("removing name before calling Serve or ServeDispatcher is not allowed")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800699 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800700 if _, present := s.names[name]; !present {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800701 return s.newBadArg(fmt.Sprintf("%q has not been previously used for this server", name))
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800702 }
703 s.publisher.RemoveName(name)
704 delete(s.names, name)
705 return nil
706}
707
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700708func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700709 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700710 s.Lock()
711 if s.stopped {
712 s.Unlock()
713 return nil
714 }
715 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700716 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700717 s.Unlock()
718
Robin Thellenddf428232014-10-06 12:50:44 -0700719 // Delete the stats object.
720 s.stats.stop()
721
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700722 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
723 // server lock, since publisher is safe for concurrent access.
724
725 // Stop the publisher, which triggers unmounting of published names.
726 s.publisher.Stop()
727 // Wait for the publisher to be done unmounting before we can proceed to
728 // close the listeners (to minimize the number of mounted names pointing
729 // to endpoint that are no longer serving).
730 //
731 // TODO(caprita): See if make sense to fail fast on rejecting
732 // connections once listeners are closed, and parallelize the publisher
733 // and listener shutdown.
734 s.publisher.WaitForStop()
735
736 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800737
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700738 // Close all listeners. No new flows will be accepted, while in-flight
739 // flows will continue until they terminate naturally.
740 nListeners := len(s.listeners)
741 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700742
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800743 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700744 go func(ln stream.Listener) {
745 errCh <- ln.Close()
746 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800747 }
748 for dhcpl, _ := range s.dhcpListeners {
749 dhcpl.Lock()
750 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
751 dhcpl.ch <- config.NewBool("EOF", "stop", true)
752 dhcpl.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700753 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800754
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700755 s.Unlock()
756 var firstErr error
757 for i := 0; i < nListeners; i++ {
758 if err := <-errCh; err != nil && firstErr == nil {
759 firstErr = err
760 }
761 }
762 // At this point, we are guaranteed that no new requests are going to be
763 // accepted.
764
765 // Wait for the publisher and active listener + flows to finish.
766 s.active.Wait()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800767
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700768 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800769 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700770 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800771 if firstErr != nil {
772 return verror.Make(verror.Internal, s.ctx, firstErr)
773 }
774 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700775}
776
// TODO(toddw): Remove these interfaces after the vom2 transition.
type (
	// vomEncoder is the encoding operation the flow server needs from
	// whichever vom implementation is in use.
	vomEncoder interface {
		Encode(v interface{}) error
	}

	// vomDecoder is the decoding operation the flow server needs from
	// whichever vom implementation is in use.
	vomDecoder interface {
		Decode(v interface{}) error
	}
)
785
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700786// flowServer implements the RPC server-side protocol for a single RPC, over a
787// flow that's already connected to the client.
788type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700789 context.T
Todd Wang5739dda2014-11-16 22:44:02 -0800790 server *server // ipc.Server that this flow server belongs to
791 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
Todd Wang34ed4c62014-11-26 15:15:52 -0800792 dec vomDecoder // to decode requests and args from the client
793 enc vomEncoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -0800794 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700795
Asim Shankar220a0152014-10-30 21:21:09 -0700796 // Fields filled in during the server invocation.
797 blessings security.Blessings
798 method, suffix string
Asim Shankar0cad0832014-11-04 01:27:38 -0800799 tags []interface{}
Asim Shankar220a0152014-10-30 21:21:09 -0700800 discharges map[string]security.Discharge
Asim Shankar0cad0832014-11-04 01:27:38 -0800801 starttime time.Time
Asim Shankar220a0152014-10-30 21:21:09 -0700802 endStreamArgs bool // are the stream args at EOF?
803 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700804}
805
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700806var _ ipc.Stream = (*flowServer)(nil)
807
Todd Wang34ed4c62014-11-26 15:15:52 -0800808func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700809 server.Lock()
810 disp := server.disp
811 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700812
Todd Wang34ed4c62014-11-26 15:15:52 -0800813 fs := &flowServer{
814 T: server.ctx,
815 server: server,
816 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -0800817 flow: flow,
818 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700819 }
Todd Wang34ed4c62014-11-26 15:15:52 -0800820 if vom2.IsEnabled() {
821 var err error
822 if fs.dec, err = vom2.NewDecoder(flow); err != nil {
823 flow.Close()
824 return nil, err
825 }
826 if fs.enc, err = vom2.NewBinaryEncoder(flow); err != nil {
827 flow.Close()
828 return nil, err
829 }
830 } else {
831 fs.dec = vom.NewDecoder(flow)
832 fs.enc = vom.NewEncoder(flow)
833 }
834 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700835}
836
837// Vom does not encode untyped nils.
838// Consequently, the ipc system does not allow nil results with an interface
839// type from server methods. The one exception being errors.
840//
841// For now, the following hacky assumptions are made, which will be revisited when
842// a decision is made on how untyped nils should be encoded/decoded in
843// vom/vom2:
844//
845// - Server methods return 0 or more results
846// - Any values returned by the server that have an interface type are either
847// non-nil or of type error.
Todd Wang34ed4c62014-11-26 15:15:52 -0800848func vomErrorHack(res interface{}) vom.Value {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700849 v := vom.ValueOf(res)
850 if !v.IsValid() {
851 // Untyped nils are assumed to be nil-errors.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800852 var boxed old_verror.E
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700853 return vom.ValueOf(&boxed).Elem()
854 }
855 if err, iserr := res.(error); iserr {
856 // Convert errors to verror since errors are often not
857 // serializable via vom/gob (errors.New and fmt.Errorf return a
858 // type with no exported fields).
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800859 return vom.ValueOf(old_verror.Convert(err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700860 }
861 return v
862}
863
// vom2ErrorHack prepares a result for vom2 encoding: a value that implements
// error is returned as a pointer to an error variable holding it, and every
// other value (including nil) is returned unchanged.
//
// TODO(toddw): Remove this function and encodeValueHack after the vom2 transition.
func vom2ErrorHack(res interface{}) interface{} {
	switch e := res.(type) {
	case error:
		return &e
	default:
		return res
	}
}
871
872// TODO(toddw): Remove this function and vom2ErrorHack after the vom2 transition.
873func (fs *flowServer) encodeValueHack(res interface{}) error {
874 if vom2.IsEnabled() {
875 return fs.enc.Encode(vom2ErrorHack(res))
876 }
877 return fs.enc.(*vom.Encoder).EncodeValue(vomErrorHack(res))
878}
879
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700880func (fs *flowServer) serve() error {
881 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700882
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700883 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700884
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700885 ivtrace.FromContext(fs).Finish()
886
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700887 var traceResponse vtrace.Response
888 if fs.allowDebug {
889 traceResponse = ivtrace.Response(fs)
890 }
891
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700892 // Respond to the client with the response header and positional results.
893 response := ipc.Response{
894 Error: err,
895 EndStreamResults: true,
896 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700897 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700898 }
899 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800900 if err == io.EOF {
901 return err
902 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800903 return old_verror.BadProtocolf("ipc: response encoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700904 }
905 if response.Error != nil {
906 return response.Error
907 }
908 for ix, res := range results {
Todd Wang34ed4c62014-11-26 15:15:52 -0800909 if err := fs.encodeValueHack(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800910 if err == io.EOF {
911 return err
912 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800913 return old_verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700914 }
915 }
916 // TODO(ashankar): Should unread data from the flow be drained?
917 //
918 // Reason to do so:
919 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
920 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
921 // slices will not be returned to the pool leading to possibly increased memory usage.
922 //
923 // Reason to not do so:
924 // Draining here will conflict with any Reads on the flow in a separate goroutine
925 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
926 //
927 // For now, go with the reason to not do so as having unread data in the stream
928 // should be a rare case.
929 return nil
930}
931
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800932func (fs *flowServer) readIPCRequest() (*ipc.Request, old_verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700933 // Set a default timeout before reading from the flow. Without this timeout,
934 // a client that sends no request or a partial request will retain the flow
935 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700936 initTimer := newTimer(defaultCallTimeout)
937 defer initTimer.Stop()
938 fs.flow.SetDeadline(initTimer.C)
939
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700940 // Decode the initial request.
941 var req ipc.Request
942 if err := fs.dec.Decode(&req); err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800943 return nil, old_verror.BadProtocolf("ipc: request decoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700944 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700945 return &req, nil
946}
947
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800948func (fs *flowServer) processRequest() ([]interface{}, old_verror.E) {
Asim Shankar0cad0832014-11-04 01:27:38 -0800949 fs.starttime = time.Now()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700950 req, verr := fs.readIPCRequest()
951 if verr != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700952 // We don't know what the ipc call was supposed to be, but we'll create
953 // a placeholder span so we can capture annotations.
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800954 fs.T, _ = ivtrace.WithNewSpan(fs, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
Matt Rosencrantz86897932014-10-02 09:34:34 -0700955 return nil, verr
956 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700957 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -0800958 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -0700959
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700960 // TODO(mattr): Currently this allows users to trigger trace collection
961 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800962 // results later. This might be considered a DOS vector.
963 spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800964 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest, fs.server.traceStore)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700965
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700966 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700967 if req.Timeout != ipc.NoTimeout {
Asim Shankar0cad0832014-11-04 01:27:38 -0800968 fs.T, cancel = fs.WithDeadline(fs.starttime.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700969 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700970 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700971 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700972 fs.flow.SetDeadline(fs.Done())
Todd Wang5739dda2014-11-16 22:44:02 -0800973 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700974
Todd Wang5739dda2014-11-16 22:44:02 -0800975 // Initialize security: blessings, discharges, etc.
976 if verr := fs.initSecurity(req); verr != nil {
977 return nil, verr
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700978 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700979 // Lookup the invoker.
Todd Wang5739dda2014-11-16 22:44:02 -0800980 invoker, auth, verr := fs.lookup(fs.suffix, &fs.method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700981 if verr != nil {
982 return nil, verr
983 }
984 // Prepare invoker and decode args.
985 numArgs := int(req.NumPosArgs)
Robin Thellendb16d7162014-11-07 13:47:26 -0800986 argptrs, tags, err := invoker.Prepare(fs.method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -0800987 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700988 if err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800989 return nil, old_verror.Makef(old_verror.ErrorID(err), "%s: name: %q", err, fs.suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700990 }
991 if len(argptrs) != numArgs {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800992 return nil, old_verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", fs.method, fs.suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700993 }
994 for ix, argptr := range argptrs {
995 if err := fs.dec.Decode(argptr); err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800996 return nil, old_verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700997 }
998 }
Todd Wang5739dda2014-11-16 22:44:02 -0800999 // Check application's authorization policy.
1000 if verr := authorize(fs, auth); verr != nil {
1001 return nil, verr
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001002 }
Todd Wang5739dda2014-11-16 22:44:02 -08001003 // Check if the caller is permitted to view debug information.
Asim Shankar68885192014-11-26 12:48:35 -08001004 // TODO(mattr): Is access.Debug the right thing to check?
Todd Wang5739dda2014-11-16 22:44:02 -08001005 fs.allowDebug = authorize(debugContext{fs}, auth) == nil
1006 // Invoke the method.
Robin Thellendb16d7162014-11-07 13:47:26 -08001007 results, err := invoker.Invoke(fs.method, fs, argptrs)
1008 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001009 return results, old_verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001010}
1011
Todd Wang5739dda2014-11-16 22:44:02 -08001012func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1013 // Ensure that the context gets cancelled if the flow is closed
1014 // due to a network error, or client cancellation.
1015 select {
1016 case <-fs.flow.Closed():
1017 // Here we remove the contexts channel as a deadline to the flow.
1018 // We do this to ensure clients get a consistent error when they read/write
1019 // after the flow is closed. Since the flow is already closed, it doesn't
1020 // matter that the context is also cancelled.
1021 fs.flow.SetDeadline(nil)
1022 cancel()
1023 case <-fs.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001024 }
Todd Wang5739dda2014-11-16 22:44:02 -08001025}
1026
1027// lookup returns the invoker and authorizer responsible for serving the given
1028// name and method. The suffix is stripped of any leading slashes. If it begins
1029// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
1030// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1031// value may be modified to match the actual suffix and method to use.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001032func (fs *flowServer) lookup(suffix string, method *string) (ipc.Invoker, security.Authorizer, old_verror.E) {
Todd Wang5739dda2014-11-16 22:44:02 -08001033 if naming.IsReserved(*method) {
1034 // All reserved methods are trapped and handled here, by removing the
1035 // reserved prefix and invoking them on reservedMethods. E.g. "__Glob"
1036 // invokes reservedMethods.Glob.
1037 *method = naming.StripReserved(*method)
1038 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
1039 }
1040 disp := fs.disp
1041 if naming.IsReserved(suffix) {
1042 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -07001043 }
1044 if disp != nil {
Robin Thellenda02fe8f2014-11-19 09:58:29 -08001045 obj, auth, err := disp.Lookup(suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001046 switch {
1047 case err != nil:
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001048 return nil, nil, old_verror.Convert(err)
Todd Wang5739dda2014-11-16 22:44:02 -08001049 case obj != nil:
1050 return objectToInvoker(obj), auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001051 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001052 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001053 return nil, nil, old_verror.NoExistf("ipc: invoker not found for %q", suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001054}
1055
1056func objectToInvoker(obj interface{}) ipc.Invoker {
1057 if obj == nil {
1058 return nil
1059 }
1060 if invoker, ok := obj.(ipc.Invoker); ok {
1061 return invoker
1062 }
1063 return ipc.ReflectInvoker(obj)
1064}
1065
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001066func (fs *flowServer) initSecurity(req *ipc.Request) old_verror.E {
Todd Wang5739dda2014-11-16 22:44:02 -08001067 // If additional credentials are provided, make them available in the context
1068 blessings, err := security.NewBlessings(req.GrantedBlessings)
1069 if err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001070 return old_verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
Todd Wang5739dda2014-11-16 22:44:02 -08001071 }
1072 fs.blessings = blessings
1073 // Detect unusable blessings now, rather then discovering they are unusable on
1074 // first use.
1075 //
1076 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1077 // the server's identity as the blessing. Figure out what we want to do about
1078 // this - should servers be able to assume that a blessing is something that
1079 // does not have the authorizations that the server's own identity has?
1080 if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001081 return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Todd Wang5739dda2014-11-16 22:44:02 -08001082 }
1083 // Receive third party caveat discharges the client sent
1084 for i := uint64(0); i < req.NumDischarges; i++ {
1085 var d security.Discharge
1086 if err := fs.dec.Decode(&d); err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001087 return old_verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
Todd Wang5739dda2014-11-16 22:44:02 -08001088 }
1089 fs.discharges[d.ID()] = d
1090 }
1091 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001092}
1093
1094type acceptAllAuthorizer struct{}
1095
1096func (acceptAllAuthorizer) Authorize(security.Context) error {
1097 return nil
1098}
1099
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001100func authorize(ctx security.Context, auth security.Authorizer) old_verror.E {
Todd Wang5739dda2014-11-16 22:44:02 -08001101 if ctx.LocalPrincipal() == nil {
1102 // LocalPrincipal is nil means that the server wanted to avoid
1103 // authentication, and thus wanted to skip authorization as well.
1104 return nil
1105 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001106 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001107 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001108 }
Todd Wang5739dda2014-11-16 22:44:02 -08001109 if err := auth.Authorize(ctx); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -07001110 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001111 return old_verror.NoAccessf("ipc: not authorized to call %q.%q (%v)", ctx.Suffix(), ctx.Method(), err)
Asim Shankara5457f02014-10-24 23:23:07 -07001112 }
1113 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001114}
1115
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001116// debugContext is a context which wraps another context but always returns
Asim Shankar68885192014-11-26 12:48:35 -08001117// the debug tag.
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001118type debugContext struct {
1119 security.Context
1120}
1121
Asim Shankar68885192014-11-26 12:48:35 -08001122func (debugContext) MethodTags() []interface{} {
1123 return []interface{}{access.Debug}
1124}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001125
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001126// Send implements the ipc.Stream method.
1127func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001128 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001129 // The empty response header indicates what follows is a streaming result.
1130 if err := fs.enc.Encode(ipc.Response{}); err != nil {
1131 return err
1132 }
1133 return fs.enc.Encode(item)
1134}
1135
1136// Recv implements the ipc.Stream method.
1137func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001138 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001139 var req ipc.Request
1140 if err := fs.dec.Decode(&req); err != nil {
1141 return err
1142 }
1143 if req.EndStreamArgs {
1144 fs.endStreamArgs = true
1145 return io.EOF
1146 }
1147 return fs.dec.Decode(itemptr)
1148}
1149
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001150// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001151
Asim Shankar2519cc12014-11-10 21:16:53 -08001152func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001153 //nologcall
1154 return fs.discharges
1155}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001156func (fs *flowServer) Server() ipc.Server {
1157 //nologcall
1158 return fs.server
1159}
Asim Shankar0cad0832014-11-04 01:27:38 -08001160func (fs *flowServer) Timestamp() time.Time {
1161 //nologcall
1162 return fs.starttime
1163}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001164func (fs *flowServer) Method() string {
1165 //nologcall
1166 return fs.method
1167}
Asim Shankar0cad0832014-11-04 01:27:38 -08001168func (fs *flowServer) MethodTags() []interface{} {
1169 //nologcall
1170 return fs.tags
1171}
Matt Rosencrantz04d197c2014-12-12 08:39:25 -08001172func (fs *flowServer) Context() context.T {
1173 return fs.T
1174}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001175
1176// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1177// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001178func (fs *flowServer) Name() string {
1179 //nologcall
1180 return fs.suffix
1181}
1182func (fs *flowServer) Suffix() string {
1183 //nologcall
1184 return fs.suffix
1185}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001186func (fs *flowServer) LocalPrincipal() security.Principal {
1187 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001188 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001189}
1190func (fs *flowServer) LocalBlessings() security.Blessings {
1191 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001192 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001193}
1194func (fs *flowServer) RemoteBlessings() security.Blessings {
1195 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001196 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001197}
Asim Shankar8f05c222014-10-06 22:08:19 -07001198func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001199 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001200 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001201}
1202func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1203 //nologcall
1204 return fs.flow.LocalEndpoint()
1205}
1206func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1207 //nologcall
1208 return fs.flow.RemoteEndpoint()
1209}