blob: 3578abe04376d83041a21e22909de8102188739d [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Robin Thellendc26c32e2014-10-06 17:44:04 -070012 "veyron.io/veyron/veyron/lib/glob"
Jiri Simsa519c5072014-09-17 21:37:57 -070013 "veyron.io/veyron/veyron/lib/netstate"
14 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
15 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
16 isecurity "veyron.io/veyron/veyron/runtimes/google/security"
17 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
18 vsecurity "veyron.io/veyron/veyron/security"
Robin Thellendd24f0842014-09-23 10:27:29 -070019 "veyron.io/veyron/veyron/services/mgmt/debug"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -070021 "veyron.io/veyron/veyron/profiles/internal"
22
Jiri Simsa519c5072014-09-17 21:37:57 -070023 "veyron.io/veyron/veyron2"
24 "veyron.io/veyron/veyron2/config"
25 "veyron.io/veyron/veyron2/context"
26 "veyron.io/veyron/veyron2/ipc"
27 "veyron.io/veyron/veyron2/ipc/stream"
28 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070029 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070030 "veyron.io/veyron/veyron2/security"
Robin Thellendc26c32e2014-10-06 17:44:04 -070031 mttypes "veyron.io/veyron/veyron2/services/mounttable/types"
Jiri Simsa519c5072014-09-17 21:37:57 -070032 "veyron.io/veyron/veyron2/verror"
33 "veyron.io/veyron/veyron2/vlog"
34 "veyron.io/veyron/veyron2/vom"
35 "veyron.io/veyron/veyron2/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036)
37
38var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070039 errServerStopped = verror.Abortedf("ipc: server is stopped")
40)
41
42func errNotAuthorized(err error) verror.E {
Tilak Sharma492e8e92014-09-18 10:58:14 -070043 return verror.NoAccessf("ipc: not authorized(%v)", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044}
45
46type server struct {
47 sync.Mutex
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070048 ctx context.T // context used by the server to make internal RPCs.
49 streamMgr stream.Manager // stream manager to listen for new flows.
50 publisher publisher.Publisher // publisher to publish mounttable mounts.
51 listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
52 listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
53 disp ipc.Dispatcher // dispatcher to serve RPCs
54 active sync.WaitGroup // active goroutines we've spawned.
55 stopped bool // whether the server has been stopped.
56 stoppedChan chan struct{} // closed when the server has been stopped.
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070057 ns naming.Namespace
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070058 servesMountTable bool
Robin Thellend27647d22014-09-18 10:06:35 -070059 debugAuthorizer security.Authorizer
Robin Thellendd24f0842014-09-23 10:27:29 -070060 debugDisp ipc.Dispatcher
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070061 // TODO(cnicolaou): add roaming stats to ipcStats
62 stats *ipcStats // stats for this server.
63}
64
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070065var _ ipc.Server = (*server)(nil)
66
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070067type dhcpListener struct {
68 sync.Mutex
69 publisher *config.Publisher // publisher used to fork the stream
70 name string // name of the publisher stream
71 ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
72 port string
73 ch chan config.Setting // channel to receive settings over
Jiri Simsa5293dcb2014-05-10 09:56:38 -070074}
75
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070076func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070077 s := &server{
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070078 ctx: ctx,
79 streamMgr: streamMgr,
80 publisher: publisher.New(ctx, ns, publishPeriod),
81 listeners: make(map[stream.Listener]*dhcpListener),
82 stoppedChan: make(chan struct{}),
83 ns: ns,
84 stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070086 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -070087 switch opt := opt.(type) {
88 case stream.ListenerOpt:
89 // Collect all ServerOpts that are also ListenerOpts.
90 s.listenerOpts = append(s.listenerOpts, opt)
Asim Shankarcc044212014-10-15 23:25:26 -070091 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070092 s.servesMountTable = bool(opt)
Asim Shankarcc044212014-10-15 23:25:26 -070093 case options.DebugAuthorizer:
94 s.debugAuthorizer = opt.Authorizer
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095 }
96 }
Robin Thellendd24f0842014-09-23 10:27:29 -070097 s.debugDisp = debug.NewDispatcher(vlog.Log.LogDir(), s.debugAuthorizer)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070098 return s, nil
99}
100
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700101func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700102 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103 s.Lock()
104 defer s.Unlock()
105 if s.stopped {
106 return nil, errServerStopped
107 }
108 return s.publisher.Published(), nil
109}
110
111// resolveToAddress will try to resolve the input to an address using the
112// mount table, if the input is not already an address.
Asim Shankardee311d2014-08-01 17:41:31 -0700113func (s *server) resolveToAddress(address string) (string, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700114 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700115 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700116 }
Asim Shankardee311d2014-08-01 17:41:31 -0700117 var names []string
118 if s.ns != nil {
119 var err error
120 if names, err = s.ns.Resolve(s.ctx, address); err != nil {
121 return "", err
122 }
123 } else {
124 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700125 }
126 for _, n := range names {
127 address, suffix := naming.SplitAddressName(n)
Asim Shankardee311d2014-08-01 17:41:31 -0700128 if suffix != "" && suffix != "//" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700129 continue
130 }
131 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700132 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700133 }
134 }
Asim Shankardee311d2014-08-01 17:41:31 -0700135 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700136}
137
138func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700139 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700140 s.Lock()
141 // Shortcut if the server is stopped, to avoid needlessly creating a
142 // listener.
143 if s.stopped {
144 s.Unlock()
145 return nil, errServerStopped
146 }
147 s.Unlock()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700148 var proxyName string
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700149 if protocol == inaming.Network {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700150 proxyName = address
Asim Shankardee311d2014-08-01 17:41:31 -0700151 var err error
152 if address, err = s.resolveToAddress(address); err != nil {
153 return nil, err
154 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700155 }
Asim Shankarcc044212014-10-15 23:25:26 -0700156 // TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700157 // it can more cleanly set the IsMountTable bit in the endpoint.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700158 ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
159 if err != nil {
160 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
161 return nil, err
162 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700163 iep, ok := ep.(*inaming.Endpoint)
164 if !ok {
165 return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
166 }
167
Robin Thellend7f424272014-09-04 10:42:14 -0700168 if protocol != inaming.Network {
169 // We know the endpoint format, so we crack it open...
170 switch iep.Protocol {
171 case "tcp", "tcp4", "tcp6":
172 host, port, err := net.SplitHostPort(iep.Address)
173 if err != nil {
174 return nil, err
175 }
176 ip := net.ParseIP(host)
177 if ip == nil {
178 return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
179 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700180 if ip.IsUnspecified() {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700181 addrs, err := netstate.GetAccessibleIPs()
182 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700183 if a, err := internal.IPAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
184 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
Cosmos Nicolaou9a246552014-08-29 13:07:29 -0700185 }
Cosmos Nicolaouf7a11d92014-08-29 09:56:07 -0700186 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700187 }
188 }
189 }
190
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700191 s.Lock()
192 if s.stopped {
193 s.Unlock()
194 // Ignore error return since we can't really do much about it.
195 ln.Close()
196 return nil, errServerStopped
197 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700198 s.listeners[ln] = nil
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700199 // We have a single goroutine per listener to accept new flows.
200 // Each flow is served from its own goroutine.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700201 s.active.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700202 if protocol == inaming.Network {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700203 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700204 s.proxyListenLoop(ln, ep, proxy)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700205 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700206 }(ln, iep, proxyName)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700207 } else {
208 go func(ln stream.Listener, ep naming.Endpoint) {
209 s.listenLoop(ln, ep)
210 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700211 }(ln, iep)
Bogdan Caprita187269b2014-05-13 19:59:46 -0700212 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700213 s.Unlock()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700214 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215 return ep, nil
216}
217
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700218// externalEndpoint examines the endpoint returned by the stream listen call
219// and fills in the address to publish to the mount table. It also returns the
220// IP host address that it selected for publishing to the mount table.
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700221func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700222 // We know the endpoint format, so we crack it open...
223 iep, ok := lep.(*inaming.Endpoint)
224 if !ok {
225 return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
226 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700227 switch iep.Protocol {
228 case "tcp", "tcp4", "tcp6":
229 host, port, err := net.SplitHostPort(iep.Address)
230 if err != nil {
231 return nil, nil, err
232 }
233 ip := net.ParseIP(host)
234 if ip == nil {
235 return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
236 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700237 if ip.IsUnspecified() && chooser != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700238 // Need to find a usable IP address since the call to listen
239 // didn't specify one.
240 addrs, err := netstate.GetAccessibleIPs()
241 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700242 // TODO(cnicolaou): we could return multiple addresses here,
243 // all of which can be exported to the mount table. Look at
244 // this after we transition fully to ListenX.
245 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
246 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
247 return iep, a[0].Address().(*net.IPAddr), nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700248 }
249 }
250 } else {
251 // Listen used a fixed IP address, which essentially disables
252 // roaming.
253 return iep, nil, nil
254 }
255 }
256 return iep, nil, nil
257}
258
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700259func (s *server) ListenX(listenSpec *ipc.ListenSpec) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700260 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700261 s.Lock()
262 // Shortcut if the server is stopped, to avoid needlessly creating a
263 // listener.
264 if s.stopped {
265 s.Unlock()
266 return nil, errServerStopped
267 }
268 s.Unlock()
269
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700270 protocol := listenSpec.Protocol
271 address := listenSpec.Address
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700272 proxyAddress := ""
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700273 if len(listenSpec.Proxy) > 0 {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700274 if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
275 return nil, err
276 } else {
277 proxyAddress = address
278 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700279 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700280
281 ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
282 if err != nil {
283 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
284 return nil, err
285 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700286 ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700287 if err != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700288 ln.Close()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700289 return nil, err
290 }
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700291 if ipaddr == nil {
292 vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
293 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700294
295 s.Lock()
296 if s.stopped {
297 s.Unlock()
298 // Ignore error return since we can't really do much about it.
299 ln.Close()
300 return nil, errServerStopped
301 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700302
303 h, _, _ := net.SplitHostPort(address)
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700304 publisher := listenSpec.StreamPublisher
305 if ip := net.ParseIP(h); ip != nil && ip.IsLoopback() && publisher != nil {
306 streamName := listenSpec.StreamName
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700307 ch := make(chan config.Setting)
308 _, err := publisher.ForkStream(streamName, ch)
309 if err != nil {
310 return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
311 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700312 _, port, _ := net.SplitHostPort(ep.Address)
313 dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
314
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700315 // We have a goroutine to listen for dhcp changes.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700316 s.active.Add(1)
317 // goroutine to listen for address changes.
318 go func(dl *dhcpListener) {
319 s.dhcpLoop(dl)
320 s.active.Done()
321 }(dhcpl)
322 s.listeners[ln] = dhcpl
323 } else {
324 s.listeners[ln] = nil
325 }
326
327 // We have a goroutine per listener to accept new flows.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700328 // Each flow is served from its own goroutine.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700329 s.active.Add(1)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700330
331 // goroutine to listen for connections
332 go func(ln stream.Listener, ep naming.Endpoint) {
333 s.listenLoop(ln, ep)
334 s.active.Done()
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700335 }(ln, lep)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700336
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700337 if len(proxyAddress) > 0 {
338 pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
339 if err != nil {
340 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
341 return nil, err
342 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700343 ipep, ok := pep.(*inaming.Endpoint)
344 if !ok {
345 return nil, fmt.Errorf("failed translating internal endpoint data types")
346 }
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700347 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700348 s.active.Add(1)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700349 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700350 s.proxyListenLoop(ln, ep, proxy)
351 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700352 }(pln, ipep, listenSpec.Proxy)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700353 s.listeners[pln] = nil
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700354 // TODO(cnicolaou,p): AddServer no longer needs to take the
355 // servesMountTable bool since it can be extracted from the endpoint.
356 s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700357 } else {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700358 s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700359 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700360 s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700361 return ep, nil
362}
363
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700364func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700365 var name string
366 if !s.servesMountTable {
367 // Make sure that client MountTable code doesn't try and
368 // ResolveStep past this final address.
369 name = "//"
370 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700371 ep.IsMountTable = servesMountTable
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700372 return naming.JoinAddressName(ep.String(), name)
373}
374
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700375func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700376 const (
377 min = 5 * time.Millisecond
378 max = 5 * time.Minute
379 )
380 for {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700381 s.listenLoop(ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700382 // The listener is done, so:
383 // (1) Unpublish its name
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700384 s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700385 // (2) Reconnect to the proxy unless the server has been stopped
386 backoff := min
387 ln = nil
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700388 // TODO(ashankar,cnicolaou): this code is way too confusing and should
389 // be cleaned up.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700390 for ln == nil {
391 select {
392 case <-time.After(backoff):
Asim Shankardee311d2014-08-01 17:41:31 -0700393 resolved, err := s.resolveToAddress(proxy)
394 if err != nil {
395 vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700396 if backoff = backoff * 2; backoff > max {
397 backoff = max
398 }
Asim Shankardee311d2014-08-01 17:41:31 -0700399 break
400 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700401 var ep naming.Endpoint
Asim Shankardee311d2014-08-01 17:41:31 -0700402 ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700403 if err == nil {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700404 var ok bool
405 iep, ok = ep.(*inaming.Endpoint)
406 if !ok {
407 vlog.Errorf("failed translating internal endpoint data types")
408 ln = nil
409 continue
410 }
411 vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700412 break
413 }
414 if backoff = backoff * 2; backoff > max {
415 backoff = max
416 }
417 vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
418 case <-s.stoppedChan:
419 return
420 }
421 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700422 // TODO(cnicolaou,ashankar): this won't work when we are both
423 // proxying and publishing locally, which is the common case.
424 // listenLoop, dhcpLoop and the original publish are all publishing
425 // addresses to the same name, but the client is not smart enough
426 // to choose sensibly between them.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700427 // (3) reconnected, publish new address
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700428 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700429 s.Lock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700430 s.listeners[ln] = nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700431 s.Unlock()
432 }
433}
434
435func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
436 defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
437 defer func() {
438 s.Lock()
439 delete(s.listeners, ln)
440 s.Unlock()
441 }()
442 for {
443 flow, err := ln.Accept()
444 if err != nil {
445 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
446 return
447 }
448 s.active.Add(1)
449 go func(flow stream.Flow) {
450 if err := newFlowServer(flow, s).serve(); err != nil {
451 // TODO(caprita): Logging errors here is
452 // too spammy. For example, "not
453 // authorized" errors shouldn't be
454 // logged as server errors.
455 vlog.Errorf("Flow serve on %v failed: %v", ln, err)
456 }
457 s.active.Done()
458 }(flow)
459 }
460}
461
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700462func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
463 dhcpl.Lock()
464 defer dhcpl.Unlock()
465 for _, a := range addrs {
466 if ip := netstate.AsIP(a); ip != nil {
467 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700468 fn(s.publishEP(dhcpl.ep, s.servesMountTable))
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700469 }
470 }
471}
472
473func (s *server) dhcpLoop(dhcpl *dhcpListener) {
474 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
475 vlog.VI(2).Infof("ipc: dhcp loop")
476 for setting := range dhcpl.ch {
477 if setting == nil {
478 return
479 }
480 switch v := setting.Value().(type) {
481 case bool:
482 return
483 case []net.Addr:
484 s.Lock()
485 if s.stopped {
486 s.Unlock()
487 return
488 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700489 // TODO(cnicolaou,ashankar): this won't work when we are both
490 // proxying and publishing locally, which is the common case.
491 // listenLoop, dhcpLoop and the original publish are all publishing
492 // addresses to the same name, but the client is not smart enough
493 // to choose sensibly between them.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700494 publisher := s.publisher
495 s.Unlock()
496 switch setting.Name() {
497 case ipc.NewAddrsSetting:
498 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700499 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700500 case ipc.RmAddrsSetting:
501 vlog.Infof("Removed some addresses: %q", v)
502 s.applyChange(dhcpl, v, publisher.RemoveServer)
503 }
504
505 }
506 }
507}
508
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700509func (s *server) Serve(name string, disp ipc.Dispatcher) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700510 defer vlog.LogCall()()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700511 s.Lock()
512 defer s.Unlock()
513 if s.stopped {
514 return errServerStopped
515 }
516 if s.disp != nil && disp != nil && s.disp != disp {
517 return fmt.Errorf("attempt to change dispatcher")
518 }
519 if disp != nil {
520 s.disp = disp
521 }
522 if len(name) > 0 {
523 s.publisher.AddName(name)
524 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700525 return nil
526}
527
528func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700529 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700530 s.Lock()
531 if s.stopped {
532 s.Unlock()
533 return nil
534 }
535 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700536 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700537 s.Unlock()
538
Robin Thellenddf428232014-10-06 12:50:44 -0700539 // Delete the stats object.
540 s.stats.stop()
541
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700542 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
543 // server lock, since publisher is safe for concurrent access.
544
545 // Stop the publisher, which triggers unmounting of published names.
546 s.publisher.Stop()
547 // Wait for the publisher to be done unmounting before we can proceed to
548 // close the listeners (to minimize the number of mounted names pointing
549 // to endpoint that are no longer serving).
550 //
551 // TODO(caprita): See if make sense to fail fast on rejecting
552 // connections once listeners are closed, and parallelize the publisher
553 // and listener shutdown.
554 s.publisher.WaitForStop()
555
556 s.Lock()
557 // Close all listeners. No new flows will be accepted, while in-flight
558 // flows will continue until they terminate naturally.
559 nListeners := len(s.listeners)
560 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700561
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700562 for ln, dhcpl := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700563 go func(ln stream.Listener) {
564 errCh <- ln.Close()
565 }(ln)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700566 if dhcpl != nil {
567 dhcpl.Lock()
568 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
569 dhcpl.ch <- config.NewBool("EOF", "stop", true)
570 dhcpl.Unlock()
571 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700572 }
573 s.Unlock()
574 var firstErr error
575 for i := 0; i < nListeners; i++ {
576 if err := <-errCh; err != nil && firstErr == nil {
577 firstErr = err
578 }
579 }
580 // At this point, we are guaranteed that no new requests are going to be
581 // accepted.
582
583 // Wait for the publisher and active listener + flows to finish.
584 s.active.Wait()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700585 s.Lock()
586 s.disp = nil
587 s.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700588 return firstErr
589}
590
591// flowServer implements the RPC server-side protocol for a single RPC, over a
592// flow that's already connected to the client.
593type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700594 context.T
Robin Thellendd24f0842014-09-23 10:27:29 -0700595 server *server // ipc.Server that this flow server belongs to
596 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
597 dec *vom.Decoder // to decode requests and args from the client
598 enc *vom.Encoder // to encode responses and results to the client
599 flow stream.Flow // underlying flow
600 debugDisp ipc.Dispatcher // internal debug dispatcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700601 // Fields filled in during the server invocation.
602
603 // authorizedRemoteID is the PublicID obtained after authorizing the remoteID
604 // of the underlying flow for the current request context.
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700605 authorizedRemoteID security.PublicID
Asim Shankar8f05c222014-10-06 22:08:19 -0700606 blessings security.Blessings
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700607 method, suffix string
608 label security.Label
Ankurf044a8d2014-09-05 17:05:24 -0700609 discharges map[string]security.Discharge
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700610 deadline time.Time
611 endStreamArgs bool // are the stream args at EOF?
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700612 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700613}
614
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700615var _ ipc.Stream = (*flowServer)(nil)
616
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700617func newFlowServer(flow stream.Flow, server *server) *flowServer {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700618 server.Lock()
619 disp := server.disp
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700620 runtime := veyron2.RuntimeFromContext(server.ctx)
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700621 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700622
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700623 return &flowServer{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700624 T: InternalNewContext(runtime),
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700625 server: server,
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700626 disp: disp,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700627 // TODO(toddw): Support different codecs
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700628 dec: vom.NewDecoder(flow),
629 enc: vom.NewEncoder(flow),
630 flow: flow,
Robin Thellendd24f0842014-09-23 10:27:29 -0700631 debugDisp: server.debugDisp,
Ankurf044a8d2014-09-05 17:05:24 -0700632 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700633 }
634}
635
636// Vom does not encode untyped nils.
637// Consequently, the ipc system does not allow nil results with an interface
638// type from server methods. The one exception being errors.
639//
640// For now, the following hacky assumptions are made, which will be revisited when
641// a decision is made on how untyped nils should be encoded/decoded in
642// vom/vom2:
643//
644// - Server methods return 0 or more results
645// - Any values returned by the server that have an interface type are either
646// non-nil or of type error.
647func result2vom(res interface{}) vom.Value {
648 v := vom.ValueOf(res)
649 if !v.IsValid() {
650 // Untyped nils are assumed to be nil-errors.
651 var boxed verror.E
652 return vom.ValueOf(&boxed).Elem()
653 }
654 if err, iserr := res.(error); iserr {
655 // Convert errors to verror since errors are often not
656 // serializable via vom/gob (errors.New and fmt.Errorf return a
657 // type with no exported fields).
658 return vom.ValueOf(verror.Convert(err))
659 }
660 return v
661}
662
Asim Shankar8f05c222014-10-06 22:08:19 -0700663func defaultAuthorizer(ctx security.Context) security.Authorizer {
664 var blessings []string
665 if ctx.LocalBlessings() == nil { // TODO(ashankar): This will go away once the old security model is removed
666 blessings = ctx.LocalID().Names()
667 } else {
668 blessings = ctx.LocalBlessings().ForContext(ctx)
Ankur992269a2014-05-13 13:03:24 -0700669 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700670 acl := security.ACL{In: make(map[security.BlessingPattern]security.LabelSet)}
671 for _, b := range blessings {
672 acl.In[security.BlessingPattern(b).MakeGlob()] = security.AllLabels
Ankur992269a2014-05-13 13:03:24 -0700673 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700674 return vsecurity.NewACLAuthorizer(acl)
Ankur992269a2014-05-13 13:03:24 -0700675}
676
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700677func (fs *flowServer) serve() error {
678 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700679
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700680 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700681
682 var traceResponse vtrace.Response
683 if fs.allowDebug {
684 traceResponse = ivtrace.Response(fs)
685 }
686
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700687 // Respond to the client with the response header and positional results.
688 response := ipc.Response{
689 Error: err,
690 EndStreamResults: true,
691 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700692 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700693 }
694 if err := fs.enc.Encode(response); err != nil {
695 return verror.BadProtocolf("ipc: response encoding failed: %v", err)
696 }
697 if response.Error != nil {
698 return response.Error
699 }
700 for ix, res := range results {
701 if err := fs.enc.EncodeValue(result2vom(res)); err != nil {
702 return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
703 }
704 }
705 // TODO(ashankar): Should unread data from the flow be drained?
706 //
707 // Reason to do so:
708 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
709 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
710 // slices will not be returned to the pool leading to possibly increased memory usage.
711 //
712 // Reason to not do so:
713 // Draining here will conflict with any Reads on the flow in a separate goroutine
714 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
715 //
716 // For now, go with the reason to not do so as having unread data in the stream
717 // should be a rare case.
718 return nil
719}
720
Matt Rosencrantz86897932014-10-02 09:34:34 -0700721func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700722 // Set a default timeout before reading from the flow. Without this timeout,
723 // a client that sends no request or a partial request will retain the flow
724 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700725 initTimer := newTimer(defaultCallTimeout)
726 defer initTimer.Stop()
727 fs.flow.SetDeadline(initTimer.C)
728
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700729 // Decode the initial request.
730 var req ipc.Request
731 if err := fs.dec.Decode(&req); err != nil {
732 return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
733 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700734 return &req, nil
735}
736
737func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
738 start := time.Now()
739
740 req, verr := fs.readIPCRequest()
741 if verr != nil {
742 return nil, verr
743 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700744 fs.method = req.Method
Matt Rosencrantz86897932014-10-02 09:34:34 -0700745
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700746 // TODO(mattr): Currently this allows users to trigger trace collection
747 // on the server even if they will not be allowed to collect the
748 // results later. This might be consider a DOS vector.
749 spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
750 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700751
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700752 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700753 if req.Timeout != ipc.NoTimeout {
754 fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700755 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700756 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700757 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700758 fs.flow.SetDeadline(fs.Done())
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700759
Matt Rosencrantz86897932014-10-02 09:34:34 -0700760 // Ensure that the context gets cancelled if the flow is closed
761 // due to a network error, or client cancellation.
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700762 go func() {
Matt Rosencrantzbae08212014-10-03 08:04:17 -0700763 select {
764 case <-fs.flow.Closed():
765 // Here we remove the contexts channel as a deadline to the flow.
766 // We do this to ensure clients get a consistent error when they read/write
767 // after the flow is closed. Since the flow is already closed, it doesn't
768 // matter that the context is also cancelled.
769 fs.flow.SetDeadline(nil)
770 cancel()
771 case <-fs.Done():
772 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700773 }()
774
Asim Shankarb54d7642014-06-05 13:08:04 -0700775 // If additional credentials are provided, make them available in the context
Asim Shankar8f05c222014-10-06 22:08:19 -0700776 var err error
777 if fs.blessings, err = security.NewBlessings(req.GrantedBlessings); err != nil {
778 return nil, verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
779 }
780 // Detect unusable blessings now, rather then discovering they are unusable on first use.
781 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides the
782 // server's identity as the blessing. Figure out what we want to do about this -
783 // should servers be able to assume that a blessing is something that does not
784 // have the authorizations that the server's own identity has?
785 if fs.blessings != nil && !reflect.DeepEqual(fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
786 return nil, verror.BadProtocolf("ipc: blessing granted not bound to this server(%v vs %v)", fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Asim Shankarb54d7642014-06-05 13:08:04 -0700787 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700788 // Receive third party caveat discharges the client sent
789 for i := uint64(0); i < req.NumDischarges; i++ {
Ankurf044a8d2014-09-05 17:05:24 -0700790 var d security.Discharge
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700791 if err := fs.dec.Decode(&d); err != nil {
792 return nil, verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
793 }
Ankurf044a8d2014-09-05 17:05:24 -0700794 fs.discharges[d.ID()] = d
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700795 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700796 // Lookup the invoker.
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700797 invoker, auth, suffix, verr := fs.lookup(req.Suffix, req.Method)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700798 fs.suffix = suffix // with leading /'s stripped
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700799 if verr != nil {
800 return nil, verr
801 }
802 // Prepare invoker and decode args.
803 numArgs := int(req.NumPosArgs)
804 argptrs, label, err := invoker.Prepare(req.Method, numArgs)
805 fs.label = label
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700806 if err != nil {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700807 return nil, verror.Makef(verror.ErrorID(err), "%s: name: %q", err, req.Suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700808 }
809 if len(argptrs) != numArgs {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700810 return nil, verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", req.Method, req.Suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700811 }
812 for ix, argptr := range argptrs {
813 if err := fs.dec.Decode(argptr); err != nil {
814 return nil, verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
815 }
816 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817 if remoteID := fs.flow.RemoteID(); remoteID != nil {
Asim Shankar8f05c222014-10-06 22:08:19 -0700818 // TODO(ashankar): This whole check goes away once the old security model is ripped out.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700819 if fs.authorizedRemoteID, err = remoteID.Authorize(isecurity.NewContext(
820 isecurity.ContextArgs{
821 LocalID: fs.flow.LocalID(),
822 RemoteID: fs.flow.RemoteID(),
823 Method: fs.method,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700824 Suffix: fs.suffix,
825 Discharges: fs.discharges,
826 Label: fs.label})); err != nil {
827 return nil, errNotAuthorized(err)
828 }
829 }
830 // Check application's authorization policy and invoke the method.
831 if err := fs.authorize(auth); err != nil {
832 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error (err)?
Asim Shankar8f05c222014-10-06 22:08:19 -0700833 return nil, errNotAuthorized(fmt.Errorf("%v (PublicID:%v) not authorized for %q.%q: %v", fs.RemoteBlessings(), fs.RemoteID(), fs.Name(), fs.Method(), err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700834 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700835 // Check if the caller is permitted to view debug information.
836 fs.allowDebug = fs.authorizeForDebug(auth) == nil
837
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838 results, err := invoker.Invoke(req.Method, fs, argptrs)
Robin Thellend8eb77522014-08-28 14:12:01 -0700839 fs.server.stats.record(req.Method, time.Since(start))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700840 return results, verror.Convert(err)
841}
842
843// lookup returns the invoker and authorizer responsible for serving the given
Robin Thellendd24f0842014-09-23 10:27:29 -0700844// name and method. The name is stripped of any leading slashes. If it begins
845// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
846// invoker. Otherwise, and we use the server's dispatcher. The (stripped) name
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700847// and dispatch suffix are also returned.
848func (fs *flowServer) lookup(name, method string) (ipc.Invoker, security.Authorizer, string, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700849 name = strings.TrimLeft(name, "/")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700850 if method == "Glob" && len(name) == 0 {
851 return ipc.ReflectInvoker(&globInvoker{fs}), &acceptAllAuthorizer{}, name, nil
852 }
Robin Thellendd24f0842014-09-23 10:27:29 -0700853 disp := fs.disp
854 if name == ipc.DebugKeyword || strings.HasPrefix(name, ipc.DebugKeyword+"/") {
855 name = strings.TrimPrefix(name, ipc.DebugKeyword)
856 name = strings.TrimLeft(name, "/")
857 disp = fs.debugDisp
858 }
859 if disp != nil {
860 invoker, auth, err := disp.Lookup(name, method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700861 switch {
862 case err != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700863 return nil, nil, "", verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700864 case invoker != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700865 return invoker, auth, name, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700866 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700867 }
Robin Thellendc26c32e2014-10-06 17:44:04 -0700868 return nil, nil, "", verror.NoExistf("ipc: invoker not found for %q", name)
869}
870
871type acceptAllAuthorizer struct{}
872
873func (acceptAllAuthorizer) Authorize(security.Context) error {
874 return nil
875}
876
877type globInvoker struct {
878 fs *flowServer
879}
880
881// Glob matches the pattern against internal object names if the double-
882// underscore prefix is explicitly part of the pattern. Otherwise, it invokes
883// the service's Glob method.
884func (i *globInvoker) Glob(call ipc.ServerCall, pattern string) error {
885 g, err := glob.Parse(pattern)
886 if err != nil {
887 return err
888 }
889 if strings.HasPrefix(pattern, "__") {
890 var err error
891 // Match against internal object names.
892 internalLeaves := []string{ipc.DebugKeyword}
893 for _, leaf := range internalLeaves {
894 if ok, _, left := g.MatchInitialSegment(leaf); ok {
895 if ierr := i.invokeGlob(call, i.fs.debugDisp, leaf, left.String()); ierr != nil {
896 err = ierr
897 }
898 }
899 }
900 return err
901 }
902 // Invoke the service's method.
903 return i.invokeGlob(call, i.fs.disp, "", pattern)
904}
905
906func (i *globInvoker) invokeGlob(call ipc.ServerCall, d ipc.Dispatcher, prefix, pattern string) error {
907 if d == nil {
908 return nil
909 }
910 invoker, auth, err := d.Lookup("", "Glob")
911 if err != nil {
912 return err
913 }
914 if invoker == nil {
915 return verror.NoExistf("ipc: invoker not found for Glob")
916 }
917
918 argptrs, label, err := invoker.Prepare("Glob", 1)
919 i.fs.label = label
920 if err != nil {
921 return verror.Makef(verror.ErrorID(err), "%s", err)
922 }
923 if err := i.fs.authorize(auth); err != nil {
924 return errNotAuthorized(fmt.Errorf("%q not authorized for method %q: %v", i.fs.RemoteID(), i.fs.Method(), err))
925 }
926 leafCall := &localServerCall{call, prefix}
927 argptrs[0] = &pattern
928 results, err := invoker.Invoke("Glob", leafCall, argptrs)
929 if err != nil {
930 return err
931 }
932 if len(results) != 1 {
933 return verror.BadArgf("unexpected number of results. Got %d, want 1", len(results))
934 }
935 res := results[0]
936 if res == nil {
937 return nil
938 }
939 err, ok := res.(error)
940 if !ok {
941 return verror.BadArgf("unexpected result type. Got %T, want error", res)
942 }
943 return err
944}
945
946// An ipc.ServerCall that prepends a prefix to all the names in the streamed
947// MountEntry objects.
948type localServerCall struct {
949 ipc.ServerCall
950 prefix string
951}
952
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700953var _ ipc.ServerCall = (*localServerCall)(nil)
954var _ ipc.Stream = (*localServerCall)(nil)
955var _ ipc.ServerContext = (*localServerCall)(nil)
956
Robin Thellendc26c32e2014-10-06 17:44:04 -0700957func (c *localServerCall) Send(v interface{}) error {
958 me, ok := v.(mttypes.MountEntry)
959 if !ok {
960 return verror.BadArgf("unexpected stream type. Got %T, want MountEntry", v)
961 }
962 me.Name = naming.Join(c.prefix, me.Name)
963 return c.ServerCall.Send(me)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700964}
965
966func (fs *flowServer) authorize(auth security.Authorizer) error {
Asim Shankar8f05c222014-10-06 22:08:19 -0700967 if auth == nil {
968 auth = defaultAuthorizer(fs)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700969 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700970 return auth.Authorize(fs)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700971}
972
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700973// debugContext is a context which wraps another context but always returns
974// the debug label.
975type debugContext struct {
976 security.Context
977}
978
979func (debugContext) Label() security.Label { return security.DebugLabel }
980
981// TODO(mattr): Is DebugLabel the right thing to check?
982func (fs *flowServer) authorizeForDebug(auth security.Authorizer) error {
983 dc := debugContext{fs}
Asim Shankar8f05c222014-10-06 22:08:19 -0700984 if auth == nil {
985 auth = defaultAuthorizer(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700986 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700987 return auth.Authorize(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700988}
989
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700990// Send implements the ipc.Stream method.
991func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700992 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700993 // The empty response header indicates what follows is a streaming result.
994 if err := fs.enc.Encode(ipc.Response{}); err != nil {
995 return err
996 }
997 return fs.enc.Encode(item)
998}
999
1000// Recv implements the ipc.Stream method.
1001func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001002 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001003 var req ipc.Request
1004 if err := fs.dec.Decode(&req); err != nil {
1005 return err
1006 }
1007 if req.EndStreamArgs {
1008 fs.endStreamArgs = true
1009 return io.EOF
1010 }
1011 return fs.dec.Decode(itemptr)
1012}
1013
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001014// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001015
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001016func (fs *flowServer) Discharges() map[string]security.Discharge {
1017 //nologcall
1018 return fs.discharges
1019}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001020
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001021func (fs *flowServer) Server() ipc.Server {
1022 //nologcall
1023 return fs.server
1024}
1025func (fs *flowServer) Method() string {
1026 //nologcall
1027 return fs.method
1028}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001029
1030// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1031// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001032func (fs *flowServer) Name() string {
1033 //nologcall
1034 return fs.suffix
1035}
1036func (fs *flowServer) Suffix() string {
1037 //nologcall
1038 return fs.suffix
1039}
1040func (fs *flowServer) Label() security.Label {
1041 //nologcall
1042 return fs.label
1043}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001044
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001045func (fs *flowServer) LocalID() security.PublicID {
1046 //nologcall
1047 return fs.flow.LocalID()
1048}
1049func (fs *flowServer) RemoteID() security.PublicID {
1050 //nologcall
1051 return fs.authorizedRemoteID
1052}
1053func (fs *flowServer) LocalPrincipal() security.Principal {
1054 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001055 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001056}
1057func (fs *flowServer) LocalBlessings() security.Blessings {
1058 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001059 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001060}
1061func (fs *flowServer) RemoteBlessings() security.Blessings {
1062 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001063 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001064}
Asim Shankar8f05c222014-10-06 22:08:19 -07001065func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001066 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001067 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001068}
1069func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1070 //nologcall
1071 return fs.flow.LocalEndpoint()
1072}
1073func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1074 //nologcall
1075 return fs.flow.RemoteEndpoint()
1076}