blob: 608424f00c5897a1a3dbe40117ae5b44b16c96c0 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Robin Thellendc26c32e2014-10-06 17:44:04 -070012 "veyron.io/veyron/veyron/lib/glob"
Jiri Simsa519c5072014-09-17 21:37:57 -070013 "veyron.io/veyron/veyron/lib/netstate"
14 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
15 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
16 isecurity "veyron.io/veyron/veyron/runtimes/google/security"
17 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
18 vsecurity "veyron.io/veyron/veyron/security"
Robin Thellendd24f0842014-09-23 10:27:29 -070019 "veyron.io/veyron/veyron/services/mgmt/debug"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -070021 "veyron.io/veyron/veyron/profiles/internal"
22
Jiri Simsa519c5072014-09-17 21:37:57 -070023 "veyron.io/veyron/veyron2"
24 "veyron.io/veyron/veyron2/config"
25 "veyron.io/veyron/veyron2/context"
26 "veyron.io/veyron/veyron2/ipc"
27 "veyron.io/veyron/veyron2/ipc/stream"
28 "veyron.io/veyron/veyron2/naming"
29 "veyron.io/veyron/veyron2/security"
Robin Thellendc26c32e2014-10-06 17:44:04 -070030 mttypes "veyron.io/veyron/veyron2/services/mounttable/types"
Jiri Simsa519c5072014-09-17 21:37:57 -070031 "veyron.io/veyron/veyron2/verror"
32 "veyron.io/veyron/veyron2/vlog"
33 "veyron.io/veyron/veyron2/vom"
34 "veyron.io/veyron/veyron2/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070035)
36
37var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070038 errServerStopped = verror.Abortedf("ipc: server is stopped")
39)
40
41func errNotAuthorized(err error) verror.E {
Tilak Sharma492e8e92014-09-18 10:58:14 -070042 return verror.NoAccessf("ipc: not authorized(%v)", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070043}
44
45type server struct {
46 sync.Mutex
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070047 ctx context.T // context used by the server to make internal RPCs.
48 streamMgr stream.Manager // stream manager to listen for new flows.
49 publisher publisher.Publisher // publisher to publish mounttable mounts.
50 listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
51 listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
52 disp ipc.Dispatcher // dispatcher to serve RPCs
53 active sync.WaitGroup // active goroutines we've spawned.
54 stopped bool // whether the server has been stopped.
55 stoppedChan chan struct{} // closed when the server has been stopped.
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070056 ns naming.Namespace
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070057 servesMountTable bool
Robin Thellend27647d22014-09-18 10:06:35 -070058 debugAuthorizer security.Authorizer
Robin Thellendd24f0842014-09-23 10:27:29 -070059 debugDisp ipc.Dispatcher
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070060 // TODO(cnicolaou): add roaming stats to ipcStats
61 stats *ipcStats // stats for this server.
62}
63
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070064var _ ipc.Server = (*server)(nil)
65
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070066type dhcpListener struct {
67 sync.Mutex
68 publisher *config.Publisher // publisher used to fork the stream
69 name string // name of the publisher stream
70 ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
71 port string
72 ch chan config.Setting // channel to receive settings over
Jiri Simsa5293dcb2014-05-10 09:56:38 -070073}
74
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070075func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070076 s := &server{
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070077 ctx: ctx,
78 streamMgr: streamMgr,
79 publisher: publisher.New(ctx, ns, publishPeriod),
80 listeners: make(map[stream.Listener]*dhcpListener),
81 stoppedChan: make(chan struct{}),
82 ns: ns,
83 stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070084 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -070086 switch opt := opt.(type) {
87 case stream.ListenerOpt:
88 // Collect all ServerOpts that are also ListenerOpts.
89 s.listenerOpts = append(s.listenerOpts, opt)
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070090 case veyron2.ServesMountTableOpt:
91 s.servesMountTable = bool(opt)
Robin Thellend27647d22014-09-18 10:06:35 -070092 case veyron2.DebugAuthorizerOpt:
93 s.debugAuthorizer = security.Authorizer(opt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070094 }
95 }
Robin Thellendd24f0842014-09-23 10:27:29 -070096 s.debugDisp = debug.NewDispatcher(vlog.Log.LogDir(), s.debugAuthorizer)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070097 return s, nil
98}
99
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700100func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700101 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700102 s.Lock()
103 defer s.Unlock()
104 if s.stopped {
105 return nil, errServerStopped
106 }
107 return s.publisher.Published(), nil
108}
109
110// resolveToAddress will try to resolve the input to an address using the
111// mount table, if the input is not already an address.
Asim Shankardee311d2014-08-01 17:41:31 -0700112func (s *server) resolveToAddress(address string) (string, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700113 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700114 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 }
Asim Shankardee311d2014-08-01 17:41:31 -0700116 var names []string
117 if s.ns != nil {
118 var err error
119 if names, err = s.ns.Resolve(s.ctx, address); err != nil {
120 return "", err
121 }
122 } else {
123 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700124 }
125 for _, n := range names {
126 address, suffix := naming.SplitAddressName(n)
Asim Shankardee311d2014-08-01 17:41:31 -0700127 if suffix != "" && suffix != "//" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700128 continue
129 }
130 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700131 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700132 }
133 }
Asim Shankardee311d2014-08-01 17:41:31 -0700134 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700135}
136
137func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700138 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700139 s.Lock()
140 // Shortcut if the server is stopped, to avoid needlessly creating a
141 // listener.
142 if s.stopped {
143 s.Unlock()
144 return nil, errServerStopped
145 }
146 s.Unlock()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700147 var proxyName string
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700148 if protocol == inaming.Network {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700149 proxyName = address
Asim Shankardee311d2014-08-01 17:41:31 -0700150 var err error
151 if address, err = s.resolveToAddress(address); err != nil {
152 return nil, err
153 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700154 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700155 // TODO(cnicolaou): pass ServesMountTableOpt to streamMgr.Listen so that
156 // it can more cleanly set the IsMountTable bit in the endpoint.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700157 ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
158 if err != nil {
159 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
160 return nil, err
161 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700162 iep, ok := ep.(*inaming.Endpoint)
163 if !ok {
164 return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
165 }
166
Robin Thellend7f424272014-09-04 10:42:14 -0700167 if protocol != inaming.Network {
168 // We know the endpoint format, so we crack it open...
169 switch iep.Protocol {
170 case "tcp", "tcp4", "tcp6":
171 host, port, err := net.SplitHostPort(iep.Address)
172 if err != nil {
173 return nil, err
174 }
175 ip := net.ParseIP(host)
176 if ip == nil {
177 return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
178 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700179 if ip.IsUnspecified() {
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700180 addrs, err := netstate.GetAccessibleIPs()
181 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700182 if a, err := internal.IPAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
183 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
Cosmos Nicolaou9a246552014-08-29 13:07:29 -0700184 }
Cosmos Nicolaouf7a11d92014-08-29 09:56:07 -0700185 }
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700186 }
187 }
188 }
189
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700190 s.Lock()
191 if s.stopped {
192 s.Unlock()
193 // Ignore error return since we can't really do much about it.
194 ln.Close()
195 return nil, errServerStopped
196 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700197 s.listeners[ln] = nil
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700198 // We have a single goroutine per listener to accept new flows.
199 // Each flow is served from its own goroutine.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700200 s.active.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700201 if protocol == inaming.Network {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700202 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -0700203 s.proxyListenLoop(ln, ep, proxy)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700204 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700205 }(ln, iep, proxyName)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700206 } else {
207 go func(ln stream.Listener, ep naming.Endpoint) {
208 s.listenLoop(ln, ep)
209 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700210 }(ln, iep)
Bogdan Caprita187269b2014-05-13 19:59:46 -0700211 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700212 s.Unlock()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700213 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700214 return ep, nil
215}
216
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700217// externalEndpoint examines the endpoint returned by the stream listen call
218// and fills in the address to publish to the mount table. It also returns the
219// IP host address that it selected for publishing to the mount table.
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700220func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700221 // We know the endpoint format, so we crack it open...
222 iep, ok := lep.(*inaming.Endpoint)
223 if !ok {
224 return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
225 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700226 switch iep.Protocol {
227 case "tcp", "tcp4", "tcp6":
228 host, port, err := net.SplitHostPort(iep.Address)
229 if err != nil {
230 return nil, nil, err
231 }
232 ip := net.ParseIP(host)
233 if ip == nil {
234 return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
235 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700236 if ip.IsUnspecified() && chooser != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700237 // Need to find a usable IP address since the call to listen
238 // didn't specify one.
239 addrs, err := netstate.GetAccessibleIPs()
240 if err == nil {
Cosmos Nicolaou66bc1202014-09-30 20:42:43 -0700241 // TODO(cnicolaou): we could return multiple addresses here,
242 // all of which can be exported to the mount table. Look at
243 // this after we transition fully to ListenX.
244 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
245 iep.Address = net.JoinHostPort(a[0].Address().String(), port)
246 return iep, a[0].Address().(*net.IPAddr), nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700247 }
248 }
249 } else {
250 // Listen used a fixed IP address, which essentially disables
251 // roaming.
252 return iep, nil, nil
253 }
254 }
255 return iep, nil, nil
256}
257
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700258func (s *server) ListenX(listenSpec *ipc.ListenSpec) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700259 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700260 s.Lock()
261 // Shortcut if the server is stopped, to avoid needlessly creating a
262 // listener.
263 if s.stopped {
264 s.Unlock()
265 return nil, errServerStopped
266 }
267 s.Unlock()
268
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700269 protocol := listenSpec.Protocol
270 address := listenSpec.Address
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700271 proxyAddress := ""
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700272 if len(listenSpec.Proxy) > 0 {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700273 if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
274 return nil, err
275 } else {
276 proxyAddress = address
277 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700278 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700279
280 ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
281 if err != nil {
282 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
283 return nil, err
284 }
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700285 ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700286 if err != nil {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700287 ln.Close()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700288 return nil, err
289 }
Cosmos Nicolaouc0e4b792014-09-25 10:57:52 -0700290 if ipaddr == nil {
291 vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
292 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700293
294 s.Lock()
295 if s.stopped {
296 s.Unlock()
297 // Ignore error return since we can't really do much about it.
298 ln.Close()
299 return nil, errServerStopped
300 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700301
302 h, _, _ := net.SplitHostPort(address)
Cosmos Nicolaou767b62d2014-09-19 13:58:40 -0700303 publisher := listenSpec.StreamPublisher
304 if ip := net.ParseIP(h); ip != nil && ip.IsLoopback() && publisher != nil {
305 streamName := listenSpec.StreamName
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700306 ch := make(chan config.Setting)
307 _, err := publisher.ForkStream(streamName, ch)
308 if err != nil {
309 return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
310 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700311 _, port, _ := net.SplitHostPort(ep.Address)
312 dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
313
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700314 // We have a goroutine to listen for dhcp changes.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700315 s.active.Add(1)
316 // goroutine to listen for address changes.
317 go func(dl *dhcpListener) {
318 s.dhcpLoop(dl)
319 s.active.Done()
320 }(dhcpl)
321 s.listeners[ln] = dhcpl
322 } else {
323 s.listeners[ln] = nil
324 }
325
326 // We have a goroutine per listener to accept new flows.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700327 // Each flow is served from its own goroutine.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700328 s.active.Add(1)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700329
330 // goroutine to listen for connections
331 go func(ln stream.Listener, ep naming.Endpoint) {
332 s.listenLoop(ln, ep)
333 s.active.Done()
Cosmos Nicolaou778cb7e2014-09-10 15:07:43 -0700334 }(ln, lep)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700335
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700336 if len(proxyAddress) > 0 {
337 pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
338 if err != nil {
339 vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
340 return nil, err
341 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700342 ipep, ok := pep.(*inaming.Endpoint)
343 if !ok {
344 return nil, fmt.Errorf("failed translating internal endpoint data types")
345 }
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700346 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700347 s.active.Add(1)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700348 go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700349 s.proxyListenLoop(ln, ep, proxy)
350 s.active.Done()
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700351 }(pln, ipep, listenSpec.Proxy)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700352 s.listeners[pln] = nil
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700353 // TODO(cnicolaou,p): AddServer no longer needs to take the
354 // servesMountTable bool since it can be extracted from the endpoint.
355 s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700356 } else {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700357 s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700358 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700359 s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700360 return ep, nil
361}
362
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700363func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700364 var name string
365 if !s.servesMountTable {
366 // Make sure that client MountTable code doesn't try and
367 // ResolveStep past this final address.
368 name = "//"
369 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700370 ep.IsMountTable = servesMountTable
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700371 return naming.JoinAddressName(ep.String(), name)
372}
373
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700374func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700375 const (
376 min = 5 * time.Millisecond
377 max = 5 * time.Minute
378 )
379 for {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700380 s.listenLoop(ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700381 // The listener is done, so:
382 // (1) Unpublish its name
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700383 s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700384 // (2) Reconnect to the proxy unless the server has been stopped
385 backoff := min
386 ln = nil
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700387 // TODO(ashankar,cnicolaou): this code is way too confusing and should
388 // be cleaned up.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700389 for ln == nil {
390 select {
391 case <-time.After(backoff):
Asim Shankardee311d2014-08-01 17:41:31 -0700392 resolved, err := s.resolveToAddress(proxy)
393 if err != nil {
394 vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
Cosmos Nicolaou29ee9852014-10-15 11:38:55 -0700395 if backoff = backoff * 2; backoff > max {
396 backoff = max
397 }
Asim Shankardee311d2014-08-01 17:41:31 -0700398 break
399 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700400 var ep naming.Endpoint
Asim Shankardee311d2014-08-01 17:41:31 -0700401 ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700402 if err == nil {
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700403 var ok bool
404 iep, ok = ep.(*inaming.Endpoint)
405 if !ok {
406 vlog.Errorf("failed translating internal endpoint data types")
407 ln = nil
408 continue
409 }
410 vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700411 break
412 }
413 if backoff = backoff * 2; backoff > max {
414 backoff = max
415 }
416 vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
417 case <-s.stoppedChan:
418 return
419 }
420 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700421 // TODO(cnicolaou,ashankar): this won't work when we are both
422 // proxying and publishing locally, which is the common case.
423 // listenLoop, dhcpLoop and the original publish are all publishing
424 // addresses to the same name, but the client is not smart enough
425 // to choose sensibly between them.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700426 // (3) reconnected, publish new address
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700427 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700428 s.Lock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700429 s.listeners[ln] = nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700430 s.Unlock()
431 }
432}
433
434func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
435 defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
436 defer func() {
437 s.Lock()
438 delete(s.listeners, ln)
439 s.Unlock()
440 }()
441 for {
442 flow, err := ln.Accept()
443 if err != nil {
444 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
445 return
446 }
447 s.active.Add(1)
448 go func(flow stream.Flow) {
449 if err := newFlowServer(flow, s).serve(); err != nil {
450 // TODO(caprita): Logging errors here is
451 // too spammy. For example, "not
452 // authorized" errors shouldn't be
453 // logged as server errors.
454 vlog.Errorf("Flow serve on %v failed: %v", ln, err)
455 }
456 s.active.Done()
457 }(flow)
458 }
459}
460
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700461func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
462 dhcpl.Lock()
463 defer dhcpl.Unlock()
464 for _, a := range addrs {
465 if ip := netstate.AsIP(a); ip != nil {
466 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700467 fn(s.publishEP(dhcpl.ep, s.servesMountTable))
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700468 }
469 }
470}
471
472func (s *server) dhcpLoop(dhcpl *dhcpListener) {
473 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
474 vlog.VI(2).Infof("ipc: dhcp loop")
475 for setting := range dhcpl.ch {
476 if setting == nil {
477 return
478 }
479 switch v := setting.Value().(type) {
480 case bool:
481 return
482 case []net.Addr:
483 s.Lock()
484 if s.stopped {
485 s.Unlock()
486 return
487 }
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700488 // TODO(cnicolaou,ashankar): this won't work when we are both
489 // proxying and publishing locally, which is the common case.
490 // listenLoop, dhcpLoop and the original publish are all publishing
491 // addresses to the same name, but the client is not smart enough
492 // to choose sensibly between them.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700493 publisher := s.publisher
494 s.Unlock()
495 switch setting.Name() {
496 case ipc.NewAddrsSetting:
497 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700498 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700499 case ipc.RmAddrsSetting:
500 vlog.Infof("Removed some addresses: %q", v)
501 s.applyChange(dhcpl, v, publisher.RemoveServer)
502 }
503
504 }
505 }
506}
507
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700508func (s *server) Serve(name string, disp ipc.Dispatcher) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700509 defer vlog.LogCall()()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700510 s.Lock()
511 defer s.Unlock()
512 if s.stopped {
513 return errServerStopped
514 }
515 if s.disp != nil && disp != nil && s.disp != disp {
516 return fmt.Errorf("attempt to change dispatcher")
517 }
518 if disp != nil {
519 s.disp = disp
520 }
521 if len(name) > 0 {
522 s.publisher.AddName(name)
523 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700524 return nil
525}
526
527func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700528 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700529 s.Lock()
530 if s.stopped {
531 s.Unlock()
532 return nil
533 }
534 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700535 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700536 s.Unlock()
537
Robin Thellenddf428232014-10-06 12:50:44 -0700538 // Delete the stats object.
539 s.stats.stop()
540
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700541 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
542 // server lock, since publisher is safe for concurrent access.
543
544 // Stop the publisher, which triggers unmounting of published names.
545 s.publisher.Stop()
546 // Wait for the publisher to be done unmounting before we can proceed to
547 // close the listeners (to minimize the number of mounted names pointing
548 // to endpoint that are no longer serving).
549 //
550 // TODO(caprita): See if make sense to fail fast on rejecting
551 // connections once listeners are closed, and parallelize the publisher
552 // and listener shutdown.
553 s.publisher.WaitForStop()
554
555 s.Lock()
556 // Close all listeners. No new flows will be accepted, while in-flight
557 // flows will continue until they terminate naturally.
558 nListeners := len(s.listeners)
559 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700560
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700561 for ln, dhcpl := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700562 go func(ln stream.Listener) {
563 errCh <- ln.Close()
564 }(ln)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700565 if dhcpl != nil {
566 dhcpl.Lock()
567 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
568 dhcpl.ch <- config.NewBool("EOF", "stop", true)
569 dhcpl.Unlock()
570 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700571 }
572 s.Unlock()
573 var firstErr error
574 for i := 0; i < nListeners; i++ {
575 if err := <-errCh; err != nil && firstErr == nil {
576 firstErr = err
577 }
578 }
579 // At this point, we are guaranteed that no new requests are going to be
580 // accepted.
581
582 // Wait for the publisher and active listener + flows to finish.
583 s.active.Wait()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700584 s.Lock()
585 s.disp = nil
586 s.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700587 return firstErr
588}
589
590// flowServer implements the RPC server-side protocol for a single RPC, over a
591// flow that's already connected to the client.
592type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700593 context.T
Robin Thellendd24f0842014-09-23 10:27:29 -0700594 server *server // ipc.Server that this flow server belongs to
595 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
596 dec *vom.Decoder // to decode requests and args from the client
597 enc *vom.Encoder // to encode responses and results to the client
598 flow stream.Flow // underlying flow
599 debugDisp ipc.Dispatcher // internal debug dispatcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700600 // Fields filled in during the server invocation.
601
602 // authorizedRemoteID is the PublicID obtained after authorizing the remoteID
603 // of the underlying flow for the current request context.
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700604 authorizedRemoteID security.PublicID
Asim Shankar8f05c222014-10-06 22:08:19 -0700605 blessings security.Blessings
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700606 method, suffix string
607 label security.Label
Ankurf044a8d2014-09-05 17:05:24 -0700608 discharges map[string]security.Discharge
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700609 deadline time.Time
610 endStreamArgs bool // are the stream args at EOF?
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700611 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700612}
613
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700614var _ ipc.Stream = (*flowServer)(nil)
615
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700616func newFlowServer(flow stream.Flow, server *server) *flowServer {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700617 server.Lock()
618 disp := server.disp
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700619 runtime := veyron2.RuntimeFromContext(server.ctx)
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700620 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700621
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700622 return &flowServer{
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700623 T: InternalNewContext(runtime),
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700624 server: server,
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700625 disp: disp,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700626 // TODO(toddw): Support different codecs
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700627 dec: vom.NewDecoder(flow),
628 enc: vom.NewEncoder(flow),
629 flow: flow,
Robin Thellendd24f0842014-09-23 10:27:29 -0700630 debugDisp: server.debugDisp,
Ankurf044a8d2014-09-05 17:05:24 -0700631 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700632 }
633}
634
635// Vom does not encode untyped nils.
636// Consequently, the ipc system does not allow nil results with an interface
637// type from server methods. The one exception being errors.
638//
639// For now, the following hacky assumptions are made, which will be revisited when
640// a decision is made on how untyped nils should be encoded/decoded in
641// vom/vom2:
642//
643// - Server methods return 0 or more results
644// - Any values returned by the server that have an interface type are either
645// non-nil or of type error.
646func result2vom(res interface{}) vom.Value {
647 v := vom.ValueOf(res)
648 if !v.IsValid() {
649 // Untyped nils are assumed to be nil-errors.
650 var boxed verror.E
651 return vom.ValueOf(&boxed).Elem()
652 }
653 if err, iserr := res.(error); iserr {
654 // Convert errors to verror since errors are often not
655 // serializable via vom/gob (errors.New and fmt.Errorf return a
656 // type with no exported fields).
657 return vom.ValueOf(verror.Convert(err))
658 }
659 return v
660}
661
Asim Shankar8f05c222014-10-06 22:08:19 -0700662func defaultAuthorizer(ctx security.Context) security.Authorizer {
663 var blessings []string
664 if ctx.LocalBlessings() == nil { // TODO(ashankar): This will go away once the old security model is removed
665 blessings = ctx.LocalID().Names()
666 } else {
667 blessings = ctx.LocalBlessings().ForContext(ctx)
Ankur992269a2014-05-13 13:03:24 -0700668 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700669 acl := security.ACL{In: make(map[security.BlessingPattern]security.LabelSet)}
670 for _, b := range blessings {
671 acl.In[security.BlessingPattern(b).MakeGlob()] = security.AllLabels
Ankur992269a2014-05-13 13:03:24 -0700672 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700673 return vsecurity.NewACLAuthorizer(acl)
Ankur992269a2014-05-13 13:03:24 -0700674}
675
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700676func (fs *flowServer) serve() error {
677 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700678
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700679 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700680
681 var traceResponse vtrace.Response
682 if fs.allowDebug {
683 traceResponse = ivtrace.Response(fs)
684 }
685
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700686 // Respond to the client with the response header and positional results.
687 response := ipc.Response{
688 Error: err,
689 EndStreamResults: true,
690 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700691 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700692 }
693 if err := fs.enc.Encode(response); err != nil {
694 return verror.BadProtocolf("ipc: response encoding failed: %v", err)
695 }
696 if response.Error != nil {
697 return response.Error
698 }
699 for ix, res := range results {
700 if err := fs.enc.EncodeValue(result2vom(res)); err != nil {
701 return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
702 }
703 }
704 // TODO(ashankar): Should unread data from the flow be drained?
705 //
706 // Reason to do so:
707 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
708 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
709 // slices will not be returned to the pool leading to possibly increased memory usage.
710 //
711 // Reason to not do so:
712 // Draining here will conflict with any Reads on the flow in a separate goroutine
713 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
714 //
715 // For now, go with the reason to not do so as having unread data in the stream
716 // should be a rare case.
717 return nil
718}
719
Matt Rosencrantz86897932014-10-02 09:34:34 -0700720func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700721 // Set a default timeout before reading from the flow. Without this timeout,
722 // a client that sends no request or a partial request will retain the flow
723 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700724 initTimer := newTimer(defaultCallTimeout)
725 defer initTimer.Stop()
726 fs.flow.SetDeadline(initTimer.C)
727
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700728 // Decode the initial request.
729 var req ipc.Request
730 if err := fs.dec.Decode(&req); err != nil {
731 return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
732 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700733 return &req, nil
734}
735
736func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
737 start := time.Now()
738
739 req, verr := fs.readIPCRequest()
740 if verr != nil {
741 return nil, verr
742 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700743 fs.method = req.Method
Matt Rosencrantz86897932014-10-02 09:34:34 -0700744
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700745 // TODO(mattr): Currently this allows users to trigger trace collection
746 // on the server even if they will not be allowed to collect the
747 // results later. This might be consider a DOS vector.
748 spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
749 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700750
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700751 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700752 if req.Timeout != ipc.NoTimeout {
753 fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700754 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700755 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700756 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700757 fs.flow.SetDeadline(fs.Done())
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700758
Matt Rosencrantz86897932014-10-02 09:34:34 -0700759 // Ensure that the context gets cancelled if the flow is closed
760 // due to a network error, or client cancellation.
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700761 go func() {
Matt Rosencrantzbae08212014-10-03 08:04:17 -0700762 select {
763 case <-fs.flow.Closed():
764 // Here we remove the contexts channel as a deadline to the flow.
765 // We do this to ensure clients get a consistent error when they read/write
766 // after the flow is closed. Since the flow is already closed, it doesn't
767 // matter that the context is also cancelled.
768 fs.flow.SetDeadline(nil)
769 cancel()
770 case <-fs.Done():
771 }
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700772 }()
773
Asim Shankarb54d7642014-06-05 13:08:04 -0700774 // If additional credentials are provided, make them available in the context
Asim Shankar8f05c222014-10-06 22:08:19 -0700775 var err error
776 if fs.blessings, err = security.NewBlessings(req.GrantedBlessings); err != nil {
777 return nil, verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
778 }
779 // Detect unusable blessings now, rather then discovering they are unusable on first use.
780 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides the
781 // server's identity as the blessing. Figure out what we want to do about this -
782 // should servers be able to assume that a blessing is something that does not
783 // have the authorizations that the server's own identity has?
784 if fs.blessings != nil && !reflect.DeepEqual(fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
785 return nil, verror.BadProtocolf("ipc: blessing granted not bound to this server(%v vs %v)", fs.blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Asim Shankarb54d7642014-06-05 13:08:04 -0700786 }
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700787 // Receive third party caveat discharges the client sent
788 for i := uint64(0); i < req.NumDischarges; i++ {
Ankurf044a8d2014-09-05 17:05:24 -0700789 var d security.Discharge
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700790 if err := fs.dec.Decode(&d); err != nil {
791 return nil, verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
792 }
Ankurf044a8d2014-09-05 17:05:24 -0700793 fs.discharges[d.ID()] = d
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700794 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700795 // Lookup the invoker.
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700796 invoker, auth, suffix, verr := fs.lookup(req.Suffix, req.Method)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700797 fs.suffix = suffix // with leading /'s stripped
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700798 if verr != nil {
799 return nil, verr
800 }
801 // Prepare invoker and decode args.
802 numArgs := int(req.NumPosArgs)
803 argptrs, label, err := invoker.Prepare(req.Method, numArgs)
804 fs.label = label
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700805 if err != nil {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700806 return nil, verror.Makef(verror.ErrorID(err), "%s: name: %q", err, req.Suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700807 }
808 if len(argptrs) != numArgs {
Cosmos Nicolaou9370ffa2014-06-02 11:01:42 -0700809 return nil, verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", req.Method, req.Suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700810 }
811 for ix, argptr := range argptrs {
812 if err := fs.dec.Decode(argptr); err != nil {
813 return nil, verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
814 }
815 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700816 if remoteID := fs.flow.RemoteID(); remoteID != nil {
Asim Shankar8f05c222014-10-06 22:08:19 -0700817 // TODO(ashankar): This whole check goes away once the old security model is ripped out.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700818 if fs.authorizedRemoteID, err = remoteID.Authorize(isecurity.NewContext(
819 isecurity.ContextArgs{
820 LocalID: fs.flow.LocalID(),
821 RemoteID: fs.flow.RemoteID(),
822 Method: fs.method,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700823 Suffix: fs.suffix,
824 Discharges: fs.discharges,
825 Label: fs.label})); err != nil {
826 return nil, errNotAuthorized(err)
827 }
828 }
829 // Check application's authorization policy and invoke the method.
830 if err := fs.authorize(auth); err != nil {
831 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error (err)?
Asim Shankar8f05c222014-10-06 22:08:19 -0700832 return nil, errNotAuthorized(fmt.Errorf("%v (PublicID:%v) not authorized for %q.%q: %v", fs.RemoteBlessings(), fs.RemoteID(), fs.Name(), fs.Method(), err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700833 }
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700834 // Check if the caller is permitted to view debug information.
835 fs.allowDebug = fs.authorizeForDebug(auth) == nil
836
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700837 results, err := invoker.Invoke(req.Method, fs, argptrs)
Robin Thellend8eb77522014-08-28 14:12:01 -0700838 fs.server.stats.record(req.Method, time.Since(start))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700839 return results, verror.Convert(err)
840}
841
842// lookup returns the invoker and authorizer responsible for serving the given
Robin Thellendd24f0842014-09-23 10:27:29 -0700843// name and method. The name is stripped of any leading slashes. If it begins
844// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
845// invoker. Otherwise, and we use the server's dispatcher. The (stripped) name
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -0700846// and dispatch suffix are also returned.
847func (fs *flowServer) lookup(name, method string) (ipc.Invoker, security.Authorizer, string, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700848 name = strings.TrimLeft(name, "/")
Robin Thellendc26c32e2014-10-06 17:44:04 -0700849 if method == "Glob" && len(name) == 0 {
850 return ipc.ReflectInvoker(&globInvoker{fs}), &acceptAllAuthorizer{}, name, nil
851 }
Robin Thellendd24f0842014-09-23 10:27:29 -0700852 disp := fs.disp
853 if name == ipc.DebugKeyword || strings.HasPrefix(name, ipc.DebugKeyword+"/") {
854 name = strings.TrimPrefix(name, ipc.DebugKeyword)
855 name = strings.TrimLeft(name, "/")
856 disp = fs.debugDisp
857 }
858 if disp != nil {
859 invoker, auth, err := disp.Lookup(name, method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700860 switch {
861 case err != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700862 return nil, nil, "", verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700863 case invoker != nil:
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700864 return invoker, auth, name, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700865 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700866 }
Robin Thellendc26c32e2014-10-06 17:44:04 -0700867 return nil, nil, "", verror.NoExistf("ipc: invoker not found for %q", name)
868}
869
870type acceptAllAuthorizer struct{}
871
872func (acceptAllAuthorizer) Authorize(security.Context) error {
873 return nil
874}
875
876type globInvoker struct {
877 fs *flowServer
878}
879
880// Glob matches the pattern against internal object names if the double-
881// underscore prefix is explicitly part of the pattern. Otherwise, it invokes
882// the service's Glob method.
883func (i *globInvoker) Glob(call ipc.ServerCall, pattern string) error {
884 g, err := glob.Parse(pattern)
885 if err != nil {
886 return err
887 }
888 if strings.HasPrefix(pattern, "__") {
889 var err error
890 // Match against internal object names.
891 internalLeaves := []string{ipc.DebugKeyword}
892 for _, leaf := range internalLeaves {
893 if ok, _, left := g.MatchInitialSegment(leaf); ok {
894 if ierr := i.invokeGlob(call, i.fs.debugDisp, leaf, left.String()); ierr != nil {
895 err = ierr
896 }
897 }
898 }
899 return err
900 }
901 // Invoke the service's method.
902 return i.invokeGlob(call, i.fs.disp, "", pattern)
903}
904
905func (i *globInvoker) invokeGlob(call ipc.ServerCall, d ipc.Dispatcher, prefix, pattern string) error {
906 if d == nil {
907 return nil
908 }
909 invoker, auth, err := d.Lookup("", "Glob")
910 if err != nil {
911 return err
912 }
913 if invoker == nil {
914 return verror.NoExistf("ipc: invoker not found for Glob")
915 }
916
917 argptrs, label, err := invoker.Prepare("Glob", 1)
918 i.fs.label = label
919 if err != nil {
920 return verror.Makef(verror.ErrorID(err), "%s", err)
921 }
922 if err := i.fs.authorize(auth); err != nil {
923 return errNotAuthorized(fmt.Errorf("%q not authorized for method %q: %v", i.fs.RemoteID(), i.fs.Method(), err))
924 }
925 leafCall := &localServerCall{call, prefix}
926 argptrs[0] = &pattern
927 results, err := invoker.Invoke("Glob", leafCall, argptrs)
928 if err != nil {
929 return err
930 }
931 if len(results) != 1 {
932 return verror.BadArgf("unexpected number of results. Got %d, want 1", len(results))
933 }
934 res := results[0]
935 if res == nil {
936 return nil
937 }
938 err, ok := res.(error)
939 if !ok {
940 return verror.BadArgf("unexpected result type. Got %T, want error", res)
941 }
942 return err
943}
944
945// An ipc.ServerCall that prepends a prefix to all the names in the streamed
946// MountEntry objects.
947type localServerCall struct {
948 ipc.ServerCall
949 prefix string
950}
951
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700952var _ ipc.ServerCall = (*localServerCall)(nil)
953var _ ipc.Stream = (*localServerCall)(nil)
954var _ ipc.ServerContext = (*localServerCall)(nil)
955
Robin Thellendc26c32e2014-10-06 17:44:04 -0700956func (c *localServerCall) Send(v interface{}) error {
957 me, ok := v.(mttypes.MountEntry)
958 if !ok {
959 return verror.BadArgf("unexpected stream type. Got %T, want MountEntry", v)
960 }
961 me.Name = naming.Join(c.prefix, me.Name)
962 return c.ServerCall.Send(me)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700963}
964
965func (fs *flowServer) authorize(auth security.Authorizer) error {
Asim Shankar8f05c222014-10-06 22:08:19 -0700966 if auth == nil {
967 auth = defaultAuthorizer(fs)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700968 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700969 return auth.Authorize(fs)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700970}
971
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700972// debugContext is a context which wraps another context but always returns
973// the debug label.
974type debugContext struct {
975 security.Context
976}
977
978func (debugContext) Label() security.Label { return security.DebugLabel }
979
980// TODO(mattr): Is DebugLabel the right thing to check?
981func (fs *flowServer) authorizeForDebug(auth security.Authorizer) error {
982 dc := debugContext{fs}
Asim Shankar8f05c222014-10-06 22:08:19 -0700983 if auth == nil {
984 auth = defaultAuthorizer(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700985 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700986 return auth.Authorize(dc)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700987}
988
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700989// Send implements the ipc.Stream method.
990func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700991 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700992 // The empty response header indicates what follows is a streaming result.
993 if err := fs.enc.Encode(ipc.Response{}); err != nil {
994 return err
995 }
996 return fs.enc.Encode(item)
997}
998
999// Recv implements the ipc.Stream method.
1000func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001001 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001002 var req ipc.Request
1003 if err := fs.dec.Decode(&req); err != nil {
1004 return err
1005 }
1006 if req.EndStreamArgs {
1007 fs.endStreamArgs = true
1008 return io.EOF
1009 }
1010 return fs.dec.Decode(itemptr)
1011}
1012
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001013// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001014
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001015func (fs *flowServer) Discharges() map[string]security.Discharge {
1016 //nologcall
1017 return fs.discharges
1018}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001019
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001020func (fs *flowServer) Server() ipc.Server {
1021 //nologcall
1022 return fs.server
1023}
1024func (fs *flowServer) Method() string {
1025 //nologcall
1026 return fs.method
1027}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001028
1029// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1030// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001031func (fs *flowServer) Name() string {
1032 //nologcall
1033 return fs.suffix
1034}
1035func (fs *flowServer) Suffix() string {
1036 //nologcall
1037 return fs.suffix
1038}
1039func (fs *flowServer) Label() security.Label {
1040 //nologcall
1041 return fs.label
1042}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001043
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001044func (fs *flowServer) LocalID() security.PublicID {
1045 //nologcall
1046 return fs.flow.LocalID()
1047}
1048func (fs *flowServer) RemoteID() security.PublicID {
1049 //nologcall
1050 return fs.authorizedRemoteID
1051}
1052func (fs *flowServer) LocalPrincipal() security.Principal {
1053 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001054 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001055}
1056func (fs *flowServer) LocalBlessings() security.Blessings {
1057 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001058 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001059}
1060func (fs *flowServer) RemoteBlessings() security.Blessings {
1061 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001062 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001063}
Asim Shankar8f05c222014-10-06 22:08:19 -07001064func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001065 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001066 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001067}
1068func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1069 //nologcall
1070 return fs.flow.LocalEndpoint()
1071}
1072func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1073 //nologcall
1074 return fs.flow.RemoteEndpoint()
1075}