blob: ae9a0b5cfcfa24e67b124f6052cbc347ba185e0f [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron2"
13 "veyron.io/veyron/veyron2/config"
14 "veyron.io/veyron/veyron2/context"
15 "veyron.io/veyron/veyron2/ipc"
16 "veyron.io/veyron/veyron2/ipc/stream"
17 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070018 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070019 "veyron.io/veyron/veyron2/security"
Robin Thellendc26c32e2014-10-06 17:44:04 -070020 mttypes "veyron.io/veyron/veyron2/services/mounttable/types"
Jiri Simsa519c5072014-09-17 21:37:57 -070021 "veyron.io/veyron/veyron2/verror"
22 "veyron.io/veyron/veyron2/vlog"
23 "veyron.io/veyron/veyron2/vom"
24 "veyron.io/veyron/veyron2/vtrace"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070025
26 "veyron.io/veyron/veyron/lib/glob"
27 "veyron.io/veyron/veyron/lib/netstate"
28 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
29 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070030 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070031)
32
33var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070034 errServerStopped = verror.Abortedf("ipc: server is stopped")
35)
36
Jiri Simsa5293dcb2014-05-10 09:56:38 -070037type server struct {
38 sync.Mutex
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070039 ctx context.T // context used by the server to make internal RPCs.
40 streamMgr stream.Manager // stream manager to listen for new flows.
41 publisher publisher.Publisher // publisher to publish mounttable mounts.
42 listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
43 listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
44 disp ipc.Dispatcher // dispatcher to serve RPCs
45 active sync.WaitGroup // active goroutines we've spawned.
46 stopped bool // whether the server has been stopped.
47 stoppedChan chan struct{} // closed when the server has been stopped.
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070048 ns naming.Namespace
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070049 servesMountTable bool
Cosmos Nicolaou92dba582014-11-05 17:24:10 -080050 // TODO(cnicolaou): remove this when the publisher tracks published names
51 // and can return an appropriate error for RemoveName on a name that
52 // wasn't 'Added' for this server.
53 names map[string]struct{}
54 reservedOpt options.ReservedNameDispatcher
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070055 // TODO(cnicolaou): add roaming stats to ipcStats
56 stats *ipcStats // stats for this server.
57}
58
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070059var _ ipc.Server = (*server)(nil)
60
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070061type dhcpListener struct {
62 sync.Mutex
63 publisher *config.Publisher // publisher used to fork the stream
64 name string // name of the publisher stream
65 ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
66 port string
67 ch chan config.Setting // channel to receive settings over
Jiri Simsa5293dcb2014-05-10 09:56:38 -070068}
69
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070070func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070071 s := &server{
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070072 ctx: ctx,
73 streamMgr: streamMgr,
74 publisher: publisher.New(ctx, ns, publishPeriod),
75 listeners: make(map[stream.Listener]*dhcpListener),
76 stoppedChan: make(chan struct{}),
77 ns: ns,
78 stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070079 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070080 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -070081 switch opt := opt.(type) {
82 case stream.ListenerOpt:
83 // Collect all ServerOpts that are also ListenerOpts.
84 s.listenerOpts = append(s.listenerOpts, opt)
Asim Shankarcc044212014-10-15 23:25:26 -070085 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070086 s.servesMountTable = bool(opt)
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -070087 case options.ReservedNameDispatcher:
88 s.reservedOpt = opt
Jiri Simsa5293dcb2014-05-10 09:56:38 -070089 }
90 }
91 return s, nil
92}
93
Jiri Simsa5293dcb2014-05-10 09:56:38 -070094func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -070095 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070096 s.Lock()
97 defer s.Unlock()
98 if s.stopped {
99 return nil, errServerStopped
100 }
101 return s.publisher.Published(), nil
102}
103
104// resolveToAddress will try to resolve the input to an address using the
105// mount table, if the input is not already an address.
Asim Shankardee311d2014-08-01 17:41:31 -0700106func (s *server) resolveToAddress(address string) (string, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700107 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700108 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700109 }
Asim Shankardee311d2014-08-01 17:41:31 -0700110 var names []string
111 if s.ns != nil {
112 var err error
113 if names, err = s.ns.Resolve(s.ctx, address); err != nil {
114 return "", err
115 }
116 } else {
117 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700118 }
119 for _, n := range names {
120 address, suffix := naming.SplitAddressName(n)
Asim Shankardee311d2014-08-01 17:41:31 -0700121 if suffix != "" && suffix != "//" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700122 continue
123 }
124 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700125 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700126 }
127 }
Asim Shankardee311d2014-08-01 17:41:31 -0700128 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700129}
130
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700131/*
Cosmos Nicolaouf889c732014-10-16 20:46:54 -0700132// ipAddressChooser returns the preferred IP address, which is,
133// a public IPv4 address, then any non-loopback IPv4, then a public
134// IPv6 address and finally any non-loopback/link-local IPv6
135// It is replicated here to avoid a circular dependency and will, in any case,
136// go away when we transition away from Listen to the ListenX API.
137func ipAddressChooser(network string, addrs []ipc.Address) ([]ipc.Address, error) {
138 if !netstate.IsIPProtocol(network) {
139 return nil, fmt.Errorf("can't support network protocol %q", network)
140 }
141 accessible := netstate.AddrList(addrs)
142
143 // Try and find an address on a interface with a default route.
144 predicates := []netstate.AddressPredicate{netstate.IsPublicUnicastIPv4,
145 netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6}
146 for _, predicate := range predicates {
147 if addrs := accessible.Filter(predicate); len(addrs) > 0 {
148 onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
149 if len(onDefaultRoutes) > 0 {
150 return onDefaultRoutes, nil
151 }
152 }
153 }
154
155 // We failed to find any addresses with default routes, try again
156 // but without the default route requirement.
157 for _, predicate := range predicates {
158 if addrs := accessible.Filter(predicate); len(addrs) > 0 {
159 return addrs, nil
160 }
161 }
162
163 return nil, fmt.Errorf("failed to find any usable address for %q", network)
164}
165
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700166func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700167 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700168 s.Lock()
169 // Shortcut if the server is stopped, to avoid needlessly creating a
170 // listener.
171 if s.stopped {
172 s.Unlock()
173 return nil, errServerStopped
174 }
175 s.Unlock()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700176 var proxyName string
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700177 if protocol == inaming.Network {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700178 proxyName = address
Asim Shankardee311d2014-08-01 17:41:31 -0700179 var err error
180 if address, err = s.resolveToAddress(address); err != nil {
181 return nil, err
182 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700183 }
Asim Shankarcc044212014-10-15 23:25:26 -0700184 // TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700185 // it can more cleanly set the IsMountTable bit in the endpoint.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700186 ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
187 if err != nil {
188 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
189 return nil, err
190 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700191 iep, ok := ep.(*inaming.Endpoint)
192 if !ok {
193 return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
194 }
195
Robin Thellend7f424272014-09-04 10:42:14 -0700196 if protocol != inaming.Network {
197 // We know the endpoint format, so we crack it open...
198 switch iep.Protocol {
199 case "tcp", "tcp4", "tcp6":
200 host, port, err := net.SplitHostPort(iep.Address)
201 if err != nil {
202 return nil, err
203 }
204 ip := net.ParseIP(host)
205 if ip == nil {
206 return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
207 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700208 if ip.IsUnspecified() {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700209 addrs, err := netstate.GetAccessibleIPs()
210 if err == nil {
Cosmos Nicolaouf889c732014-10-16 20:46:54 -0700211 if a, err := ipAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700212 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
Cosmos Nicolaou9a246552014-08-29 13:07:29 -0700213 }
Cosmos Nicolaouf7a11d92014-08-29 09:56:07 -0700214 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700215 }
216 }
217 }
218
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700219 s.Lock()
220 if s.stopped {
221 s.Unlock()
222 // Ignore error return since we can't really do much about it.
223 ln.Close()
224 return nil, errServerStopped
225 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700226 s.listeners[ln] = nil
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700227 // We have a single goroutine per listener to accept new flows.
228 // Each flow is served from its own goroutine.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700229 s.active.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700230 if protocol == inaming.Network {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700231 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700232 s.proxyListenLoop(ln, ep, proxy)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700233 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700234 }(ln, iep, proxyName)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700235 } else {
236 go func(ln stream.Listener, ep naming.Endpoint) {
237 s.listenLoop(ln, ep)
238 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700239 }(ln, iep)
Bogdan Caprita187269b2014-05-13 19:59:46 -0700240 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700241 s.Unlock()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700242 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700243 return ep, nil
244}
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700245*/
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700246
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700247// externalEndpoint examines the endpoint returned by the stream listen call
248// and fills in the address to publish to the mount table. It also returns the
249// IP host address that it selected for publishing to the mount table.
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700250func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700251 // We know the endpoint format, so we crack it open...
252 iep, ok := lep.(*inaming.Endpoint)
253 if !ok {
254 return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
255 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700256 switch iep.Protocol {
257 case "tcp", "tcp4", "tcp6":
258 host, port, err := net.SplitHostPort(iep.Address)
259 if err != nil {
260 return nil, nil, err
261 }
262 ip := net.ParseIP(host)
263 if ip == nil {
264 return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
265 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700266 if ip.IsUnspecified() && chooser != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700267 // Need to find a usable IP address since the call to listen
268 // didn't specify one.
269 addrs, err := netstate.GetAccessibleIPs()
270 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700271 // TODO(cnicolaou): we could return multiple addresses here,
272 // all of which can be exported to the mount table. Look at
273 // this after we transition fully to ListenX.
274 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
275 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
276 return iep, a[0].Address().(*net.IPAddr), nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700277 }
278 }
279 } else {
280 // Listen used a fixed IP address, which essentially disables
281 // roaming.
282 return iep, nil, nil
283 }
284 }
285 return iep, nil, nil
286}
287
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700288func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700289 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700290 s.Lock()
291 // Shortcut if the server is stopped, to avoid needlessly creating a
292 // listener.
293 if s.stopped {
294 s.Unlock()
295 return nil, errServerStopped
296 }
297 s.Unlock()
298
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700299 protocol := listenSpec.Protocol
300 address := listenSpec.Address
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700301 proxyAddress := ""
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700302 if len(listenSpec.Proxy) > 0 {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700303 if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
304 return nil, err
305 } else {
306 proxyAddress = address
307 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700308 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700309
310 ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
311 if err != nil {
312 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
313 return nil, err
314 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700315 ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700316 if err != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700317 ln.Close()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700318 return nil, err
319 }
320
321 s.Lock()
322 if s.stopped {
323 s.Unlock()
324 // Ignore error return since we can't really do much about it.
325 ln.Close()
326 return nil, errServerStopped
327 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700328
Srdjan Petrovic69381a92014-10-30 13:20:55 -0700329 var ip net.IP
330 if ipaddr != nil {
331 ip = net.ParseIP(ipaddr.String())
332 } else {
333 vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
334 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700335 publisher := listenSpec.StreamPublisher
Srdjan Petrovic69381a92014-10-30 13:20:55 -0700336 if ip != nil && !ip.IsLoopback() && publisher != nil {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700337 streamName := listenSpec.StreamName
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700338 ch := make(chan config.Setting)
339 _, err := publisher.ForkStream(streamName, ch)
340 if err != nil {
341 return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
342 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700343 _, port, _ := net.SplitHostPort(ep.Address)
344 dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
345
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700346 // We have a goroutine to listen for dhcp changes.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700347 s.active.Add(1)
348 // goroutine to listen for address changes.
349 go func(dl *dhcpListener) {
350 s.dhcpLoop(dl)
351 s.active.Done()
352 }(dhcpl)
353 s.listeners[ln] = dhcpl
354 } else {
355 s.listeners[ln] = nil
356 }
357
358 // We have a goroutine per listener to accept new flows.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700359 // Each flow is served from its own goroutine.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700360 s.active.Add(1)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700361
362 // goroutine to listen for connections
363 go func(ln stream.Listener, ep naming.Endpoint) {
364 s.listenLoop(ln, ep)
365 s.active.Done()
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700366 }(ln, lep)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700367
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700368 if len(proxyAddress) > 0 {
369 pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
370 if err != nil {
371 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
372 return nil, err
373 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700374 ipep, ok := pep.(*inaming.Endpoint)
375 if !ok {
376 return nil, fmt.Errorf("failed translating internal endpoint data types")
377 }
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700378 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700379 s.active.Add(1)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700380 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700381 s.proxyListenLoop(ln, ep, proxy)
382 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700383 }(pln, ipep, listenSpec.Proxy)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700384 s.listeners[pln] = nil
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700385 // TODO(cnicolaou,p): AddServer no longer needs to take the
386 // servesMountTable bool since it can be extracted from the endpoint.
387 s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700388 } else {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700389 s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700390 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700391 s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700392 return ep, nil
393}
394
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700395func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700396 var name string
397 if !s.servesMountTable {
398 // Make sure that client MountTable code doesn't try and
399 // ResolveStep past this final address.
400 name = "//"
401 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700402 ep.IsMountTable = servesMountTable
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700403 return naming.JoinAddressName(ep.String(), name)
404}
405
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700406func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700407 const (
408 min = 5 * time.Millisecond
409 max = 5 * time.Minute
410 )
411 for {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700412 s.listenLoop(ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700413 // The listener is done, so:
414 // (1) Unpublish its name
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700415 s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700416 // (2) Reconnect to the proxy unless the server has been stopped
417 backoff := min
418 ln = nil
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700419 // TODO(ashankar,cnicolaou): this code is way too confusing and should
420 // be cleaned up.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700421 for ln == nil {
422 select {
423 case <-time.After(backoff):
Asim Shankardee311d2014-08-01 17:41:31 -0700424 resolved, err := s.resolveToAddress(proxy)
425 if err != nil {
426 vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700427 if backoff = backoff * 2; backoff > max {
428 backoff = max
429 }
Asim Shankardee311d2014-08-01 17:41:31 -0700430 break
431 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700432 var ep naming.Endpoint
Asim Shankardee311d2014-08-01 17:41:31 -0700433 ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700434 if err == nil {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700435 var ok bool
436 iep, ok = ep.(*inaming.Endpoint)
437 if !ok {
438 vlog.Errorf("failed translating internal endpoint data types")
439 ln = nil
440 continue
441 }
442 vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700443 break
444 }
445 if backoff = backoff * 2; backoff > max {
446 backoff = max
447 }
448 vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
449 case <-s.stoppedChan:
450 return
451 }
452 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700453 // TODO(cnicolaou,ashankar): this won't work when we are both
454 // proxying and publishing locally, which is the common case.
455 // listenLoop, dhcpLoop and the original publish are all publishing
456 // addresses to the same name, but the client is not smart enough
457 // to choose sensibly between them.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700458 // (3) reconnected, publish new address
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700459 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700460 s.Lock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700461 s.listeners[ln] = nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700462 s.Unlock()
463 }
464}
465
466func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
467 defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
468 defer func() {
469 s.Lock()
470 delete(s.listeners, ln)
471 s.Unlock()
472 }()
473 for {
474 flow, err := ln.Accept()
475 if err != nil {
476 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
477 return
478 }
479 s.active.Add(1)
480 go func(flow stream.Flow) {
481 if err := newFlowServer(flow, s).serve(); err != nil {
482 // TODO(caprita): Logging errors here is
483 // too spammy. For example, "not
484 // authorized" errors shouldn't be
485 // logged as server errors.
486 vlog.Errorf("Flow serve on %v failed: %v", ln, err)
487 }
488 s.active.Done()
489 }(flow)
490 }
491}
492
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700493func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
494 dhcpl.Lock()
495 defer dhcpl.Unlock()
496 for _, a := range addrs {
497 if ip := netstate.AsIP(a); ip != nil {
498 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700499 fn(s.publishEP(dhcpl.ep, s.servesMountTable))
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700500 }
501 }
502}
503
504func (s *server) dhcpLoop(dhcpl *dhcpListener) {
505 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
506 vlog.VI(2).Infof("ipc: dhcp loop")
507 for setting := range dhcpl.ch {
508 if setting == nil {
509 return
510 }
511 switch v := setting.Value().(type) {
512 case bool:
513 return
514 case []net.Addr:
515 s.Lock()
516 if s.stopped {
517 s.Unlock()
518 return
519 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700520 // TODO(cnicolaou,ashankar): this won't work when we are both
521 // proxying and publishing locally, which is the common case.
522 // listenLoop, dhcpLoop and the original publish are all publishing
523 // addresses to the same name, but the client is not smart enough
524 // to choose sensibly between them.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700525 publisher := s.publisher
526 s.Unlock()
527 switch setting.Name() {
528 case ipc.NewAddrsSetting:
529 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700530 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700531 case ipc.RmAddrsSetting:
532 vlog.Infof("Removed some addresses: %q", v)
533 s.applyChange(dhcpl, v, publisher.RemoveServer)
534 }
535
536 }
537 }
538}
539
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800540func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800541 if obj == nil {
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800542 // The ReflectInvoker inside the LeafDispatcher will panic
543 // if called for a nil value.
544 return fmt.Errorf("A nil object is not allowed")
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800545 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800546 return s.ServeDispatcher(name, ipc.LeafDispatcher(obj, authorizer))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800547}
548
549func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700550 s.Lock()
551 defer s.Unlock()
552 if s.stopped {
553 return errServerStopped
554 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800555 if disp == nil {
556 return fmt.Errorf("A nil dispacther is not allowed")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700557 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800558 if s.disp != nil {
559 return fmt.Errorf("Serve or ServeDispatcher has already been called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700560 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800561 s.disp = disp
562 s.names = make(map[string]struct{})
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700563 if len(name) > 0 {
564 s.publisher.AddName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800565 s.names[name] = struct{}{}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700566 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700567 return nil
568}
569
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800570func (s *server) AddName(name string) error {
571 s.Lock()
572 defer s.Unlock()
573 if s.stopped {
574 return errServerStopped
575 }
576 if len(name) == 0 {
577 return fmt.Errorf("empty name")
578 }
579 s.publisher.AddName(name)
580 // TODO(cnicolaou): remove this map when the publisher's RemoveName
581 // method returns an error.
582 s.names[name] = struct{}{}
583 return nil
584}
585
586func (s *server) RemoveName(name string) error {
587 s.Lock()
588 defer s.Unlock()
589 if s.stopped {
590 return errServerStopped
591 }
592 if _, present := s.names[name]; !present {
593 return fmt.Errorf("%q has not been previously used for this server", name)
594 }
595 s.publisher.RemoveName(name)
596 delete(s.names, name)
597 return nil
598}
599
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700600func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700601 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700602 s.Lock()
603 if s.stopped {
604 s.Unlock()
605 return nil
606 }
607 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700608 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700609 s.Unlock()
610
Robin Thellenddf428232014-10-06 12:50:44 -0700611 // Delete the stats object.
612 s.stats.stop()
613
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700614 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
615 // server lock, since publisher is safe for concurrent access.
616
617 // Stop the publisher, which triggers unmounting of published names.
618 s.publisher.Stop()
619 // Wait for the publisher to be done unmounting before we can proceed to
620 // close the listeners (to minimize the number of mounted names pointing
621 // to endpoint that are no longer serving).
622 //
623 // TODO(caprita): See if make sense to fail fast on rejecting
624 // connections once listeners are closed, and parallelize the publisher
625 // and listener shutdown.
626 s.publisher.WaitForStop()
627
628 s.Lock()
629 // Close all listeners. No new flows will be accepted, while in-flight
630 // flows will continue until they terminate naturally.
631 nListeners := len(s.listeners)
632 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700633
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700634 for ln, dhcpl := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700635 go func(ln stream.Listener) {
636 errCh <- ln.Close()
637 }(ln)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700638 if dhcpl != nil {
639 dhcpl.Lock()
640 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
641 dhcpl.ch <- config.NewBool("EOF", "stop", true)
642 dhcpl.Unlock()
643 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700644 }
645 s.Unlock()
646 var firstErr error
647 for i := 0; i < nListeners; i++ {
648 if err := <-errCh; err != nil && firstErr == nil {
649 firstErr = err
650 }
651 }
652 // At this point, we are guaranteed that no new requests are going to be
653 // accepted.
654
655 // Wait for the publisher and active listener + flows to finish.
656 s.active.Wait()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700657 s.Lock()
658 s.disp = nil
659 s.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700660 return firstErr
661}
662
663// flowServer implements the RPC server-side protocol for a single RPC, over a
664// flow that's already connected to the client.
665type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700666 context.T
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700667 server *server // ipc.Server that this flow server belongs to
668 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
669 dec *vom.Decoder // to decode requests and args from the client
670 enc *vom.Encoder // to encode responses and results to the client
671 flow stream.Flow // underlying flow
672 reservedOpt options.ReservedNameDispatcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700673
Asim Shankar220a0152014-10-30 21:21:09 -0700674 // Fields filled in during the server invocation.
675 blessings security.Blessings
676 method, suffix string
Asim Shankar0cad0832014-11-04 01:27:38 -0800677 tags []interface{}
Asim Shankar220a0152014-10-30 21:21:09 -0700678 discharges map[string]security.Discharge
Asim Shankar0cad0832014-11-04 01:27:38 -0800679 starttime time.Time
Asim Shankar220a0152014-10-30 21:21:09 -0700680 endStreamArgs bool // are the stream args at EOF?
681 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700682}
683
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700684var _ ipc.Stream = (*flowServer)(nil)
685
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700686func newFlowServer(flow stream.Flow, server *server) *flowServer {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700687 server.Lock()
688 disp := server.disp
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700689 runtime := veyron2.RuntimeFromContext(server.ctx)
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700690 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700691
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700692 return &flowServer{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700693 T: InternalNewContext(runtime),
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700694 server: server,
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700695 disp: disp,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700696 // TODO(toddw): Support different codecs
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700697 dec: vom.NewDecoder(flow),
698 enc: vom.NewEncoder(flow),
699 flow: flow,
700 reservedOpt: server.reservedOpt,
701 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700702 }
703}
704
705// Vom does not encode untyped nils.
706// Consequently, the ipc system does not allow nil results with an interface
707// type from server methods. The one exception being errors.
708//
709// For now, the following hacky assumptions are made, which will be revisited when
710// a decision is made on how untyped nils should be encoded/decoded in
711// vom/vom2:
712//
713// - Server methods return 0 or more results
714// - Any values returned by the server that have an interface type are either
715// non-nil or of type error.
716func result2vom(res interface{}) vom.Value {
717 v := vom.ValueOf(res)
718 if !v.IsValid() {
719 // Untyped nils are assumed to be nil-errors.
720 var boxed verror.E
721 return vom.ValueOf(&boxed).Elem()
722 }
723 if err, iserr := res.(error); iserr {
724 // Convert errors to verror since errors are often not
725 // serializable via vom/gob (errors.New and fmt.Errorf return a
726 // type with no exported fields).
727 return vom.ValueOf(verror.Convert(err))
728 }
729 return v
730}
731
732func (fs *flowServer) serve() error {
733 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700734
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700735 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700736
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700737 ivtrace.FromContext(fs).Finish()
738
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700739 var traceResponse vtrace.Response
740 if fs.allowDebug {
741 traceResponse = ivtrace.Response(fs)
742 }
743
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700744 // Respond to the client with the response header and positional results.
745 response := ipc.Response{
746 Error: err,
747 EndStreamResults: true,
748 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700749 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700750 }
751 if err := fs.enc.Encode(response); err != nil {
752 return verror.BadProtocolf("ipc: response encoding failed: %v", err)
753 }
754 if response.Error != nil {
755 return response.Error
756 }
757 for ix, res := range results {
758 if err := fs.enc.EncodeValue(result2vom(res)); err != nil {
759 return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
760 }
761 }
762 // TODO(ashankar): Should unread data from the flow be drained?
763 //
764 // Reason to do so:
765 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
766 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
767 // slices will not be returned to the pool leading to possibly increased memory usage.
768 //
769 // Reason to not do so:
770 // Draining here will conflict with any Reads on the flow in a separate goroutine
771 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
772 //
773 // For now, go with the reason to not do so as having unread data in the stream
774 // should be a rare case.
775 return nil
776}
777
Matt Rosencrantz86897932014-10-02 09:34:34 -0700778func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700779 // Set a default timeout before reading from the flow. Without this timeout,
780 // a client that sends no request or a partial request will retain the flow
781 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700782 initTimer := newTimer(defaultCallTimeout)
783 defer initTimer.Stop()
784 fs.flow.SetDeadline(initTimer.C)
785
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700786 // Decode the initial request.
787 var req ipc.Request
788 if err := fs.dec.Decode(&req); err != nil {
789 return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
790 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700791 return &req, nil
792}
793
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800794func lookupInvoker(d ipc.Dispatcher, name, method string) (ipc.Invoker, security.Authorizer, error) {
795 obj, auth, err := d.Lookup(name, method)
796 switch {
797 case err != nil:
798 return nil, nil, err
799 case obj == nil:
800 return nil, auth, nil
801 }
802 if invoker, ok := obj.(ipc.Invoker); ok {
803 return invoker, auth, nil
804 }
805 return ipc.ReflectInvoker(obj), auth, nil
806}
807
Matt Rosencrantz86897932014-10-02 09:34:34 -0700808func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
Asim Shankar0cad0832014-11-04 01:27:38 -0800809 fs.starttime = time.Now()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700810 req, verr := fs.readIPCRequest()
811 if verr != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700812 // We don't know what the ipc call was supposed to be, but we'll create
813 // a placeholder span so we can capture annotations.
814 fs.T, _ = ivtrace.WithNewSpan(fs, "Failed IPC Call")
Matt Rosencrantz86897932014-10-02 09:34:34 -0700815 return nil, verr
816 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817 fs.method = req.Method
Matt Rosencrantz86897932014-10-02 09:34:34 -0700818
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700819 // TODO(mattr): Currently this allows users to trigger trace collection
820 // on the server even if they will not be allowed to collect the
821 // results later. This might be consider a DOS vector.
822 spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
823 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700824
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700825 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700826 if req.Timeout != ipc.NoTimeout {
Asim Shankar0cad0832014-11-04 01:27:38 -0800827 fs.T, cancel = fs.WithDeadline(fs.starttime.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700828 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700829 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700830 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700831 fs.flow.SetDeadline(fs.Done())
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700832
Matt Rosencrantz86897932014-10-02 09:34:34 -0700833 // Ensure that the context gets cancelled if the flow is closed
834 // due to a network error, or client cancellation.
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700835 go func() {
Matt Rosencrantzbae08212014-10-03 08:04:17 -0700836 select {
837 case <-fs.flow.Closed():
838 // Here we remove the contexts channel as a deadline to the flow.
839 // We do this to ensure clients get a consistent error when they read/write
840 // after the flow is closed. Since the flow is already closed, it doesn't
841 // matter that the context is also cancelled.
842 fs.flow.SetDeadline(nil)
843 cancel()
844 case <-fs.Done():
845 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700846 }()
847
Asim Shankarb54d7642014-06-05 13:08:04 -0700848 // If additional credentials are provided, make them available in the context
Asim Shankar8f05c222014-10-06 22:08:19 -0700849 var err error
850 if fs.blessings, err = security.NewBlessings(req.GrantedBlessings); err != nil {
851 return nil, verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
852 }
853 // Detect unusable blessings now, rather then discovering they are unusable on first use.
854 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides the
855 // server's identity as the blessing. Figure out what we want to do about this -
856 // should servers be able to assume that a blessing is something that does not
857 // have the authorizations that the server's own identity has?
858 if fs.blessings != nil && !reflect.DeepEqual(fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
859 return nil, verror.BadProtocolf("ipc: blessing granted not bound to this server(%v vs %v)", fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Asim Shankarb54d7642014-06-05 13:08:04 -0700860 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700861 // Receive third party caveat discharges the client sent
862 for i := uint64(0); i < req.NumDischarges; i++ {
Ankurf044a8d2014-09-05 17:05:24 -0700863 var d security.Discharge
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700864 if err := fs.dec.Decode(&d); err != nil {
865 return nil, verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
866 }
Ankurf044a8d2014-09-05 17:05:24 -0700867 fs.discharges[d.ID()] = d
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700868 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700869 // Lookup the invoker.
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800870 invoker, auth, suffix, verr := fs.lookup(req.Suffix, req.Method)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700871 fs.suffix = suffix // with leading /'s stripped
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700872 if verr != nil {
873 return nil, verr
874 }
875 // Prepare invoker and decode args.
876 numArgs := int(req.NumPosArgs)
Asim Shankar214f89c2014-11-03 16:35:47 -0800877 argptrs, tags, err := invoker.Prepare(req.Method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -0800878 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700879 if err != nil {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700880 return nil, verror.Makef(verror.ErrorID(err), "%s: name: %q", err, req.Suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700881 }
882 if len(argptrs) != numArgs {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700883 return nil, verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", req.Method, req.Suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700884 }
885 for ix, argptr := range argptrs {
886 if err := fs.dec.Decode(argptr); err != nil {
887 return nil, verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
888 }
889 }
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700890 fs.allowDebug = fs.LocalPrincipal() == nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700891 // Check application's authorization policy and invoke the method.
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700892 // LocalPrincipal is nil means that the server wanted to avoid authentication,
893 // and thus wanted to skip authorization as well.
Asim Shankar220a0152014-10-30 21:21:09 -0700894 if fs.LocalPrincipal() != nil {
Suharsh Sivakumarcd743f72014-10-27 10:03:42 -0700895 // Check if the caller is permitted to view debug information.
896 if err := fs.authorize(auth); err != nil {
897 return nil, err
898 }
899 fs.allowDebug = fs.authorizeForDebug(auth) == nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700900 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700901
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700902 results, err := invoker.Invoke(req.Method, fs, argptrs)
Asim Shankar0cad0832014-11-04 01:27:38 -0800903 fs.server.stats.record(req.Method, time.Since(fs.starttime))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700904 return results, verror.Convert(err)
905}
906
907// lookup returns the invoker and authorizer responsible for serving the given
Robin Thellendd24f0842014-09-23 10:27:29 -0700908// name and method. The name is stripped of any leading slashes. If it begins
909// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
910// invoker. Otherwise, and we use the server's dispatcher. The (stripped) name
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700911// and dispatch suffix are also returned.
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800912func (fs *flowServer) lookup(name, method string) (ipc.Invoker, security.Authorizer, string, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700913 name = strings.TrimLeft(name, "/")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700914 if method == "Glob" && len(name) == 0 {
Cosmos Nicolaouadd8e4a2014-11-05 22:25:21 -0800915 return ipc.ReflectInvoker(&globInvoker{naming.ReservedNamePrefix, fs}), &acceptAllAuthorizer{}, name, nil
Robin Thellendc26c32e2014-10-06 17:44:04 -0700916 }
Robin Thellendd24f0842014-09-23 10:27:29 -0700917 disp := fs.disp
Cosmos Nicolaouadd8e4a2014-11-05 22:25:21 -0800918 if strings.HasPrefix(name, naming.ReservedNamePrefix) {
919 parts := strings.SplitN(name, "/", 2)
920 if len(parts) > 1 {
921 name = parts[1]
922 } else {
923 name = ""
924 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700925 disp = fs.reservedOpt.Dispatcher
Robin Thellendd24f0842014-09-23 10:27:29 -0700926 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700927
Robin Thellendd24f0842014-09-23 10:27:29 -0700928 if disp != nil {
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800929 invoker, auth, err := lookupInvoker(disp, name, method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700930 switch {
931 case err != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700932 return nil, nil, "", verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700933 case invoker != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700934 return invoker, auth, name, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700935 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700936 }
Robin Thellendc26c32e2014-10-06 17:44:04 -0700937 return nil, nil, "", verror.NoExistf("ipc: invoker not found for %q", name)
938}
939
940type acceptAllAuthorizer struct{}
941
942func (acceptAllAuthorizer) Authorize(security.Context) error {
943 return nil
944}
945
946type globInvoker struct {
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700947 prefix string
948 fs *flowServer
Robin Thellendc26c32e2014-10-06 17:44:04 -0700949}
950
951// Glob matches the pattern against internal object names if the double-
952// underscore prefix is explicitly part of the pattern. Otherwise, it invokes
953// the service's Glob method.
954func (i *globInvoker) Glob(call ipc.ServerCall, pattern string) error {
955 g, err := glob.Parse(pattern)
956 if err != nil {
957 return err
958 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700959 if strings.HasPrefix(pattern, naming.ReservedNamePrefix) {
Robin Thellendc26c32e2014-10-06 17:44:04 -0700960 var err error
961 // Match against internal object names.
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700962 if ok, _, left := g.MatchInitialSegment(i.prefix); ok {
963 if ierr := i.invokeGlob(call, i.fs.reservedOpt.Dispatcher, i.prefix, left.String()); ierr != nil {
964 err = ierr
Robin Thellendc26c32e2014-10-06 17:44:04 -0700965 }
966 }
967 return err
968 }
969 // Invoke the service's method.
970 return i.invokeGlob(call, i.fs.disp, "", pattern)
971}
972
973func (i *globInvoker) invokeGlob(call ipc.ServerCall, d ipc.Dispatcher, prefix, pattern string) error {
974 if d == nil {
975 return nil
976 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800977 obj, auth, err := d.Lookup("", "Glob")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700978 if err != nil {
979 return err
980 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800981 // TODO(cnicolaou): ipc.Serve TRANSITION
982 invoker, ok := obj.(ipc.Invoker)
983 if !ok {
Asim Shankar214f89c2014-11-03 16:35:47 -0800984 panic(fmt.Errorf("Lookup should have returned an ipc.Invoker, returned %T", obj))
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -0800985 }
986 if obj == nil || !ok {
Robin Thellendc26c32e2014-10-06 17:44:04 -0700987 return verror.NoExistf("ipc: invoker not found for Glob")
988 }
989
Asim Shankar214f89c2014-11-03 16:35:47 -0800990 argptrs, tags, err := invoker.Prepare("Glob", 1)
Asim Shankar0cad0832014-11-04 01:27:38 -0800991 i.fs.tags = tags
Robin Thellendc26c32e2014-10-06 17:44:04 -0700992 if err != nil {
993 return verror.Makef(verror.ErrorID(err), "%s", err)
994 }
995 if err := i.fs.authorize(auth); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -0700996 return err
Robin Thellendc26c32e2014-10-06 17:44:04 -0700997 }
998 leafCall := &localServerCall{call, prefix}
999 argptrs[0] = &pattern
1000 results, err := invoker.Invoke("Glob", leafCall, argptrs)
1001 if err != nil {
1002 return err
1003 }
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -07001004
Robin Thellendc26c32e2014-10-06 17:44:04 -07001005 if len(results) != 1 {
1006 return verror.BadArgf("unexpected number of results. Got %d, want 1", len(results))
1007 }
1008 res := results[0]
1009 if res == nil {
1010 return nil
1011 }
Cosmos Nicolaou1ee5e1a2014-11-02 10:20:30 -08001012 err, ok = res.(error)
Robin Thellendc26c32e2014-10-06 17:44:04 -07001013 if !ok {
1014 return verror.BadArgf("unexpected result type. Got %T, want error", res)
1015 }
1016 return err
1017}
1018
1019// An ipc.ServerCall that prepends a prefix to all the names in the streamed
1020// MountEntry objects.
1021type localServerCall struct {
1022 ipc.ServerCall
1023 prefix string
1024}
1025
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -07001026var _ ipc.ServerCall = (*localServerCall)(nil)
1027var _ ipc.Stream = (*localServerCall)(nil)
1028var _ ipc.ServerContext = (*localServerCall)(nil)
1029
Robin Thellendc26c32e2014-10-06 17:44:04 -07001030func (c *localServerCall) Send(v interface{}) error {
1031 me, ok := v.(mttypes.MountEntry)
1032 if !ok {
1033 return verror.BadArgf("unexpected stream type. Got %T, want MountEntry", v)
1034 }
1035 me.Name = naming.Join(c.prefix, me.Name)
1036 return c.ServerCall.Send(me)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001037}
1038
Asim Shankara5457f02014-10-24 23:23:07 -07001039func (fs *flowServer) authorize(auth security.Authorizer) verror.E {
Asim Shankar8f05c222014-10-06 22:08:19 -07001040 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001041 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001042 }
Asim Shankara5457f02014-10-24 23:23:07 -07001043 if err := auth.Authorize(fs); err != nil {
1044 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001045 return verror.NoAccessf("ipc: not authorized to call %q.%q (%v)", fs.Name(), fs.Method(), err)
Asim Shankara5457f02014-10-24 23:23:07 -07001046 }
1047 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001048}
1049
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001050// debugContext is a context which wraps another context but always returns
1051// the debug label.
1052type debugContext struct {
1053 security.Context
1054}
1055
1056func (debugContext) Label() security.Label { return security.DebugLabel }
1057
1058// TODO(mattr): Is DebugLabel the right thing to check?
1059func (fs *flowServer) authorizeForDebug(auth security.Authorizer) error {
1060 dc := debugContext{fs}
Asim Shankar8f05c222014-10-06 22:08:19 -07001061 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001062 auth = defaultAuthorizer{}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001063 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001064 return auth.Authorize(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001065}
1066
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001067// Send implements the ipc.Stream method.
1068func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001069 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001070 // The empty response header indicates what follows is a streaming result.
1071 if err := fs.enc.Encode(ipc.Response{}); err != nil {
1072 return err
1073 }
1074 return fs.enc.Encode(item)
1075}
1076
1077// Recv implements the ipc.Stream method.
1078func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001079 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001080 var req ipc.Request
1081 if err := fs.dec.Decode(&req); err != nil {
1082 return err
1083 }
1084 if req.EndStreamArgs {
1085 fs.endStreamArgs = true
1086 return io.EOF
1087 }
1088 return fs.dec.Decode(itemptr)
1089}
1090
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001091// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001092
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001093func (fs *flowServer) Discharges() map[string]security.Discharge {
1094 //nologcall
1095 return fs.discharges
1096}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001097
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001098func (fs *flowServer) Server() ipc.Server {
1099 //nologcall
1100 return fs.server
1101}
Asim Shankar0cad0832014-11-04 01:27:38 -08001102func (fs *flowServer) Timestamp() time.Time {
1103 //nologcall
1104 return fs.starttime
1105}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001106func (fs *flowServer) Method() string {
1107 //nologcall
1108 return fs.method
1109}
Asim Shankar0cad0832014-11-04 01:27:38 -08001110func (fs *flowServer) MethodTags() []interface{} {
1111 //nologcall
1112 return fs.tags
1113}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001114
1115// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1116// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001117func (fs *flowServer) Name() string {
1118 //nologcall
1119 return fs.suffix
1120}
1121func (fs *flowServer) Suffix() string {
1122 //nologcall
1123 return fs.suffix
1124}
1125func (fs *flowServer) Label() security.Label {
1126 //nologcall
Asim Shankar0cad0832014-11-04 01:27:38 -08001127 for _, t := range fs.tags {
1128 if l, ok := t.(security.Label); ok {
1129 return l
1130 }
1131 }
1132 return security.AdminLabel
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001133}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001134func (fs *flowServer) LocalPrincipal() security.Principal {
1135 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001136 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001137}
1138func (fs *flowServer) LocalBlessings() security.Blessings {
1139 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001140 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001141}
1142func (fs *flowServer) RemoteBlessings() security.Blessings {
1143 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001144 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001145}
Asim Shankar8f05c222014-10-06 22:08:19 -07001146func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001147 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001148 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001149}
1150func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1151 //nologcall
1152 return fs.flow.LocalEndpoint()
1153}
1154func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1155 //nologcall
1156 return fs.flow.RemoteEndpoint()
1157}