blob: 00e06889c93a036088ec037225383c1f124133a1 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
4 "fmt"
5 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07006 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07007 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07008 "strings"
9 "sync"
10 "time"
11
Jiri Simsa519c5072014-09-17 21:37:57 -070012 "veyron.io/veyron/veyron2/config"
13 "veyron.io/veyron/veyron2/context"
14 "veyron.io/veyron/veyron2/ipc"
15 "veyron.io/veyron/veyron2/ipc/stream"
16 "veyron.io/veyron/veyron2/naming"
Asim Shankarcc044212014-10-15 23:25:26 -070017 "veyron.io/veyron/veyron2/options"
Jiri Simsa519c5072014-09-17 21:37:57 -070018 "veyron.io/veyron/veyron2/security"
19 "veyron.io/veyron/veyron2/verror"
20 "veyron.io/veyron/veyron2/vlog"
21 "veyron.io/veyron/veyron2/vom"
22 "veyron.io/veyron/veyron2/vtrace"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070023
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070024 "veyron.io/veyron/veyron/lib/netstate"
Bogdan Capritae7376312014-11-10 13:13:17 -080025 "veyron.io/veyron/veyron/lib/stats"
26 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070027 "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
28 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070029 ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070030)
31
32var (
Jiri Simsa5293dcb2014-05-10 09:56:38 -070033 errServerStopped = verror.Abortedf("ipc: server is stopped")
34)
35
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036type server struct {
37 sync.Mutex
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070038 ctx context.T // context used by the server to make internal RPCs.
39 streamMgr stream.Manager // stream manager to listen for new flows.
40 publisher publisher.Publisher // publisher to publish mounttable mounts.
41 listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
42 listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
43 disp ipc.Dispatcher // dispatcher to serve RPCs
Todd Wang5739dda2014-11-16 22:44:02 -080044 dispReserved ipc.Dispatcher // dispatcher for reserved methods
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070045 active sync.WaitGroup // active goroutines we've spawned.
46 stopped bool // whether the server has been stopped.
47 stoppedChan chan struct{} // closed when the server has been stopped.
Cosmos Nicolaou4e029972014-06-13 14:53:08 -070048 ns naming.Namespace
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -070049 servesMountTable bool
Cosmos Nicolaou92dba582014-11-05 17:24:10 -080050 // TODO(cnicolaou): remove this when the publisher tracks published names
51 // and can return an appropriate error for RemoveName on a name that
52 // wasn't 'Added' for this server.
Todd Wang5739dda2014-11-16 22:44:02 -080053 names map[string]struct{}
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070054 // TODO(cnicolaou): add roaming stats to ipcStats
Matt Rosencrantz3e76f282014-11-10 09:38:57 -080055 stats *ipcStats // stats for this server.
56 traceStore *ivtrace.Store // store for vtrace traces.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070057}
58
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070059var _ ipc.Server = (*server)(nil)
60
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070061type dhcpListener struct {
62 sync.Mutex
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -080063 publisher *config.Publisher // publisher used to fork the stream
64 name string // name of the publisher stream
65 ep *inaming.Endpoint // endpoint returned after listening
66 pubAddrs []ipc.Address // addresses to publish
67 pubPort string // port to use with the publish addresses
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070068 ch chan config.Setting // channel to receive settings over
Jiri Simsa5293dcb2014-05-10 09:56:38 -070069}
70
Matt Rosencrantz3e76f282014-11-10 09:38:57 -080071func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, store *ivtrace.Store, opts ...ipc.ServerOpt) (ipc.Server, error) {
72 ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
Bogdan Capritae7376312014-11-10 13:13:17 -080073 statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070074 s := &server{
Cosmos Nicolaouef323db2014-09-07 22:13:28 -070075 ctx: ctx,
76 streamMgr: streamMgr,
77 publisher: publisher.New(ctx, ns, publishPeriod),
78 listeners: make(map[stream.Listener]*dhcpListener),
79 stoppedChan: make(chan struct{}),
80 ns: ns,
Bogdan Capritae7376312014-11-10 13:13:17 -080081 stats: newIPCStats(statsPrefix),
Matt Rosencrantz3e76f282014-11-10 09:38:57 -080082 traceStore: store,
Jiri Simsa5293dcb2014-05-10 09:56:38 -070083 }
Bogdan Capritae7376312014-11-10 13:13:17 -080084 var (
85 principal security.Principal
86 blessings security.Blessings
87 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -070088 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -070089 switch opt := opt.(type) {
90 case stream.ListenerOpt:
91 // Collect all ServerOpts that are also ListenerOpts.
92 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -080093 switch opt := opt.(type) {
94 case vc.LocalPrincipal:
95 principal = opt.Principal
96 case options.ServerBlessings:
97 blessings = opt.Blessings
98 }
Asim Shankarcc044212014-10-15 23:25:26 -070099 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700100 s.servesMountTable = bool(opt)
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700101 case options.ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800102 s.dispReserved = opt.Dispatcher
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103 }
104 }
Bogdan Capritae7376312014-11-10 13:13:17 -0800105 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
106 if blessings != nil {
107 // TODO(caprita): revist printing the blessings with %s, and
108 // instead expose them as a list.
109 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", blessings))
110 } else if principal != nil { // principal should have been passed in, but just in case.
111 stats.NewStringFunc(blessingsStatsName, func() string {
112 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
113 })
114 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 return s, nil
116}
117
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700118func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700119 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700120 s.Lock()
121 defer s.Unlock()
122 if s.stopped {
123 return nil, errServerStopped
124 }
125 return s.publisher.Published(), nil
126}
127
128// resolveToAddress will try to resolve the input to an address using the
129// mount table, if the input is not already an address.
Asim Shankardee311d2014-08-01 17:41:31 -0700130func (s *server) resolveToAddress(address string) (string, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700131 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700132 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700133 }
Asim Shankardee311d2014-08-01 17:41:31 -0700134 var names []string
135 if s.ns != nil {
136 var err error
137 if names, err = s.ns.Resolve(s.ctx, address); err != nil {
138 return "", err
139 }
140 } else {
141 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700142 }
143 for _, n := range names {
144 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800145 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700146 continue
147 }
148 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700149 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700150 }
151 }
Asim Shankardee311d2014-08-01 17:41:31 -0700152 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700153}
154
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800155func addrFromIP(ip net.IP) ipc.Address {
156 return &netstate.AddrIfc{
157 Addr: &net.IPAddr{IP: ip},
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800158 }
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800159}
160
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800161// getIPRoamingAddrs finds an appropriate set of addresss to publish
162// externally and also determines if it's sensible to allow roaming.
163// It returns the host address of the first suitable address that
164// can be used and the port number that can be used with all addresses.
165// The host is required to allow the caller to construct an endpoint
166// that can be returned to the caller of Listen.
167func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
168 host, port, err = net.SplitHostPort(iep.Address)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800169 if err != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800170 return nil, "", "", false, err
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800171 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800172 ip := net.ParseIP(host)
173 if ip == nil {
174 return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
175 }
176 if ip.IsUnspecified() && chooser != nil {
177 // Need to find a usable IP address since the call to listen
178 // didn't specify one.
179 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
180 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
181 phost := a[0].Address().String()
182 iep.Address = net.JoinHostPort(phost, port)
183 return a, phost, port, true, nil
184 }
185 }
186 return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
187 }
188 // Listen used a fixed IP address, which we take to mean that
189 // roaming is not desired.
190 return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
191}
192
193// configureEPAndRoaming configures the endpoint and roaming. In particular,
194// it fills in the Address portion of the endpoint with the appropriately
195// selected network address and creates a dhcpListener struct if roaming
196// is enabled.
197func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800198 iep, ok := ep.(*inaming.Endpoint)
199 if !ok {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800200 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800201 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800202 if !strings.HasPrefix(spec.Protocol, "tcp") {
203 return nil, iep, nil
204 }
205 pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
206 if err != nil {
207 return nil, iep, err
208 }
209 iep.Address = net.JoinHostPort(pubHost, pubPort)
210 if !roaming {
211 vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
212 }
213 publisher := spec.StreamPublisher
214 if roaming && publisher != nil {
215 streamName := spec.StreamName
216 ch := make(chan config.Setting)
217 if _, err := publisher.ForkStream(streamName, ch); err != nil {
218 return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800219 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800220 return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800221 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800222 return nil, iep, nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700223}
224
Cosmos Nicolaouf8d4c2b2014-10-23 22:36:38 -0700225func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700226 defer vlog.LogCall()()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700227 s.Lock()
228 // Shortcut if the server is stopped, to avoid needlessly creating a
229 // listener.
230 if s.stopped {
231 s.Unlock()
232 return nil, errServerStopped
233 }
234 s.Unlock()
235
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800236 var iep *inaming.Endpoint
237 var dhcpl *dhcpListener
238 var ln stream.Listener
239
240 if len(listenSpec.Address) > 0 {
241 // Listen if we have a local address to listen on. Some situations
242 // just need a proxy (e.g. a browser extension).
243 tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
244 if err != nil {
245 vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
246 return nil, err
247 }
248 ln = tmpln
249 if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
250 ln.Close()
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700251 return nil, err
252 } else {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800253 dhcpl = tmpdhcpl
254 iep = tmpiep
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700255 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700256 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700257
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700258 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800259 defer s.Unlock()
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700260 if s.stopped {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700261 ln.Close()
262 return nil, errServerStopped
263 }
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700264
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800265 if dhcpl != nil {
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -0700266 // We have a goroutine to listen for dhcp changes.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800267 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800268 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800269 s.dhcpLoop(dhcpl)
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700270 s.active.Done()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800271 }()
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700272 s.listeners[ln] = dhcpl
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800273 } else if ln != nil {
Cosmos Nicolaoubf350a62014-09-12 08:16:24 -0700274 s.listeners[ln] = nil
275 }
276
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800277 if iep != nil {
278 // We have a goroutine per listener to accept new flows.
279 // Each flow is served from its own goroutine.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800280 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800281 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800282 s.listenLoop(ln, iep)
Cosmos Nicolaou5ce6ede2014-11-06 11:58:50 -0800283 s.active.Done()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800284 }()
285 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800286 if strings.HasPrefix(iep.Protocol, "tcp") {
287 epCopy := *iep
288 epCopy.Protocol = "ws"
289 s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
290 }
Cosmos Nicolaou5ce6ede2014-11-06 11:58:50 -0800291 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800292
293 if len(listenSpec.Proxy) > 0 {
294 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800295 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800296 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800297 s.proxyListenLoop(listenSpec.Proxy)
298 s.active.Done()
299 }()
300 }
301 return iep, nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700302}
303
David Why Use Two When One Will Do Presotto8b4dbbf2014-11-06 10:50:14 -0800304// TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700305func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700306 var name string
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700307 ep.IsMountTable = servesMountTable
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700308 return naming.JoinAddressName(ep.String(), name)
309}
310
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800311func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
312 resolved, err := s.resolveToAddress(proxy)
313 if err != nil {
314 return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
315 }
316 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
317 if err != nil {
318 return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
319 }
320 iep, ok := ep.(*inaming.Endpoint)
321 if !ok {
322 ln.Close()
323 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
324 }
325 s.Lock()
326 s.listeners[ln] = nil
327 s.Unlock()
328 s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800329
330 if strings.HasPrefix(iep.Protocol, "tcp") {
331 epCopy := *iep
332 epCopy.Protocol = "ws"
333 s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
334 }
335
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800336 return iep, ln, nil
337}
338
339func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700340 const (
341 min = 5 * time.Millisecond
342 max = 5 * time.Minute
343 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800344
345 iep, ln, err := s.reconnectAndPublishProxy(proxy)
346 if err != nil {
347 vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
348 }
349 // the initial connection maybe have failed, but we enter the retry
350 // loop anyway so that we will continue to try and connect to the
351 // proxy.
352
353 s.Lock()
354 if s.stopped {
355 s.Unlock()
356 return
357 }
358 s.Unlock()
359
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700360 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800361 if ln != nil && iep != nil {
362 s.listenLoop(ln, iep)
363 // The listener is done, so:
364 // (1) Unpublish its name
365 s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800366 if strings.HasPrefix(iep.Protocol, "tcp") {
367 iepCopy := *iep
368 iepCopy.Protocol = "ws"
369 s.publisher.RemoveServer(s.publishEP(&iepCopy, s.servesMountTable))
370 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800371 }
372
373 s.Lock()
374 if s.stopped {
375 s.Unlock()
376 return
377 }
378 s.Unlock()
379
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700380 // (2) Reconnect to the proxy unless the server has been stopped
381 backoff := min
382 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800383 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700384 select {
385 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700386 if backoff = backoff * 2; backoff > max {
387 backoff = max
388 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700389 case <-s.stoppedChan:
390 return
391 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800392 // (3) reconnect, publish new address
393 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
394 vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
395 } else {
396 vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
397 break
398 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700399 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700400 }
401}
402
403func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
404 defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800405 var calls sync.WaitGroup
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700406 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800407 calls.Wait()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700408 s.Lock()
409 delete(s.listeners, ln)
410 s.Unlock()
411 }()
412 for {
413 flow, err := ln.Accept()
414 if err != nil {
415 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
416 return
417 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800418 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700419 go func(flow stream.Flow) {
420 if err := newFlowServer(flow, s).serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800421 // TODO(caprita): Logging errors here is too spammy. For example, "not
422 // authorized" errors shouldn't be logged as server errors.
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700423 vlog.Errorf("Flow serve on %v failed: %v", ln, err)
424 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800425 calls.Done()
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700426 }(flow)
427 }
428}
429
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700430func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
431 dhcpl.Lock()
432 defer dhcpl.Unlock()
433 for _, a := range addrs {
434 if ip := netstate.AsIP(a); ip != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800435 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
Cosmos Nicolaouf4107592014-10-09 17:17:11 -0700436 fn(s.publishEP(dhcpl.ep, s.servesMountTable))
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700437 }
438 }
439}
440
441func (s *server) dhcpLoop(dhcpl *dhcpListener) {
442 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
443 vlog.VI(2).Infof("ipc: dhcp loop")
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800444
445 ep := *dhcpl.ep
446 // Publish all of the addresses
447 for _, pubAddr := range dhcpl.pubAddrs {
448 ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
449 s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
450 }
451
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700452 for setting := range dhcpl.ch {
453 if setting == nil {
454 return
455 }
456 switch v := setting.Value().(type) {
457 case bool:
458 return
459 case []net.Addr:
460 s.Lock()
461 if s.stopped {
462 s.Unlock()
463 return
464 }
465 publisher := s.publisher
466 s.Unlock()
467 switch setting.Name() {
468 case ipc.NewAddrsSetting:
469 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700470 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700471 case ipc.RmAddrsSetting:
472 vlog.Infof("Removed some addresses: %q", v)
473 s.applyChange(dhcpl, v, publisher.RemoveServer)
474 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700475 }
476 }
477}
478
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800479func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800480 if obj == nil {
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800481 // The ReflectInvoker inside the LeafDispatcher will panic
482 // if called for a nil value.
483 return fmt.Errorf("A nil object is not allowed")
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800484 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800485 return s.ServeDispatcher(name, ipc.LeafDispatcher(obj, authorizer))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800486}
487
488func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700489 s.Lock()
490 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800491 ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
492
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700493 if s.stopped {
494 return errServerStopped
495 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800496 if disp == nil {
497 return fmt.Errorf("A nil dispacther is not allowed")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700498 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800499 if s.disp != nil {
500 return fmt.Errorf("Serve or ServeDispatcher has already been called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700501 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800502 s.disp = disp
503 s.names = make(map[string]struct{})
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700504 if len(name) > 0 {
505 s.publisher.AddName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800506 s.names[name] = struct{}{}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700507 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700508 return nil
509}
510
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800511func (s *server) AddName(name string) error {
512 s.Lock()
513 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800514 ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800515 if len(name) == 0 {
516 return fmt.Errorf("empty name")
517 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800518 if s.stopped {
519 return errServerStopped
520 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800521 if s.disp == nil {
522 return fmt.Errorf("Adding name before calling Serve or ServeDispatcher is not allowed")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800523 }
524 s.publisher.AddName(name)
525 // TODO(cnicolaou): remove this map when the publisher's RemoveName
526 // method returns an error.
527 s.names[name] = struct{}{}
528 return nil
529}
530
531func (s *server) RemoveName(name string) error {
532 s.Lock()
533 defer s.Unlock()
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800534 ivtrace.FromContext(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800535 if s.stopped {
536 return errServerStopped
537 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800538 if s.disp == nil {
539 return fmt.Errorf("Removing name before calling Serve or ServeDispatcher is not allowed")
540 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800541 if _, present := s.names[name]; !present {
542 return fmt.Errorf("%q has not been previously used for this server", name)
543 }
544 s.publisher.RemoveName(name)
545 delete(s.names, name)
546 return nil
547}
548
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700549func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700550 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700551 s.Lock()
552 if s.stopped {
553 s.Unlock()
554 return nil
555 }
556 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700557 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700558 s.Unlock()
559
Robin Thellenddf428232014-10-06 12:50:44 -0700560 // Delete the stats object.
561 s.stats.stop()
562
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700563 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
564 // server lock, since publisher is safe for concurrent access.
565
566 // Stop the publisher, which triggers unmounting of published names.
567 s.publisher.Stop()
568 // Wait for the publisher to be done unmounting before we can proceed to
569 // close the listeners (to minimize the number of mounted names pointing
570 // to endpoint that are no longer serving).
571 //
572 // TODO(caprita): See if make sense to fail fast on rejecting
573 // connections once listeners are closed, and parallelize the publisher
574 // and listener shutdown.
575 s.publisher.WaitForStop()
576
577 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800578
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700579 // Close all listeners. No new flows will be accepted, while in-flight
580 // flows will continue until they terminate naturally.
581 nListeners := len(s.listeners)
582 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700583
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700584 for ln, dhcpl := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700585 go func(ln stream.Listener) {
586 errCh <- ln.Close()
587 }(ln)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700588 if dhcpl != nil {
589 dhcpl.Lock()
590 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
591 dhcpl.ch <- config.NewBool("EOF", "stop", true)
592 dhcpl.Unlock()
593 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700594 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800595
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700596 s.Unlock()
597 var firstErr error
598 for i := 0; i < nListeners; i++ {
599 if err := <-errCh; err != nil && firstErr == nil {
600 firstErr = err
601 }
602 }
603 // At this point, we are guaranteed that no new requests are going to be
604 // accepted.
605
606 // Wait for the publisher and active listener + flows to finish.
607 s.active.Wait()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800608
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700609 s.Lock()
610 s.disp = nil
611 s.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700612 return firstErr
613}
614
615// flowServer implements the RPC server-side protocol for a single RPC, over a
616// flow that's already connected to the client.
617type flowServer struct {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700618 context.T
Todd Wang5739dda2014-11-16 22:44:02 -0800619 server *server // ipc.Server that this flow server belongs to
620 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
621 dec *vom.Decoder // to decode requests and args from the client
622 enc *vom.Encoder // to encode responses and results to the client
623 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700624
Asim Shankar220a0152014-10-30 21:21:09 -0700625 // Fields filled in during the server invocation.
626 blessings security.Blessings
627 method, suffix string
Asim Shankar0cad0832014-11-04 01:27:38 -0800628 tags []interface{}
Asim Shankar220a0152014-10-30 21:21:09 -0700629 discharges map[string]security.Discharge
Asim Shankar0cad0832014-11-04 01:27:38 -0800630 starttime time.Time
Asim Shankar220a0152014-10-30 21:21:09 -0700631 endStreamArgs bool // are the stream args at EOF?
632 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700633}
634
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700635var _ ipc.Stream = (*flowServer)(nil)
636
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700637func newFlowServer(flow stream.Flow, server *server) *flowServer {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700638 server.Lock()
639 disp := server.disp
640 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700641
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700642 return &flowServer{
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800643 T: server.ctx,
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700644 server: server,
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700645 disp: disp,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700646 // TODO(toddw): Support different codecs
Todd Wang5739dda2014-11-16 22:44:02 -0800647 dec: vom.NewDecoder(flow),
648 enc: vom.NewEncoder(flow),
649 flow: flow,
650 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700651 }
652}
653
654// Vom does not encode untyped nils.
655// Consequently, the ipc system does not allow nil results with an interface
656// type from server methods. The one exception being errors.
657//
658// For now, the following hacky assumptions are made, which will be revisited when
659// a decision is made on how untyped nils should be encoded/decoded in
660// vom/vom2:
661//
662// - Server methods return 0 or more results
663// - Any values returned by the server that have an interface type are either
664// non-nil or of type error.
665func result2vom(res interface{}) vom.Value {
666 v := vom.ValueOf(res)
667 if !v.IsValid() {
668 // Untyped nils are assumed to be nil-errors.
669 var boxed verror.E
670 return vom.ValueOf(&boxed).Elem()
671 }
672 if err, iserr := res.(error); iserr {
673 // Convert errors to verror since errors are often not
674 // serializable via vom/gob (errors.New and fmt.Errorf return a
675 // type with no exported fields).
676 return vom.ValueOf(verror.Convert(err))
677 }
678 return v
679}
680
681func (fs *flowServer) serve() error {
682 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700683
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700684 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700685
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700686 ivtrace.FromContext(fs).Finish()
687
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700688 var traceResponse vtrace.Response
689 if fs.allowDebug {
690 traceResponse = ivtrace.Response(fs)
691 }
692
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700693 // Respond to the client with the response header and positional results.
694 response := ipc.Response{
695 Error: err,
696 EndStreamResults: true,
697 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700698 TraceResponse: traceResponse,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700699 }
700 if err := fs.enc.Encode(response); err != nil {
701 return verror.BadProtocolf("ipc: response encoding failed: %v", err)
702 }
703 if response.Error != nil {
704 return response.Error
705 }
706 for ix, res := range results {
707 if err := fs.enc.EncodeValue(result2vom(res)); err != nil {
708 return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
709 }
710 }
711 // TODO(ashankar): Should unread data from the flow be drained?
712 //
713 // Reason to do so:
714 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
715 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
716 // slices will not be returned to the pool leading to possibly increased memory usage.
717 //
718 // Reason to not do so:
719 // Draining here will conflict with any Reads on the flow in a separate goroutine
720 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
721 //
722 // For now, go with the reason to not do so as having unread data in the stream
723 // should be a rare case.
724 return nil
725}
726
Matt Rosencrantz86897932014-10-02 09:34:34 -0700727func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700728 // Set a default timeout before reading from the flow. Without this timeout,
729 // a client that sends no request or a partial request will retain the flow
730 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700731 initTimer := newTimer(defaultCallTimeout)
732 defer initTimer.Stop()
733 fs.flow.SetDeadline(initTimer.C)
734
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700735 // Decode the initial request.
736 var req ipc.Request
737 if err := fs.dec.Decode(&req); err != nil {
738 return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
739 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700740 return &req, nil
741}
742
743func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
Asim Shankar0cad0832014-11-04 01:27:38 -0800744 fs.starttime = time.Now()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700745 req, verr := fs.readIPCRequest()
746 if verr != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700747 // We don't know what the ipc call was supposed to be, but we'll create
748 // a placeholder span so we can capture annotations.
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800749 fs.T, _ = ivtrace.WithNewSpan(fs, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
Matt Rosencrantz86897932014-10-02 09:34:34 -0700750 return nil, verr
751 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700752 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -0800753 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -0700754
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700755 // TODO(mattr): Currently this allows users to trigger trace collection
756 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800757 // results later. This might be considered a DOS vector.
758 spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800759 fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest, fs.server.traceStore)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700760
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700761 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700762 if req.Timeout != ipc.NoTimeout {
Asim Shankar0cad0832014-11-04 01:27:38 -0800763 fs.T, cancel = fs.WithDeadline(fs.starttime.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700764 } else {
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700765 fs.T, cancel = fs.WithCancel()
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700766 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700767 fs.flow.SetDeadline(fs.Done())
Todd Wang5739dda2014-11-16 22:44:02 -0800768 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700769
Todd Wang5739dda2014-11-16 22:44:02 -0800770 // Initialize security: blessings, discharges, etc.
771 if verr := fs.initSecurity(req); verr != nil {
772 return nil, verr
Andres Erbsenb7f95f32014-07-07 12:07:56 -0700773 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700774 // Lookup the invoker.
Todd Wang5739dda2014-11-16 22:44:02 -0800775 invoker, auth, verr := fs.lookup(fs.suffix, &fs.method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700776 if verr != nil {
777 return nil, verr
778 }
779 // Prepare invoker and decode args.
780 numArgs := int(req.NumPosArgs)
Robin Thellendb16d7162014-11-07 13:47:26 -0800781 argptrs, tags, err := invoker.Prepare(fs.method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -0800782 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700783 if err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800784 return nil, verror.Makef(verror.ErrorID(err), "%s: name: %q", err, fs.suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700785 }
786 if len(argptrs) != numArgs {
Todd Wang5739dda2014-11-16 22:44:02 -0800787 return nil, verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", fs.method, fs.suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700788 }
789 for ix, argptr := range argptrs {
790 if err := fs.dec.Decode(argptr); err != nil {
791 return nil, verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
792 }
793 }
Todd Wang5739dda2014-11-16 22:44:02 -0800794 // Check application's authorization policy.
795 if verr := authorize(fs, auth); verr != nil {
796 return nil, verr
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700797 }
Todd Wang5739dda2014-11-16 22:44:02 -0800798 // Check if the caller is permitted to view debug information.
799 // TODO(mattr): Is DebugLabel the right thing to check?
800 fs.allowDebug = authorize(debugContext{fs}, auth) == nil
801 // Invoke the method.
Robin Thellendb16d7162014-11-07 13:47:26 -0800802 results, err := invoker.Invoke(fs.method, fs, argptrs)
803 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700804 return results, verror.Convert(err)
805}
806
Todd Wang5739dda2014-11-16 22:44:02 -0800807func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
808 // Ensure that the context gets cancelled if the flow is closed
809 // due to a network error, or client cancellation.
810 select {
811 case <-fs.flow.Closed():
812 // Here we remove the contexts channel as a deadline to the flow.
813 // We do this to ensure clients get a consistent error when they read/write
814 // after the flow is closed. Since the flow is already closed, it doesn't
815 // matter that the context is also cancelled.
816 fs.flow.SetDeadline(nil)
817 cancel()
818 case <-fs.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -0700819 }
Todd Wang5739dda2014-11-16 22:44:02 -0800820}
821
822// lookup returns the invoker and authorizer responsible for serving the given
823// name and method. The suffix is stripped of any leading slashes. If it begins
824// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
825// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
826// value may be modified to match the actual suffix and method to use.
827func (fs *flowServer) lookup(suffix string, method *string) (ipc.Invoker, security.Authorizer, verror.E) {
828 if naming.IsReserved(*method) {
829 // All reserved methods are trapped and handled here, by removing the
830 // reserved prefix and invoking them on reservedMethods. E.g. "__Glob"
831 // invokes reservedMethods.Glob.
832 *method = naming.StripReserved(*method)
833 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
834 }
835 disp := fs.disp
836 if naming.IsReserved(suffix) {
837 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -0700838 }
839 if disp != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800840 obj, auth, err := disp.Lookup(suffix, *method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700841 switch {
842 case err != nil:
Robin Thellendb16d7162014-11-07 13:47:26 -0800843 return nil, nil, verror.Convert(err)
Todd Wang5739dda2014-11-16 22:44:02 -0800844 case obj != nil:
845 return objectToInvoker(obj), auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700846 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700847 }
Todd Wang5739dda2014-11-16 22:44:02 -0800848 return nil, nil, verror.NoExistf("ipc: invoker not found for %q", suffix)
849}
850
851func objectToInvoker(obj interface{}) ipc.Invoker {
852 if obj == nil {
853 return nil
854 }
855 if invoker, ok := obj.(ipc.Invoker); ok {
856 return invoker
857 }
858 return ipc.ReflectInvoker(obj)
859}
860
861func (fs *flowServer) initSecurity(req *ipc.Request) verror.E {
862 // If additional credentials are provided, make them available in the context
863 blessings, err := security.NewBlessings(req.GrantedBlessings)
864 if err != nil {
865 return verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
866 }
867 fs.blessings = blessings
868 // Detect unusable blessings now, rather then discovering they are unusable on
869 // first use.
870 //
871 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
872 // the server's identity as the blessing. Figure out what we want to do about
873 // this - should servers be able to assume that a blessing is something that
874 // does not have the authorizations that the server's own identity has?
875 if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
876 return verror.BadProtocolf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
877 }
878 // Receive third party caveat discharges the client sent
879 for i := uint64(0); i < req.NumDischarges; i++ {
880 var d security.Discharge
881 if err := fs.dec.Decode(&d); err != nil {
882 return verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
883 }
884 fs.discharges[d.ID()] = d
885 }
886 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -0700887}
888
889type acceptAllAuthorizer struct{}
890
891func (acceptAllAuthorizer) Authorize(security.Context) error {
892 return nil
893}
894
Todd Wang5739dda2014-11-16 22:44:02 -0800895func authorize(ctx security.Context, auth security.Authorizer) verror.E {
896 if ctx.LocalPrincipal() == nil {
897 // LocalPrincipal is nil means that the server wanted to avoid
898 // authentication, and thus wanted to skip authorization as well.
899 return nil
900 }
Asim Shankar8f05c222014-10-06 22:08:19 -0700901 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -0700902 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700903 }
Todd Wang5739dda2014-11-16 22:44:02 -0800904 if err := auth.Authorize(ctx); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -0700905 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Todd Wang5739dda2014-11-16 22:44:02 -0800906 return verror.NoAccessf("ipc: not authorized to call %q.%q (%v)", ctx.Suffix(), ctx.Method(), err)
Asim Shankara5457f02014-10-24 23:23:07 -0700907 }
908 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700909}
910
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700911// debugContext is a context which wraps another context but always returns
912// the debug label.
913type debugContext struct {
914 security.Context
915}
916
917func (debugContext) Label() security.Label { return security.DebugLabel }
918
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700919// Send implements the ipc.Stream method.
920func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700921 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700922 // The empty response header indicates what follows is a streaming result.
923 if err := fs.enc.Encode(ipc.Response{}); err != nil {
924 return err
925 }
926 return fs.enc.Encode(item)
927}
928
929// Recv implements the ipc.Stream method.
930func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700931 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700932 var req ipc.Request
933 if err := fs.dec.Decode(&req); err != nil {
934 return err
935 }
936 if req.EndStreamArgs {
937 fs.endStreamArgs = true
938 return io.EOF
939 }
940 return fs.dec.Decode(itemptr)
941}
942
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700943// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700944
Asim Shankar2519cc12014-11-10 21:16:53 -0800945func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700946 //nologcall
947 return fs.discharges
948}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700949func (fs *flowServer) Server() ipc.Server {
950 //nologcall
951 return fs.server
952}
Asim Shankar0cad0832014-11-04 01:27:38 -0800953func (fs *flowServer) Timestamp() time.Time {
954 //nologcall
955 return fs.starttime
956}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700957func (fs *flowServer) Method() string {
958 //nologcall
959 return fs.method
960}
Asim Shankar0cad0832014-11-04 01:27:38 -0800961func (fs *flowServer) MethodTags() []interface{} {
962 //nologcall
963 return fs.tags
964}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700965
966// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
967// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700968func (fs *flowServer) Name() string {
969 //nologcall
970 return fs.suffix
971}
972func (fs *flowServer) Suffix() string {
973 //nologcall
974 return fs.suffix
975}
976func (fs *flowServer) Label() security.Label {
977 //nologcall
Todd Wang5739dda2014-11-16 22:44:02 -0800978 return security.LabelFromMethodTags(fs.tags)
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700979}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700980func (fs *flowServer) LocalPrincipal() security.Principal {
981 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -0700982 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700983}
984func (fs *flowServer) LocalBlessings() security.Blessings {
985 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -0700986 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700987}
988func (fs *flowServer) RemoteBlessings() security.Blessings {
989 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -0700990 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700991}
Asim Shankar8f05c222014-10-06 22:08:19 -0700992func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700993 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -0700994 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700995}
996func (fs *flowServer) LocalEndpoint() naming.Endpoint {
997 //nologcall
998 return fs.flow.LocalEndpoint()
999}
1000func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1001 //nologcall
1002 return fs.flow.RemoteEndpoint()
1003}