blob: 115615f3e6e211a58086029b410c7c8cf82582ae [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron2"
13 "veyron.io/veyron/veyron2/config"
14 "veyron.io/veyron/veyron2/context"
15 "veyron.io/veyron/veyron2/ipc"
16 "veyron.io/veyron/veyron2/ipc/stream"
17 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070018 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070019 "veyron.io/veyron/veyron2/security"
Robin Thellendc26c32e2014-10-06 17:44:04 -070020 mttypes "veyron.io/veyron/veyron2/services/mounttable/types"
Jiri Simsa519c5072014-09-17 21:37:57 -070021 "veyron.io/veyron/veyron2/verror"
22 "veyron.io/veyron/veyron2/vlog"
23 "veyron.io/veyron/veyron2/vom"
24 "veyron.io/veyron/veyron2/vtrace"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070025
26 "veyron.io/veyron/veyron/lib/glob"
27 "veyron.io/veyron/veyron/lib/netstate"
28 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
29 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070030 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070031)
32
33var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070034 errServerStopped = verror.Abortedf("ipc: server is stopped")
35)
36
Jiri Simsa5293dcb2014-05-10 09:56:38 -070037type server struct {
38 sync.Mutex
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070039 ctx context.T // context used by the server to make internal RPCs.
40 streamMgr stream.Manager // stream manager to listen for new flows.
41 publisher publisher.Publisher // publisher to publish mounttable mounts.
42 listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
43 listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
44 disp ipc.Dispatcher // dispatcher to serve RPCs
45 active sync.WaitGroup // active goroutines we've spawned.
46 stopped bool // whether the server has been stopped.
47 stoppedChan chan struct{} // closed when the server has been stopped.
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070048 ns naming.Namespace
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070049 servesMountTable bool
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -070050 reservedOpt options.ReservedNameDispatcher
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070051 // TODO(cnicolaou): add roaming stats to ipcStats
52 stats *ipcStats // stats for this server.
53}
54
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070055var _ ipc.Server = (*server)(nil)
56
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070057type dhcpListener struct {
58 sync.Mutex
59 publisher *config.Publisher // publisher used to fork the stream
60 name string // name of the publisher stream
61 ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
62 port string
63 ch chan config.Setting // channel to receive settings over
Jiri Simsa5293dcb2014-05-10 09:56:38 -070064}
65
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070066func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070067 s := &server{
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070068 ctx: ctx,
69 streamMgr: streamMgr,
70 publisher: publisher.New(ctx, ns, publishPeriod),
71 listeners: make(map[stream.Listener]*dhcpListener),
72 stoppedChan: make(chan struct{}),
73 ns: ns,
74 stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070075 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070076 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -070077 switch opt := opt.(type) {
78 case stream.ListenerOpt:
79 // Collect all ServerOpts that are also ListenerOpts.
80 s.listenerOpts = append(s.listenerOpts, opt)
Asim Shankarcc044212014-10-15 23:25:26 -070081 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070082 s.servesMountTable = bool(opt)
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -070083 case options.ReservedNameDispatcher:
84 s.reservedOpt = opt
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085 }
86 }
87 return s, nil
88}
89
Jiri Simsa5293dcb2014-05-10 09:56:38 -070090func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -070091 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092 s.Lock()
93 defer s.Unlock()
94 if s.stopped {
95 return nil, errServerStopped
96 }
97 return s.publisher.Published(), nil
98}
99
100// resolveToAddress will try to resolve the input to an address using the
101// mount table, if the input is not already an address.
Asim Shankardee311d2014-08-01 17:41:31 -0700102func (s *server) resolveToAddress(address string) (string, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700104 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700105 }
Asim Shankardee311d2014-08-01 17:41:31 -0700106 var names []string
107 if s.ns != nil {
108 var err error
109 if names, err = s.ns.Resolve(s.ctx, address); err != nil {
110 return "", err
111 }
112 } else {
113 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700114 }
115 for _, n := range names {
116 address, suffix := naming.SplitAddressName(n)
Asim Shankardee311d2014-08-01 17:41:31 -0700117 if suffix != "" && suffix != "//" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700118 continue
119 }
120 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700121 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700122 }
123 }
Asim Shankardee311d2014-08-01 17:41:31 -0700124 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700125}
126
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700127/*
Cosmos Nicolaouf889c732014-10-16 20:46:54 -0700128// ipAddressChooser returns the preferred IP address, which is,
129// a public IPv4 address, then any non-loopback IPv4, then a public
130// IPv6 address and finally any non-loopback/link-local IPv6
131// It is replicated here to avoid a circular dependency and will, in any case,
132// go away when we transition away from Listen to the ListenX API.
133func ipAddressChooser(network string, addrs []ipc.Address) ([]ipc.Address, error) {
134 if !netstate.IsIPProtocol(network) {
135 return nil, fmt.Errorf("can't support network protocol %q", network)
136 }
137 accessible := netstate.AddrList(addrs)
138
	// Try and find an address on an interface with a default route.
140 predicates := []netstate.AddressPredicate{netstate.IsPublicUnicastIPv4,
141 netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6}
142 for _, predicate := range predicates {
143 if addrs := accessible.Filter(predicate); len(addrs) > 0 {
144 onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
145 if len(onDefaultRoutes) > 0 {
146 return onDefaultRoutes, nil
147 }
148 }
149 }
150
151 // We failed to find any addresses with default routes, try again
152 // but without the default route requirement.
153 for _, predicate := range predicates {
154 if addrs := accessible.Filter(predicate); len(addrs) > 0 {
155 return addrs, nil
156 }
157 }
158
159 return nil, fmt.Errorf("failed to find any usable address for %q", network)
160}
161
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700162func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700163 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700164 s.Lock()
165 // Shortcut if the server is stopped, to avoid needlessly creating a
166 // listener.
167 if s.stopped {
168 s.Unlock()
169 return nil, errServerStopped
170 }
171 s.Unlock()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700172 var proxyName string
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700173 if protocol == inaming.Network {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700174 proxyName = address
Asim Shankardee311d2014-08-01 17:41:31 -0700175 var err error
176 if address, err = s.resolveToAddress(address); err != nil {
177 return nil, err
178 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700179 }
Asim Shankarcc044212014-10-15 23:25:26 -0700180 // TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700181 // it can more cleanly set the IsMountTable bit in the endpoint.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700182 ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
183 if err != nil {
184 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
185 return nil, err
186 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700187 iep, ok := ep.(*inaming.Endpoint)
188 if !ok {
189 return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
190 }
191
Robin Thellend7f424272014-09-04 10:42:14 -0700192 if protocol != inaming.Network {
193 // We know the endpoint format, so we crack it open...
194 switch iep.Protocol {
195 case "tcp", "tcp4", "tcp6":
196 host, port, err := net.SplitHostPort(iep.Address)
197 if err != nil {
198 return nil, err
199 }
200 ip := net.ParseIP(host)
201 if ip == nil {
202 return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
203 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700204 if ip.IsUnspecified() {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700205 addrs, err := netstate.GetAccessibleIPs()
206 if err == nil {
Cosmos Nicolaouf889c732014-10-16 20:46:54 -0700207 if a, err := ipAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700208 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
Cosmos Nicolaou9a246552014-08-29 13:07:29 -0700209 }
Cosmos Nicolaouf7a11d92014-08-29 09:56:07 -0700210 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700211 }
212 }
213 }
214
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215 s.Lock()
216 if s.stopped {
217 s.Unlock()
218 // Ignore error return since we can't really do much about it.
219 ln.Close()
220 return nil, errServerStopped
221 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700222 s.listeners[ln] = nil
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700223 // We have a single goroutine per listener to accept new flows.
224 // Each flow is served from its own goroutine.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700225 s.active.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700226 if protocol == inaming.Network {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700227 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700228 s.proxyListenLoop(ln, ep, proxy)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700229 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700230 }(ln, iep, proxyName)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700231 } else {
232 go func(ln stream.Listener, ep naming.Endpoint) {
233 s.listenLoop(ln, ep)
234 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700235 }(ln, iep)
Bogdan Caprita187269b2014-05-13 19:59:46 -0700236 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700237 s.Unlock()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700238 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700239 return ep, nil
240}
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700241*/
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700242
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700243// externalEndpoint examines the endpoint returned by the stream listen call
244// and fills in the address to publish to the mount table. It also returns the
245// IP host address that it selected for publishing to the mount table.
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700246func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700247 // We know the endpoint format, so we crack it open...
248 iep, ok := lep.(*inaming.Endpoint)
249 if !ok {
250 return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
251 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700252 switch iep.Protocol {
253 case "tcp", "tcp4", "tcp6":
254 host, port, err := net.SplitHostPort(iep.Address)
255 if err != nil {
256 return nil, nil, err
257 }
258 ip := net.ParseIP(host)
259 if ip == nil {
260 return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
261 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700262 if ip.IsUnspecified() && chooser != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700263 // Need to find a usable IP address since the call to listen
264 // didn't specify one.
265 addrs, err := netstate.GetAccessibleIPs()
266 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700267 // TODO(cnicolaou): we could return multiple addresses here,
268 // all of which can be exported to the mount table. Look at
269 // this after we transition fully to ListenX.
270 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
271 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
272 return iep, a[0].Address().(*net.IPAddr), nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700273 }
274 }
275 } else {
276 // Listen used a fixed IP address, which essentially disables
277 // roaming.
278 return iep, nil, nil
279 }
280 }
281 return iep, nil, nil
282}
283
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700284func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700285 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700286 s.Lock()
287 // Shortcut if the server is stopped, to avoid needlessly creating a
288 // listener.
289 if s.stopped {
290 s.Unlock()
291 return nil, errServerStopped
292 }
293 s.Unlock()
294
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700295 protocol := listenSpec.Protocol
296 address := listenSpec.Address
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700297 proxyAddress := ""
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700298 if len(listenSpec.Proxy) > 0 {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700299 if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
300 return nil, err
301 } else {
302 proxyAddress = address
303 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700304 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700305
306 ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
307 if err != nil {
308 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
309 return nil, err
310 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700311 ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700312 if err != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700313 ln.Close()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700314 return nil, err
315 }
316
317 s.Lock()
318 if s.stopped {
319 s.Unlock()
320 // Ignore error return since we can't really do much about it.
321 ln.Close()
322 return nil, errServerStopped
323 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700324
Srdjan Petrovic69381a92014-10-30 13:20:55 -0700325 var ip net.IP
326 if ipaddr != nil {
327 ip = net.ParseIP(ipaddr.String())
328 } else {
329 vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
330 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700331 publisher := listenSpec.StreamPublisher
Srdjan Petrovic69381a92014-10-30 13:20:55 -0700332 if ip != nil && !ip.IsLoopback() && publisher != nil {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700333 streamName := listenSpec.StreamName
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700334 ch := make(chan config.Setting)
335 _, err := publisher.ForkStream(streamName, ch)
336 if err != nil {
337 return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
338 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700339 _, port, _ := net.SplitHostPort(ep.Address)
340 dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
341
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700342 // We have a goroutine to listen for dhcp changes.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700343 s.active.Add(1)
344 // goroutine to listen for address changes.
345 go func(dl *dhcpListener) {
346 s.dhcpLoop(dl)
347 s.active.Done()
348 }(dhcpl)
349 s.listeners[ln] = dhcpl
350 } else {
351 s.listeners[ln] = nil
352 }
353
354 // We have a goroutine per listener to accept new flows.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700355 // Each flow is served from its own goroutine.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700356 s.active.Add(1)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700357
358 // goroutine to listen for connections
359 go func(ln stream.Listener, ep naming.Endpoint) {
360 s.listenLoop(ln, ep)
361 s.active.Done()
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700362 }(ln, lep)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700363
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700364 if len(proxyAddress) > 0 {
365 pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
366 if err != nil {
367 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
368 return nil, err
369 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700370 ipep, ok := pep.(*inaming.Endpoint)
371 if !ok {
372 return nil, fmt.Errorf("failed translating internal endpoint data types")
373 }
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700374 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700375 s.active.Add(1)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700376 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700377 s.proxyListenLoop(ln, ep, proxy)
378 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700379 }(pln, ipep, listenSpec.Proxy)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700380 s.listeners[pln] = nil
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700381 // TODO(cnicolaou,p): AddServer no longer needs to take the
382 // servesMountTable bool since it can be extracted from the endpoint.
383 s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700384 } else {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700385 s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700386 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700387 s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700388 return ep, nil
389}
390
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700391func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700392 var name string
393 if !s.servesMountTable {
394 // Make sure that client MountTable code doesn't try and
395 // ResolveStep past this final address.
396 name = "//"
397 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700398 ep.IsMountTable = servesMountTable
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700399 return naming.JoinAddressName(ep.String(), name)
400}
401
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700402func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700403 const (
404 min = 5 * time.Millisecond
405 max = 5 * time.Minute
406 )
407 for {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700408 s.listenLoop(ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700409 // The listener is done, so:
410 // (1) Unpublish its name
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700411 s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700412 // (2) Reconnect to the proxy unless the server has been stopped
413 backoff := min
414 ln = nil
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700415 // TODO(ashankar,cnicolaou): this code is way too confusing and should
416 // be cleaned up.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700417 for ln == nil {
418 select {
419 case <-time.After(backoff):
Asim Shankardee311d2014-08-01 17:41:31 -0700420 resolved, err := s.resolveToAddress(proxy)
421 if err != nil {
422 vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700423 if backoff = backoff * 2; backoff > max {
424 backoff = max
425 }
Asim Shankardee311d2014-08-01 17:41:31 -0700426 break
427 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700428 var ep naming.Endpoint
Asim Shankardee311d2014-08-01 17:41:31 -0700429 ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700430 if err == nil {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700431 var ok bool
432 iep, ok = ep.(*inaming.Endpoint)
433 if !ok {
434 vlog.Errorf("failed translating internal endpoint data types")
435 ln = nil
436 continue
437 }
438 vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700439 break
440 }
441 if backoff = backoff * 2; backoff > max {
442 backoff = max
443 }
444 vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
445 case <-s.stoppedChan:
446 return
447 }
448 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700449 // TODO(cnicolaou,ashankar): this won't work when we are both
450 // proxying and publishing locally, which is the common case.
451 // listenLoop, dhcpLoop and the original publish are all publishing
452 // addresses to the same name, but the client is not smart enough
453 // to choose sensibly between them.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700454 // (3) reconnected, publish new address
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700455 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700456 s.Lock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700457 s.listeners[ln] = nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700458 s.Unlock()
459 }
460}
461
462func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
463 defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
464 defer func() {
465 s.Lock()
466 delete(s.listeners, ln)
467 s.Unlock()
468 }()
469 for {
470 flow, err := ln.Accept()
471 if err != nil {
472 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
473 return
474 }
475 s.active.Add(1)
476 go func(flow stream.Flow) {
477 if err := newFlowServer(flow, s).serve(); err != nil {
478 // TODO(caprita): Logging errors here is
479 // too spammy. For example, "not
480 // authorized" errors shouldn't be
481 // logged as server errors.
482 vlog.Errorf("Flow serve on %v failed: %v", ln, err)
483 }
484 s.active.Done()
485 }(flow)
486 }
487}
488
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700489func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
490 dhcpl.Lock()
491 defer dhcpl.Unlock()
492 for _, a := range addrs {
493 if ip := netstate.AsIP(a); ip != nil {
494 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700495 fn(s.publishEP(dhcpl.ep, s.servesMountTable))
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700496 }
497 }
498}
499
500func (s *server) dhcpLoop(dhcpl *dhcpListener) {
501 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
502 vlog.VI(2).Infof("ipc: dhcp loop")
503 for setting := range dhcpl.ch {
504 if setting == nil {
505 return
506 }
507 switch v := setting.Value().(type) {
508 case bool:
509 return
510 case []net.Addr:
511 s.Lock()
512 if s.stopped {
513 s.Unlock()
514 return
515 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700516 // TODO(cnicolaou,ashankar): this won't work when we are both
517 // proxying and publishing locally, which is the common case.
518 // listenLoop, dhcpLoop and the original publish are all publishing
519 // addresses to the same name, but the client is not smart enough
520 // to choose sensibly between them.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700521 publisher := s.publisher
522 s.Unlock()
523 switch setting.Name() {
524 case ipc.NewAddrsSetting:
525 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700526 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700527 case ipc.RmAddrsSetting:
528 vlog.Infof("Removed some addresses: %q", v)
529 s.applyChange(dhcpl, v, publisher.RemoveServer)
530 }
531
532 }
533 }
534}
535
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800536func (s *server) Serve(name string, obj interface{}) error {
537 if obj == nil {
538 // nil is an allowed value for obj.
539 return s.ServeDispatcher(name, ipc.Dispatcher(nil))
540 }
541 // TRANSITION: this will be disallowed when the transition is complete.
542 if disp, ok := obj.(ipc.Dispatcher); ok {
543 return s.ServeDispatcher(name, disp)
544 }
545 // TRANSITION: We may fail the dispatcher type test, but still be a
546 // dispatcher becase our Lookup method returns ipc.Invoker and not a
547 // raw object. This code here will detect that case and panic as an aid
548 // to catching these cases early.
549 typ := reflect.TypeOf(obj)
550 if lookup, found := typ.MethodByName("Lookup"); found {
551 if lookup.Type.NumIn() == 3 && lookup.Type.NumOut() == 3 {
552 inv := lookup.Type.Out(0)
553 if inv.Name() == "Invoker" {
554 panic(fmt.Sprintf("%q has a Lookup that returns an Invoker", lookup.Name))
555 }
556 }
557 }
558 // TRANSITION: this will go away in the transition.
559 panic("should never get here")
560}
561
562func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700563 defer vlog.LogCall()()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700564 s.Lock()
565 defer s.Unlock()
566 if s.stopped {
567 return errServerStopped
568 }
569 if s.disp != nil && disp != nil && s.disp != disp {
570 return fmt.Errorf("attempt to change dispatcher")
571 }
572 if disp != nil {
573 s.disp = disp
574 }
575 if len(name) > 0 {
576 s.publisher.AddName(name)
577 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700578 return nil
579}
580
581func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700582 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700583 s.Lock()
584 if s.stopped {
585 s.Unlock()
586 return nil
587 }
588 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700589 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700590 s.Unlock()
591
Robin Thellenddf428232014-10-06 12:50:44 -0700592 // Delete the stats object.
593 s.stats.stop()
594
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700595 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
596 // server lock, since publisher is safe for concurrent access.
597
598 // Stop the publisher, which triggers unmounting of published names.
599 s.publisher.Stop()
600 // Wait for the publisher to be done unmounting before we can proceed to
601 // close the listeners (to minimize the number of mounted names pointing
602 // to endpoint that are no longer serving).
603 //
604 // TODO(caprita): See if make sense to fail fast on rejecting
605 // connections once listeners are closed, and parallelize the publisher
606 // and listener shutdown.
607 s.publisher.WaitForStop()
608
609 s.Lock()
610 // Close all listeners. No new flows will be accepted, while in-flight
611 // flows will continue until they terminate naturally.
612 nListeners := len(s.listeners)
613 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700614
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700615 for ln, dhcpl := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700616 go func(ln stream.Listener) {
617 errCh <- ln.Close()
618 }(ln)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700619 if dhcpl != nil {
620 dhcpl.Lock()
621 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
622 dhcpl.ch <- config.NewBool("EOF", "stop", true)
623 dhcpl.Unlock()
624 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700625 }
626 s.Unlock()
627 var firstErr error
628 for i := 0; i < nListeners; i++ {
629 if err := <-errCh; err != nil && firstErr == nil {
630 firstErr = err
631 }
632 }
633 // At this point, we are guaranteed that no new requests are going to be
634 // accepted.
635
636 // Wait for the publisher and active listener + flows to finish.
637 s.active.Wait()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700638 s.Lock()
639 s.disp = nil
640 s.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700641 return firstErr
642}
643
644// flowServer implements the RPC server-side protocol for a single RPC, over a
645// flow that's already connected to the client.
646type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700647 context.T
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700648 server *server // ipc.Server that this flow server belongs to
649 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
650 dec *vom.Decoder // to decode requests and args from the client
651 enc *vom.Encoder // to encode responses and results to the client
652 flow stream.Flow // underlying flow
653 reservedOpt options.ReservedNameDispatcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700654
Asim Shankar220a0152014-10-30 21:21:09 -0700655 // Fields filled in during the server invocation.
656 blessings security.Blessings
657 method, suffix string
658 label security.Label
659 discharges map[string]security.Discharge
660 deadline time.Time
661 endStreamArgs bool // are the stream args at EOF?
662 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700663}
664
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700665var _ ipc.Stream = (*flowServer)(nil)
666
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700667func newFlowServer(flow stream.Flow, server *server) *flowServer {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700668 server.Lock()
669 disp := server.disp
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700670 runtime := veyron2.RuntimeFromContext(server.ctx)
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700671 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700672
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700673 return &flowServer{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700674 T: InternalNewContext(runtime),
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700675 server: server,
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700676 disp: disp,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700677 // TODO(toddw): Support different codecs
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700678 dec: vom.NewDecoder(flow),
679 enc: vom.NewEncoder(flow),
680 flow: flow,
681 reservedOpt: server.reservedOpt,
682 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700683 }
684}
685
686// Vom does not encode untyped nils.
687// Consequently, the ipc system does not allow nil results with an interface
688// type from server methods. The one exception being errors.
689//
690// For now, the following hacky assumptions are made, which will be revisited when
691// a decision is made on how untyped nils should be encoded/decoded in
692// vom/vom2:
693//
694// - Server methods return 0 or more results
695// - Any values returned by the server that have an interface type are either
696// non-nil or of type error.
697func result2vom(res interface{}) vom.Value {
698 v := vom.ValueOf(res)
699 if !v.IsValid() {
700 // Untyped nils are assumed to be nil-errors.
701 var boxed verror.E
702 return vom.ValueOf(&boxed).Elem()
703 }
704 if err, iserr := res.(error); iserr {
705 // Convert errors to verror since errors are often not
706 // serializable via vom/gob (errors.New and fmt.Errorf return a
707 // type with no exported fields).
708 return vom.ValueOf(verror.Convert(err))
709 }
710 return v
711}
712
713func (fs *flowServer) serve() error {
714 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700715
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700716 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700717
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700718 ivtrace.FromContext(fs).Finish()
719
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700720 var traceResponse vtrace.Response
721 if fs.allowDebug {
722 traceResponse = ivtrace.Response(fs)
723 }
724
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700725 // Respond to the client with the response header and positional results.
726 response := ipc.Response{
727 Error: err,
728 EndStreamResults: true,
729 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700730 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700731 }
732 if err := fs.enc.Encode(response); err != nil {
733 return verror.BadProtocolf("ipc: response encoding failed: %v", err)
734 }
735 if response.Error != nil {
736 return response.Error
737 }
738 for ix, res := range results {
739 if err := fs.enc.EncodeValue(result2vom(res)); err != nil {
740 return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
741 }
742 }
743 // TODO(ashankar): Should unread data from the flow be drained?
744 //
745 // Reason to do so:
746 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
747 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
748 // slices will not be returned to the pool leading to possibly increased memory usage.
749 //
750 // Reason to not do so:
751 // Draining here will conflict with any Reads on the flow in a separate goroutine
752 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
753 //
754 // For now, go with the reason to not do so as having unread data in the stream
755 // should be a rare case.
756 return nil
757}
758
Matt Rosencrantz86897932014-10-02 09:34:34 -0700759func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700760 // Set a default timeout before reading from the flow. Without this timeout,
761 // a client that sends no request or a partial request will retain the flow
762 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700763 initTimer := newTimer(defaultCallTimeout)
764 defer initTimer.Stop()
765 fs.flow.SetDeadline(initTimer.C)
766
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700767 // Decode the initial request.
768 var req ipc.Request
769 if err := fs.dec.Decode(&req); err != nil {
770 return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
771 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700772 return &req, nil
773}
774
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800775func lookupInvoker(d ipc.Dispatcher, name, method string) (ipc.Invoker, security.Authorizer, error) {
776 obj, auth, err := d.Lookup(name, method)
777 switch {
778 case err != nil:
779 return nil, nil, err
780 case obj == nil:
781 return nil, auth, nil
782 }
783 if invoker, ok := obj.(ipc.Invoker); ok {
784 return invoker, auth, nil
785 }
786 return ipc.ReflectInvoker(obj), auth, nil
787}
788
Matt Rosencrantz86897932014-10-02 09:34:34 -0700789func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
790 start := time.Now()
791
792 req, verr := fs.readIPCRequest()
793 if verr != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700794 // We don't know what the ipc call was supposed to be, but we'll create
795 // a placeholder span so we can capture annotations.
796 fs.T, _ = ivtrace.WithNewSpan(fs, "Failed IPC Call")
Matt Rosencrantz86897932014-10-02 09:34:34 -0700797 return nil, verr
798 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700799 fs.method = req.Method
Matt Rosencrantz86897932014-10-02 09:34:34 -0700800
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700801 // TODO(mattr): Currently this allows users to trigger trace collection
802 // on the server even if they will not be allowed to collect the
803 // results later. This might be consider a DOS vector.
804 spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
805 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700806
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700807 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700808 if req.Timeout != ipc.NoTimeout {
809 fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700810 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700811 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700812 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700813 fs.flow.SetDeadline(fs.Done())
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700814
Matt Rosencrantz86897932014-10-02 09:34:34 -0700815 // Ensure that the context gets cancelled if the flow is closed
816 // due to a network error, or client cancellation.
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700817 go func() {
Matt Rosencrantzbae08212014-10-03 08:04:17 -0700818 select {
819 case <-fs.flow.Closed():
820 // Here we remove the contexts channel as a deadline to the flow.
821 // We do this to ensure clients get a consistent error when they read/write
822 // after the flow is closed. Since the flow is already closed, it doesn't
823 // matter that the context is also cancelled.
824 fs.flow.SetDeadline(nil)
825 cancel()
826 case <-fs.Done():
827 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700828 }()
829
Asim Shankarb54d7642014-06-05 13:08:04 -0700830 // If additional credentials are provided, make them available in the context
Asim Shankar8f05c222014-10-06 22:08:19 -0700831 var err error
832 if fs.blessings, err = security.NewBlessings(req.GrantedBlessings); err != nil {
833 return nil, verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
834 }
835 // Detect unusable blessings now, rather then discovering they are unusable on first use.
836 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides the
837 // server's identity as the blessing. Figure out what we want to do about this -
838 // should servers be able to assume that a blessing is something that does not
839 // have the authorizations that the server's own identity has?
840 if fs.blessings != nil && !reflect.DeepEqual(fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
841 return nil, verror.BadProtocolf("ipc: blessing granted not bound to this server(%v vs %v)", fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Asim Shankarb54d7642014-06-05 13:08:04 -0700842 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700843 // Receive third party caveat discharges the client sent
844 for i := uint64(0); i < req.NumDischarges; i++ {
Ankurf044a8d2014-09-05 17:05:24 -0700845 var d security.Discharge
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700846 if err := fs.dec.Decode(&d); err != nil {
847 return nil, verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
848 }
Ankurf044a8d2014-09-05 17:05:24 -0700849 fs.discharges[d.ID()] = d
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700850 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700851 // Lookup the invoker.
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800852 invoker, auth, suffix, verr := fs.lookup(req.Suffix, req.Method)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700853 fs.suffix = suffix // with leading /'s stripped
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700854 if verr != nil {
855 return nil, verr
856 }
857 // Prepare invoker and decode args.
858 numArgs := int(req.NumPosArgs)
859 argptrs, label, err := invoker.Prepare(req.Method, numArgs)
860 fs.label = label
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700861 if err != nil {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700862 return nil, verror.Makef(verror.ErrorID(err), "%s: name: %q", err, req.Suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700863 }
864 if len(argptrs) != numArgs {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700865 return nil, verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", req.Method, req.Suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700866 }
867 for ix, argptr := range argptrs {
868 if err := fs.dec.Decode(argptr); err != nil {
869 return nil, verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
870 }
871 }
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700872 fs.allowDebug = fs.LocalPrincipal() == nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700873 // Check application's authorization policy and invoke the method.
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700874 // LocalPrincipal is nil means that the server wanted to avoid authentication,
875 // and thus wanted to skip authorization as well.
Asim Shankar220a0152014-10-30 21:21:09 -0700876 if fs.LocalPrincipal() != nil {
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700877 // Check if the caller is permitted to view debug information.
878 if err := fs.authorize(auth); err != nil {
879 return nil, err
880 }
881 fs.allowDebug = fs.authorizeForDebug(auth) == nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700882 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700883
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700884 results, err := invoker.Invoke(req.Method, fs, argptrs)
Robin Thellend8eb77522014-08-28 14:12:01 -0700885 fs.server.stats.record(req.Method, time.Since(start))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700886 return results, verror.Convert(err)
887}
888
889// lookup returns the invoker and authorizer responsible for serving the given
Robin Thellendd24f0842014-09-23 10:27:29 -0700890// name and method. The name is stripped of any leading slashes. If it begins
891// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
892// invoker. Otherwise, and we use the server's dispatcher. The (stripped) name
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700893// and dispatch suffix are also returned.
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800894// TODO(cnicolaou): change this back returning in ipc.Invoker in the pt2 CL.
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800895func (fs *flowServer) lookup(name, method string) (ipc.Invoker, security.Authorizer, string, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700896 name = strings.TrimLeft(name, "/")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700897 if method == "Glob" && len(name) == 0 {
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700898 return ipc.ReflectInvoker(&globInvoker{fs.reservedOpt.Prefix, fs}), &acceptAllAuthorizer{}, name, nil
Robin Thellendc26c32e2014-10-06 17:44:04 -0700899 }
Robin Thellendd24f0842014-09-23 10:27:29 -0700900 disp := fs.disp
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700901 prefix := fs.reservedOpt.Prefix
902 if len(prefix) > 0 && (name == prefix || strings.HasPrefix(name, prefix+"/")) {
903 name = strings.TrimPrefix(name, prefix)
Robin Thellendd24f0842014-09-23 10:27:29 -0700904 name = strings.TrimLeft(name, "/")
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700905 disp = fs.reservedOpt.Dispatcher
Robin Thellendd24f0842014-09-23 10:27:29 -0700906 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700907
Robin Thellendd24f0842014-09-23 10:27:29 -0700908 if disp != nil {
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800909 invoker, auth, err := lookupInvoker(disp, name, method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700910 switch {
911 case err != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700912 return nil, nil, "", verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700913 case invoker != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700914 return invoker, auth, name, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700915 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700916 }
Robin Thellendc26c32e2014-10-06 17:44:04 -0700917 return nil, nil, "", verror.NoExistf("ipc: invoker not found for %q", name)
918}
919
920type acceptAllAuthorizer struct{}
921
922func (acceptAllAuthorizer) Authorize(security.Context) error {
923 return nil
924}
925
926type globInvoker struct {
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700927 prefix string
928 fs *flowServer
Robin Thellendc26c32e2014-10-06 17:44:04 -0700929}
930
931// Glob matches the pattern against internal object names if the double-
932// underscore prefix is explicitly part of the pattern. Otherwise, it invokes
933// the service's Glob method.
934func (i *globInvoker) Glob(call ipc.ServerCall, pattern string) error {
935 g, err := glob.Parse(pattern)
936 if err != nil {
937 return err
938 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700939 if strings.HasPrefix(pattern, naming.ReservedNamePrefix) {
Robin Thellendc26c32e2014-10-06 17:44:04 -0700940 var err error
941 // Match against internal object names.
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700942 if ok, _, left := g.MatchInitialSegment(i.prefix); ok {
943 if ierr := i.invokeGlob(call, i.fs.reservedOpt.Dispatcher, i.prefix, left.String()); ierr != nil {
944 err = ierr
Robin Thellendc26c32e2014-10-06 17:44:04 -0700945 }
946 }
947 return err
948 }
949 // Invoke the service's method.
950 return i.invokeGlob(call, i.fs.disp, "", pattern)
951}
952
953func (i *globInvoker) invokeGlob(call ipc.ServerCall, d ipc.Dispatcher, prefix, pattern string) error {
954 if d == nil {
955 return nil
956 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800957 obj, auth, err := d.Lookup("", "Glob")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700958 if err != nil {
959 return err
960 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800961 // TODO(cnicolaou): ipc.Serve TRANSITION
962 invoker, ok := obj.(ipc.Invoker)
963 if !ok {
964 panic("Lookup should have returned an ipc.Invoker")
965 }
966 if obj == nil || !ok {
Robin Thellendc26c32e2014-10-06 17:44:04 -0700967 return verror.NoExistf("ipc: invoker not found for Glob")
968 }
969
970 argptrs, label, err := invoker.Prepare("Glob", 1)
971 i.fs.label = label
972 if err != nil {
973 return verror.Makef(verror.ErrorID(err), "%s", err)
974 }
975 if err := i.fs.authorize(auth); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -0700976 return err
Robin Thellendc26c32e2014-10-06 17:44:04 -0700977 }
978 leafCall := &localServerCall{call, prefix}
979 argptrs[0] = &pattern
980 results, err := invoker.Invoke("Glob", leafCall, argptrs)
981 if err != nil {
982 return err
983 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700984
Robin Thellendc26c32e2014-10-06 17:44:04 -0700985 if len(results) != 1 {
986 return verror.BadArgf("unexpected number of results. Got %d, want 1", len(results))
987 }
988 res := results[0]
989 if res == nil {
990 return nil
991 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800992 err, ok = res.(error)
Robin Thellendc26c32e2014-10-06 17:44:04 -0700993 if !ok {
994 return verror.BadArgf("unexpected result type. Got %T, want error", res)
995 }
996 return err
997}
998
999// An ipc.ServerCall that prepends a prefix to all the names in the streamed
1000// MountEntry objects.
1001type localServerCall struct {
1002 ipc.ServerCall
1003 prefix string
1004}
1005
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -07001006var _ ipc.ServerCall = (*localServerCall)(nil)
1007var _ ipc.Stream = (*localServerCall)(nil)
1008var _ ipc.ServerContext = (*localServerCall)(nil)
1009
Robin Thellendc26c32e2014-10-06 17:44:04 -07001010func (c *localServerCall) Send(v interface{}) error {
1011 me, ok := v.(mttypes.MountEntry)
1012 if !ok {
1013 return verror.BadArgf("unexpected stream type. Got %T, want MountEntry", v)
1014 }
1015 me.Name = naming.Join(c.prefix, me.Name)
1016 return c.ServerCall.Send(me)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001017}
1018
Asim Shankara5457f02014-10-24 23:23:07 -07001019func (fs *flowServer) authorize(auth security.Authorizer) verror.E {
Asim Shankar8f05c222014-10-06 22:08:19 -07001020 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001021 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001022 }
Asim Shankara5457f02014-10-24 23:23:07 -07001023 if err := auth.Authorize(fs); err != nil {
1024 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001025 return verror.NoAccessf("ipc: not authorized to call %q.%q (%v)", fs.Name(), fs.Method(), err)
Asim Shankara5457f02014-10-24 23:23:07 -07001026 }
1027 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001028}
1029
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001030// debugContext is a context which wraps another context but always returns
1031// the debug label.
1032type debugContext struct {
1033 security.Context
1034}
1035
1036func (debugContext) Label() security.Label { return security.DebugLabel }
1037
1038// TODO(mattr): Is DebugLabel the right thing to check?
1039func (fs *flowServer) authorizeForDebug(auth security.Authorizer) error {
1040 dc := debugContext{fs}
Asim Shankar8f05c222014-10-06 22:08:19 -07001041 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001042 auth = defaultAuthorizer{}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001043 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001044 return auth.Authorize(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001045}
1046
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001047// Send implements the ipc.Stream method.
1048func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001049 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001050 // The empty response header indicates what follows is a streaming result.
1051 if err := fs.enc.Encode(ipc.Response{}); err != nil {
1052 return err
1053 }
1054 return fs.enc.Encode(item)
1055}
1056
1057// Recv implements the ipc.Stream method.
1058func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001059 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001060 var req ipc.Request
1061 if err := fs.dec.Decode(&req); err != nil {
1062 return err
1063 }
1064 if req.EndStreamArgs {
1065 fs.endStreamArgs = true
1066 return io.EOF
1067 }
1068 return fs.dec.Decode(itemptr)
1069}
1070
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001071// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001072
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001073func (fs *flowServer) Discharges() map[string]security.Discharge {
1074 //nologcall
1075 return fs.discharges
1076}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001077
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001078func (fs *flowServer) Server() ipc.Server {
1079 //nologcall
1080 return fs.server
1081}
1082func (fs *flowServer) Method() string {
1083 //nologcall
1084 return fs.method
1085}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001086
1087// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1088// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001089func (fs *flowServer) Name() string {
1090 //nologcall
1091 return fs.suffix
1092}
1093func (fs *flowServer) Suffix() string {
1094 //nologcall
1095 return fs.suffix
1096}
1097func (fs *flowServer) Label() security.Label {
1098 //nologcall
1099 return fs.label
1100}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001101func (fs *flowServer) LocalPrincipal() security.Principal {
1102 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001103 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001104}
1105func (fs *flowServer) LocalBlessings() security.Blessings {
1106 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001107 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001108}
1109func (fs *flowServer) RemoteBlessings() security.Blessings {
1110 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001111 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001112}
Asim Shankar8f05c222014-10-06 22:08:19 -07001113func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001114 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001115 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001116}
1117func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1118 //nologcall
1119 return fs.flow.LocalEndpoint()
1120}
1121func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1122 //nologcall
1123 return fs.flow.RemoteEndpoint()
1124}