blob: 432242e0f7a91cddda301d0fbe5e6b74eeb73944 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08004 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "fmt"
6 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07007 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "strings"
10 "sync"
11 "time"
12
Jiri Simsa764efb72014-12-25 20:57:03 -080013 "v.io/core/veyron2/config"
14 "v.io/core/veyron2/context"
15 "v.io/core/veyron2/ipc"
16 "v.io/core/veyron2/ipc/stream"
17 "v.io/core/veyron2/naming"
18 "v.io/core/veyron2/options"
19 "v.io/core/veyron2/security"
20 "v.io/core/veyron2/services/security/access"
21 "v.io/core/veyron2/vdl"
22 old_verror "v.io/core/veyron2/verror"
23 verror "v.io/core/veyron2/verror2"
24 "v.io/core/veyron2/vlog"
25 "v.io/core/veyron2/vom"
26 "v.io/core/veyron2/vom2"
27 "v.io/core/veyron2/vtrace"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070028
Jiri Simsa764efb72014-12-25 20:57:03 -080029 "v.io/core/veyron/lib/netstate"
30 "v.io/core/veyron/lib/stats"
31 "v.io/core/veyron/runtimes/google/ipc/stream/vc"
32 "v.io/core/veyron/runtimes/google/lib/publisher"
33 inaming "v.io/core/veyron/runtimes/google/naming"
34 ivtrace "v.io/core/veyron/runtimes/google/vtrace"
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080035
36 // TODO(cnicolaou): finish verror -> verror2 transition, in particular
37 // for communicating from server to client.
38 // TODO(cnicolaou): remove the vom1 code now that vom2 is in place.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070039)
40
41var (
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -080042 // TODO(cnicolaou): this should be BadState in verror2.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080043 errServerStopped = old_verror.Abortedf("ipc: server is stopped")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044)
45
// server is this package's implementation of ipc.Server.
//
// The embedded Mutex is taken by the methods in this file around accesses to
// stopped, listeners, disp and names.
type server struct {
	sync.Mutex
	ctx           *context.T                 // context used by the server to make internal RPCs.
	streamMgr     stream.Manager             // stream manager to listen for new flows.
	publisher     publisher.Publisher        // publisher to publish mounttable mounts.
	listenerOpts  []stream.ListenerOpt       // listener opts passed to Listen.
	listeners     map[stream.Listener]struct{} // listeners created by Listen.
	dhcpListeners map[*dhcpListener]struct{} // dhcpListeners created by Listen.

	disp               ipc.Dispatcher // dispatcher to serve RPCs
	dispReserved       ipc.Dispatcher // dispatcher for reserved methods
	active             sync.WaitGroup // active goroutines we've spawned.
	stopped            bool           // whether the server has been stopped.
	stoppedChan        chan struct{}  // closed when the server has been stopped.
	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
	ns                 naming.Namespace // namespace used to resolve the proxy name; may be nil.
	servesMountTable   bool             // copied into created endpoints and passed to publisher.AddServer.
	// TODO(cnicolaou): remove this when the publisher tracks published names
	// and can return an appropriate error for RemoveName on a name that
	// wasn't 'Added' for this server.
	names map[string]struct{} // names handed to publisher.AddName and not yet removed.
	// TODO(cnicolaou): add roaming stats to ipcStats
	stats *ipcStats // stats for this server.
}
70
// Compile-time assertion that *server satisfies ipc.Server.
var _ ipc.Server = (*server)(nil)
72
// dhcpListener holds the state needed to react to network address changes
// for a set of listening endpoints.
//
// NOTE(review): in the portion of the file reviewed, the only code that
// creates or consumes a dhcpListener is commented out (roaming is disabled).
type dhcpListener struct {
	sync.Mutex
	publisher *config.Publisher   // publisher used to fork the stream
	name      string              // name of the publisher stream
	eps       []*inaming.Endpoint // endpoint returned after listening
	pubAddrs  []ipc.Address       // addresses to publish
	pubPort   string              // port to use with the publish addresses
	ch        chan config.Setting // channel to receive settings over
}
82
// PreferredServerResolveProtocols is a server option used to sort and filter
// the endpoints when resolving the proxy name from a mounttable.
// InternalNewServer stores its value in server.preferredProtocols.
type PreferredServerResolveProtocols []string

// IPCServerOpt has an empty body; it exists only as a method on the type.
// NOTE(review): presumably this marks the type as an ipc.ServerOpt — confirm
// against the ipc package.
func (PreferredServerResolveProtocols) IPCServerOpt() {}
88
Matt Rosencrantz5f98d942015-01-08 13:48:30 -080089func InternalNewServer(ctx *context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
90 ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
Bogdan Capritae7376312014-11-10 13:13:17 -080091 statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -070092 s := &server{
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080093 ctx: ctx,
94 streamMgr: streamMgr,
95 publisher: publisher.New(ctx, ns, publishPeriod),
96 listeners: make(map[stream.Listener]struct{}),
97 dhcpListeners: make(map[*dhcpListener]struct{}),
98 stoppedChan: make(chan struct{}),
99 ns: ns,
100 stats: newIPCStats(statsPrefix),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700101 }
Bogdan Capritae7376312014-11-10 13:13:17 -0800102 var (
103 principal security.Principal
104 blessings security.Blessings
105 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700106 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700107 switch opt := opt.(type) {
108 case stream.ListenerOpt:
109 // Collect all ServerOpts that are also ListenerOpts.
110 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800111 switch opt := opt.(type) {
112 case vc.LocalPrincipal:
113 principal = opt.Principal
114 case options.ServerBlessings:
115 blessings = opt.Blessings
116 }
Asim Shankarcc044212014-10-15 23:25:26 -0700117 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700118 s.servesMountTable = bool(opt)
Cosmos Nicolaou8246a8b2014-11-01 09:32:36 -0700119 case options.ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800120 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800121 case PreferredServerResolveProtocols:
122 s.preferredProtocols = []string(opt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700123 }
124 }
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800125 // TODO(suharshs,mattr): Get a client from the context.
126 client, err := InternalNewClient(streamMgr, ns)
127 if err != nil {
128 return nil, fmt.Errorf("failed to create discharge-client: %v", err)
129 }
130 dc := InternalNewDischargeClient(ctx, client)
131 s.listenerOpts = append(s.listenerOpts, dc)
Bogdan Capritae7376312014-11-10 13:13:17 -0800132 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
133 if blessings != nil {
134 // TODO(caprita): revist printing the blessings with %s, and
135 // instead expose them as a list.
136 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", blessings))
137 } else if principal != nil { // principal should have been passed in, but just in case.
138 stats.NewStringFunc(blessingsStatsName, func() string {
139 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
140 })
141 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700142 return s, nil
143}
144
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700145func (s *server) Published() ([]string, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700146 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700147 s.Lock()
148 defer s.Unlock()
149 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800150 return nil, s.newBadState("ipc.Server.Stop already called")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700151 }
152 return s.publisher.Published(), nil
153}
154
Robin Thellend92b65a42014-12-17 14:30:16 -0800155// resolveToEndpoint resolves an object name or address to an endpoint.
156func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankardee311d2014-08-01 17:41:31 -0700157 var names []string
158 if s.ns != nil {
Ryan Brown6153c6c2014-12-11 13:10:09 -0800159 var entry *naming.MountEntry
Asim Shankardee311d2014-08-01 17:41:31 -0700160 var err error
Ryan Brown6153c6c2014-12-11 13:10:09 -0800161 if entry, err = s.ns.ResolveX(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700162 return "", err
163 }
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800164 names = entry.Names()
Asim Shankardee311d2014-08-01 17:41:31 -0700165 } else {
166 names = append(names, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800168 // An empty set of protocols means all protocols...
169 ordered, err := filterAndOrderServers(names, s.preferredProtocols)
170 if err != nil {
171 return "", err
172 }
173 for _, n := range ordered {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700174 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800175 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700176 continue
177 }
178 if _, err := inaming.NewEndpoint(address); err == nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700179 return address, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700180 }
181 }
Asim Shankardee311d2014-08-01 17:41:31 -0700182 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700183}
184
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800185func addrFromIP(ip net.IP) ipc.Address {
186 return &netstate.AddrIfc{
187 Addr: &net.IPAddr{IP: ip},
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800188 }
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800189}
190
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800191/*
// getIPRoamingAddrs finds an appropriate set of addresses to publish
193// externally and also determines if it's sensible to allow roaming.
194// It returns the host address of the first suitable address that
195// can be used and the port number that can be used with all addresses.
196// The host is required to allow the caller to construct an endpoint
197// that can be returned to the caller of Listen.
198func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
199 host, port, err = net.SplitHostPort(iep.Address)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800200 if err != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800201 return nil, "", "", false, err
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800202 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800203 ip := net.ParseIP(host)
204 if ip == nil {
205 return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
206 }
207 if ip.IsUnspecified() && chooser != nil {
208 // Need to find a usable IP address since the call to listen
209 // didn't specify one.
210 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
211 if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
212 phost := a[0].Address().String()
213 iep.Address = net.JoinHostPort(phost, port)
214 return a, phost, port, true, nil
215 }
216 }
217 return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
218 }
219 // Listen used a fixed IP address, which we take to mean that
220 // roaming is not desired.
221 return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
222}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800223*/
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800224
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800225// getPossbileAddrs returns an appropriate set of addresses that could be used
226// to contact the supplied protocol, host, port parameters using the supplied
227// chooser function. It returns an indication of whether the supplied address
228// was fully specified or not, returning false if the address was fully
229// specified, and true if it was not.
230func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
231 ip := net.ParseIP(host)
232 if ip == nil {
233 return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
234 }
235 if ip.IsUnspecified() {
236 if chooser != nil {
237 // Need to find a usable IP address since the call to listen
238 // didn't specify one.
239 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800240 a, err := chooser(protocol, addrs)
241 if err == nil && len(a) > 0 {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800242 return a, true, nil
243 }
244 }
245 }
246 // We don't have a chooser, so we just return the address the
247 // underlying system has chosen.
248 return []ipc.Address{addrFromIP(ip)}, true, nil
249 }
250 return []ipc.Address{addrFromIP(ip)}, false, nil
251}
252
253// createEndpoints creates appropriate inaming.Endpoint instances for
254// all of the externally accessible networrk addresses that can be used
255// to reach this server.
256func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
257 iep, ok := lep.(*inaming.Endpoint)
258 if !ok {
259 return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
260 }
261 if !strings.HasPrefix(iep.Protocol, "tcp") &&
262 !strings.HasPrefix(iep.Protocol, "ws") {
263 // If not tcp or ws, just return the endpoint we were given.
264 return []*inaming.Endpoint{iep}, false, nil
265 }
266
267 host, port, err := net.SplitHostPort(iep.Address)
268 if err != nil {
269 return nil, false, err
270 }
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800271 addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800272 if err != nil {
273 return nil, false, err
274 }
275 ieps := make([]*inaming.Endpoint, 0, len(addrs))
276 for _, addr := range addrs {
277 n, err := inaming.NewEndpoint(lep.String())
278 if err != nil {
279 return nil, false, err
280 }
281 n.IsMountTable = s.servesMountTable
282 //n.Protocol = addr.Address().Network()
283 n.Address = net.JoinHostPort(addr.Address().String(), port)
284 ieps = append(ieps, n)
285 }
286 return ieps, unspecified, nil
287}
288
289/*
290// configureEPAndRoaming configures the endpoint by filling in its Address
291// portion with the appropriately selected network address, it also
292// returns an indication of whether this endpoint is appropriate for
293// roaming and the set of addresses that should be published.
294func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800295 iep, ok := ep.(*inaming.Endpoint)
296 if !ok {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800297 return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
Cosmos Nicolaouaef5e372014-11-07 16:59:59 -0800298 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800299 if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
300 !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
301 return false, nil, iep, nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800302 }
303 pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
304 if err != nil {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800305 return false, nil, iep, err
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800306 }
307 iep.Address = net.JoinHostPort(pubHost, pubPort)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800308 return roaming, pubAddrs, iep, nil
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700309}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800310*/
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700311
// listenError is the error type returned by the server's newBadState and
// newBadArg helpers. It wraps a verror.E and can additionally carry
// per-(protocol, address) errors.
type listenError struct {
	err    verror.E                                        // underlying error; not set by newError.
	errors map[struct{ Protocol, Address string }]error // per-address errors; only allocated by newError.
}
316
317func newError() *listenError {
318 return &listenError{errors: make(map[struct{ Protocol, Address string }]error)}
319}
320
// ErrorDetails returns the per-(protocol, address) error map held by le.
// The map itself is returned, not a copy; it is nil for values built by
// newBadState/newBadArg, which do not allocate it.
func ErrorDetails(le *listenError) map[struct{ Protocol, Address string }]error {
	return le.errors
}
324
325// Implements error
326func (le *listenError) Error() string {
327 s := le.err.Error()
328 for k, v := range le.errors {
329 s += fmt.Sprintf("(%s,%s:%s) ", k.Protocol, k.Address, v)
330 }
331 return strings.TrimRight(s, " ")
332}
333
// ErrorID returns the ID of the underlying error. It calls through le.err
// without a nil check, so le.err must be set.
func (le *listenError) ErrorID() old_verror.ID {
	return le.err.ErrorID()
}
337
// Action returns the action code of the underlying error. It calls through
// le.err without a nil check, so le.err must be set.
func (le *listenError) Action() verror.ActionCode {
	return le.err.Action()
}
341
// Params returns the parameters of the underlying error. It calls through
// le.err without a nil check, so le.err must be set.
func (le *listenError) Params() []interface{} {
	return le.err.Params()
}
345
// HasMessage forwards to the underlying error's HasMessage. It calls through
// le.err without a nil check, so le.err must be set.
func (le *listenError) HasMessage() bool {
	return le.err.HasMessage()
}
349
// Stack returns the stack recorded by the underlying error. It calls through
// le.err without a nil check, so le.err must be set.
func (le *listenError) Stack() verror.PCs {
	return le.err.Stack()
}
353
// newBadState returns a listenError wrapping a verror.BadState error built
// with the server's context and the message m. The per-address error map is
// left nil.
func (s *server) newBadState(m string) *listenError {
	return &listenError{err: verror.Make(verror.BadState, s.ctx, m)}
}
357
// newBadArg returns a listenError wrapping a verror.BadArg error built with
// the server's context and the message m. The per-address error map is left
// nil.
func (s *server) newBadArg(m string) *listenError {
	return &listenError{err: verror.Make(verror.BadArg, s.ctx, m)}
}
361
// Listen starts listening on every address in listenSpec.Addrs that has a
// non-empty Address and, if listenSpec.Proxy is set, starts a goroutine that
// connects to the proxy. It returns the endpoints created for the local
// listeners, each of which is also handed to the publisher.
//
// Return values:
//   - a BadState error if the server is already stopped on entry;
//   - the underlying error if a stream listen or endpoint creation fails, in
//     which case the listeners opened so far by this call are closed;
//   - (nil, nil) if no local endpoints were created but a proxy was requested;
//   - an error if no local endpoints were created and no proxy was requested;
//   - errServerStopped if the server was stopped while listeners were being
//     created (those listeners are closed).
//
// NOTE(review): the proxy goroutine is started before the local listeners
// are created and nothing in this function stops it when a later step fails.
func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
	defer vlog.LogCall()()
	s.Lock()

	// Shortcut if the server is stopped, to avoid needlessly creating a
	// listener.
	if s.stopped {
		s.Unlock()
		return nil, s.newBadState("ipc.Server.Stop already called")
	}

	useProxy := len(listenSpec.Proxy) > 0

	// Start the proxy as early as possible.
	if useProxy {
		// We have a goroutine for listening on proxy connections.
		s.active.Add(1)
		go func() {
			s.proxyListenLoop(listenSpec.Proxy)
			s.active.Done()
		}()
	}
	// The lock is released while the (potentially slow) stream listeners are
	// created; it is re-acquired below before server state is modified.
	s.Unlock()

	var ieps []*inaming.Endpoint

	// lnInfo pairs a listener with the endpoint the stream manager returned
	// for it.
	type lnInfo struct {
		ln stream.Listener
		ep naming.Endpoint
	}
	linfo := []lnInfo{}
	// closeAll closes every listener created so far by this call; it is used
	// on the error paths.
	closeAll := func(lni []lnInfo) {
		for _, li := range lni {
			li.ln.Close()
		}
	}

	// roaming becomes true if any listener was created on an unspecified
	// host address (as reported by createEndpoints).
	roaming := false
	for _, addr := range listenSpec.Addrs {
		if len(addr.Address) > 0 {
			// Listen if we have a local address to listen on. Some situations
			// just need a proxy (e.g. a browser extension).
			tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
			if err != nil {
				closeAll(linfo)
				vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
				return nil, err
			}
			// Record the listener before creating endpoints so that it is
			// closed if createEndpoints fails.
			linfo = append(linfo, lnInfo{tmpln, lep})
			tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
			if err != nil {
				closeAll(linfo)
				return nil, err
			}
			ieps = append(ieps, tmpieps...)
			if tmpRoaming {
				roaming = true
			}
		}
	}

	// TODO(cnicolaou): write a test for all of these error cases.
	if len(ieps) == 0 {
		if useProxy {
			// Proxy-only configuration: there are no local endpoints to report.
			return nil, nil
		}
		// no proxy.
		if len(listenSpec.Addrs) > 0 {
			return nil, fmt.Errorf("no endpoints")
		}
		return nil, fmt.Errorf("no proxy and no addresses requested")
	}

	// TODO(cnicolaou): return all of the eps and their errors....
	s.Lock()
	defer s.Unlock()
	// Re-check: the server may have been stopped while the lock was released.
	if s.stopped {
		closeAll(linfo)
		return nil, errServerStopped
	}

	if roaming && listenSpec.StreamPublisher != nil {
		// TODO(cnicolaou): re-enable roaming in a followup CL.
		/*
			var dhcpl *dhcpListener
			streamName := listenSpec.StreamName
			ch := make(chan config.Setting)
			if _, err := publisher.ForkStream(streamName, ch); err != nil {
				return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
			}
			dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
			// We have a goroutine to listen for dhcp changes.
			s.active.Add(1)
			go func() {
				s.dhcpLoop(dhcpl)
				s.active.Done()
			}()
			s.dhcpListeners[dhcpl] = struct{}{}
		*/
	}

	for _, li := range linfo {
		s.listeners[li.ln] = struct{}{}
		// We have a goroutine per listener to accept new flows.
		// Each flow is served from its own goroutine.
		s.active.Add(1)
		// The listener and endpoint are passed as arguments so each goroutine
		// gets its own copies rather than sharing the loop variable.
		go func(ln stream.Listener, ep naming.Endpoint) {
			s.listenLoop(ln, ep)
			s.active.Done()
		}(li.ln, li.ep)
	}
	// Publish every endpoint and convert to the interface type for the caller.
	eps := make([]naming.Endpoint, len(ieps))
	for i, iep := range ieps {
		s.publisher.AddServer(iep.String(), s.servesMountTable)
		eps[i] = iep
	}
	return eps, nil
}
480
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800481func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800482 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800483 if err != nil {
484 return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
485 }
486 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
487 if err != nil {
488 return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
489 }
490 iep, ok := ep.(*inaming.Endpoint)
491 if !ok {
492 ln.Close()
493 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
494 }
495 s.Lock()
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800496 s.listeners[ln] = struct{}{}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800497 s.Unlock()
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800498 s.publisher.AddServer(iep.String(), s.servesMountTable)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800499 return iep, ln, nil
500}
501
// proxyListenLoop maintains a connection to the proxy for as long as the
// server is running. It connects, serves flows from the proxy listener until
// that listener fails, unpublishes the proxy endpoint, and then reconnects
// with exponential backoff (doubling from min up to max). It returns once it
// observes that the server has been stopped, either via s.stopped or via
// s.stoppedChan being closed.
func (s *server) proxyListenLoop(proxy string) {
	const (
		min = 5 * time.Millisecond // initial reconnect delay
		max = 5 * time.Minute      // upper bound on the reconnect delay
	)

	iep, ln, err := s.reconnectAndPublishProxy(proxy)
	if err != nil {
		vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
	}
	// The initial connection may have failed, but we enter the retry
	// loop anyway so that we will continue to try and connect to the
	// proxy.
	s.Lock()
	if s.stopped {
		s.Unlock()
		return
	}
	s.Unlock()

	for {
		// ln and iep are both nil when the previous connection attempt failed,
		// in which case we go straight to the reconnect logic below.
		if ln != nil && iep != nil {
			// Blocks until the listener's Accept fails.
			s.listenLoop(ln, iep)
			// The listener is done, so:
			// (1) Unpublish its name
			s.publisher.RemoveServer(iep.String())
		}

		s.Lock()
		if s.stopped {
			s.Unlock()
			return
		}
		s.Unlock()

		// (2) Reconnect to the proxy unless the server has been stopped
		backoff := min
		ln = nil
		for {
			// Wait for the current backoff interval, or give up immediately
			// if the server is stopped in the meantime.
			select {
			case <-time.After(backoff):
				if backoff = backoff * 2; backoff > max {
					backoff = max
				}
			case <-s.stoppedChan:
				return
			}
			// (3) reconnect, publish new address
			if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
				vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
			} else {
				vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
				// Leaves the retry loop; the outer loop then serves the new
				// listener.
				break
			}
		}
	}
}
559
// listenLoop accepts flows from ln until Accept returns an error, serving
// each accepted flow in its own goroutine. Before returning it waits for all
// of those goroutines to finish and removes ln from s.listeners; it does not
// close ln itself.
func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
	// Deferred calls run in reverse order, so this log line is emitted after
	// the cleanup function below has completed.
	defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
	// calls tracks the per-flow goroutines started by this loop.
	var calls sync.WaitGroup
	defer func() {
		calls.Wait()
		s.Lock()
		delete(s.listeners, ln)
		s.Unlock()
	}()
	for {
		flow, err := ln.Accept()
		if err != nil {
			// Any Accept error ends the loop.
			vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ep, err)
			return
		}
		calls.Add(1)
		go func(flow stream.Flow) {
			defer calls.Done()
			fs, err := newFlowServer(flow, s)
			if err != nil {
				vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
				return
			}
			if err := fs.serve(); err != nil {
				// TODO(caprita): Logging errors here is too spammy. For example, "not
				// authorized" errors shouldn't be logged as server errors.
				// io.EOF is not logged as an error.
				if err != io.EOF {
					vlog.Errorf("Flow serve on %v failed: %v", ep, err)
				}
			}
		}(flow)
	}
}
593
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800594/*
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700595func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
596 dhcpl.Lock()
597 defer dhcpl.Unlock()
598 for _, a := range addrs {
599 if ip := netstate.AsIP(a); ip != nil {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800600 dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800601 fn(dhcpl.ep.String())
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700602 }
603 }
604}
605
606func (s *server) dhcpLoop(dhcpl *dhcpListener) {
607 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
608 vlog.VI(2).Infof("ipc: dhcp loop")
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800609
610 ep := *dhcpl.ep
611 // Publish all of the addresses
612 for _, pubAddr := range dhcpl.pubAddrs {
613 ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800614 s.publisher.AddServer(ep.String(), s.servesMountTable)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800615 }
616
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700617 for setting := range dhcpl.ch {
618 if setting == nil {
619 return
620 }
621 switch v := setting.Value().(type) {
622 case bool:
623 return
624 case []net.Addr:
625 s.Lock()
626 if s.stopped {
627 s.Unlock()
628 return
629 }
630 publisher := s.publisher
631 s.Unlock()
632 switch setting.Name() {
633 case ipc.NewAddrsSetting:
634 vlog.Infof("Added some addresses: %q", v)
David Why Use Two When One Will Do Presotto3da1c792014-10-03 11:15:53 -0700635 s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700636 case ipc.RmAddrsSetting:
637 vlog.Infof("Removed some addresses: %q", v)
638 s.applyChange(dhcpl, v, publisher.RemoveServer)
639 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700640 }
641 }
642}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800643*/
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700644
// leafDispatcher is a dispatcher that serves a single object: its Lookup
// returns the stored invoker and authorizer for the empty suffix and rejects
// every other suffix. Serve uses it to wrap a single object.
type leafDispatcher struct {
	invoker ipc.Invoker         // invoker returned for the empty suffix.
	auth    security.Authorizer // authorizer returned alongside the invoker.
}
649
650func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
651 if suffix != "" {
652 return nil, nil, old_verror.NoExistf("ipc: dispatcher lookup on non-empty suffix not supported: " + suffix)
653 }
654 return d.invoker, d.auth, nil
655}
656
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800657func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800658 defer vlog.LogCall()()
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800659 invoker, err := objectToInvoker(obj)
660 if err != nil {
661 return s.newBadArg(fmt.Sprintf("bad object: %v", err))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800662 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800663 return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800664}
665
666func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800667 defer vlog.LogCall()()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700668 s.Lock()
669 defer s.Unlock()
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800670 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Matt Rosencrantz3e76f282014-11-10 09:38:57 -0800671
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700672 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800673 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700674 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800675 if disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800676 return s.newBadArg("nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700677 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800678 if s.disp != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800679 return s.newBadState("ipc.Server.Serve/ServeDispatcher already called")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700680 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800681 s.disp = disp
682 s.names = make(map[string]struct{})
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700683 if len(name) > 0 {
684 s.publisher.AddName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800685 s.names[name] = struct{}{}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700686 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700687 return nil
688}
689
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800690func (s *server) AddName(name string) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800691 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800692 s.Lock()
693 defer s.Unlock()
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800694 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800695 if len(name) == 0 {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800696 return s.newBadArg("name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800697 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800698 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800699 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800700 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800701 if s.disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800702 return s.newBadState("adding a name before calling Serve or ServeDispatcher is not allowed")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800703 }
704 s.publisher.AddName(name)
705 // TODO(cnicolaou): remove this map when the publisher's RemoveName
706 // method returns an error.
707 s.names[name] = struct{}{}
708 return nil
709}
710
711func (s *server) RemoveName(name string) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800712 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800713 s.Lock()
714 defer s.Unlock()
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800715 vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800716 if s.stopped {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800717 return s.newBadState("ipc.Server.Stop already called")
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800718 }
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800719 if s.disp == nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800720 return s.newBadState("removing name before calling Serve or ServeDispatcher is not allowed")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800721 }
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800722 if _, present := s.names[name]; !present {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800723 return s.newBadArg(fmt.Sprintf("%q has not been previously used for this server", name))
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800724 }
725 s.publisher.RemoveName(name)
726 delete(s.names, name)
727 return nil
728}
729
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700730func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700731 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700732 s.Lock()
733 if s.stopped {
734 s.Unlock()
735 return nil
736 }
737 s.stopped = true
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700738 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700739 s.Unlock()
740
Robin Thellenddf428232014-10-06 12:50:44 -0700741 // Delete the stats object.
742 s.stats.stop()
743
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700744 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
745 // server lock, since publisher is safe for concurrent access.
746
747 // Stop the publisher, which triggers unmounting of published names.
748 s.publisher.Stop()
749 // Wait for the publisher to be done unmounting before we can proceed to
750 // close the listeners (to minimize the number of mounted names pointing
751 // to endpoint that are no longer serving).
752 //
753 // TODO(caprita): See if make sense to fail fast on rejecting
754 // connections once listeners are closed, and parallelize the publisher
755 // and listener shutdown.
756 s.publisher.WaitForStop()
757
758 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800759
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700760 // Close all listeners. No new flows will be accepted, while in-flight
761 // flows will continue until they terminate naturally.
762 nListeners := len(s.listeners)
763 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700764
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800765 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700766 go func(ln stream.Listener) {
767 errCh <- ln.Close()
768 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800769 }
770 for dhcpl, _ := range s.dhcpListeners {
771 dhcpl.Lock()
772 dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
773 dhcpl.ch <- config.NewBool("EOF", "stop", true)
774 dhcpl.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700775 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800776
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700777 s.Unlock()
778 var firstErr error
779 for i := 0; i < nListeners; i++ {
780 if err := <-errCh; err != nil && firstErr == nil {
781 firstErr = err
782 }
783 }
784 // At this point, we are guaranteed that no new requests are going to be
785 // accepted.
786
787 // Wait for the publisher and active listener + flows to finish.
788 s.active.Wait()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800789
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700790 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800791 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700792 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800793 if firstErr != nil {
794 return verror.Make(verror.Internal, s.ctx, firstErr)
795 }
796 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700797}
798
// TODO(toddw): Remove these interfaces after the vom2 transition.
type (
	// vomEncoder is the encoding operation flowServer needs from either the
	// vom or the vom2 encoder.
	vomEncoder interface {
		Encode(value interface{}) error
	}

	// vomDecoder is the decoding operation flowServer needs from either the
	// vom or the vom2 decoder.
	vomDecoder interface {
		Decode(value interface{}) error
	}
)
807
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700808// flowServer implements the RPC server-side protocol for a single RPC, over a
809// flow that's already connected to the client.
810type flowServer struct {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800811 *context.T
Todd Wang5739dda2014-11-16 22:44:02 -0800812 server *server // ipc.Server that this flow server belongs to
813 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
Todd Wang34ed4c62014-11-26 15:15:52 -0800814 dec vomDecoder // to decode requests and args from the client
815 enc vomEncoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -0800816 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817
Asim Shankar220a0152014-10-30 21:21:09 -0700818 // Fields filled in during the server invocation.
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800819 clientBlessings security.Blessings
820 ackBlessings bool
821 blessings security.Blessings
822 method, suffix string
823 tags []interface{}
824 discharges map[string]security.Discharge
825 starttime time.Time
826 endStreamArgs bool // are the stream args at EOF?
827 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700828}
829
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700830var _ ipc.Stream = (*flowServer)(nil)
831
Todd Wang34ed4c62014-11-26 15:15:52 -0800832func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700833 server.Lock()
834 disp := server.disp
835 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700836
Todd Wang34ed4c62014-11-26 15:15:52 -0800837 fs := &flowServer{
838 T: server.ctx,
839 server: server,
840 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -0800841 flow: flow,
842 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700843 }
Todd Wang34ed4c62014-11-26 15:15:52 -0800844 if vom2.IsEnabled() {
845 var err error
846 if fs.dec, err = vom2.NewDecoder(flow); err != nil {
847 flow.Close()
848 return nil, err
849 }
850 if fs.enc, err = vom2.NewBinaryEncoder(flow); err != nil {
851 flow.Close()
852 return nil, err
853 }
854 } else {
855 fs.dec = vom.NewDecoder(flow)
856 fs.enc = vom.NewEncoder(flow)
857 }
858 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700859}
860
861// Vom does not encode untyped nils.
862// Consequently, the ipc system does not allow nil results with an interface
863// type from server methods. The one exception being errors.
864//
865// For now, the following hacky assumptions are made, which will be revisited when
866// a decision is made on how untyped nils should be encoded/decoded in
867// vom/vom2:
868//
869// - Server methods return 0 or more results
870// - Any values returned by the server that have an interface type are either
871// non-nil or of type error.
Todd Wang34ed4c62014-11-26 15:15:52 -0800872func vomErrorHack(res interface{}) vom.Value {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700873 v := vom.ValueOf(res)
874 if !v.IsValid() {
875 // Untyped nils are assumed to be nil-errors.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800876 var boxed old_verror.E
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700877 return vom.ValueOf(&boxed).Elem()
878 }
879 if err, iserr := res.(error); iserr {
880 // Convert errors to verror since errors are often not
881 // serializable via vom/gob (errors.New and fmt.Errorf return a
882 // type with no exported fields).
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800883 return vom.ValueOf(old_verror.Convert(err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700884 }
885 return v
886}
887
// TODO(toddw): Remove this function and encodeValueHack after the vom2 transition.
//
// vom2ErrorHack prepares a result for vom2 encoding: a value implementing
// error is returned as a pointer to an error interface holding it, and any
// other value (including nil) is returned unchanged.
func vom2ErrorHack(res interface{}) interface{} {
	err, isErr := res.(error)
	if !isErr {
		return res
	}
	return &err
}
895
896// TODO(toddw): Remove this function and vom2ErrorHack after the vom2 transition.
897func (fs *flowServer) encodeValueHack(res interface{}) error {
898 if vom2.IsEnabled() {
899 return fs.enc.Encode(vom2ErrorHack(res))
900 }
901 return fs.enc.(*vom.Encoder).EncodeValue(vomErrorHack(res))
902}
903
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700904func (fs *flowServer) serve() error {
905 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700906
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700907 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700908
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800909 vtrace.GetSpan(fs.T).Finish()
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700910
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700911 var traceResponse vtrace.Response
912 if fs.allowDebug {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800913 traceResponse = ivtrace.Response(fs.T)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700914 }
915
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700916 // Respond to the client with the response header and positional results.
917 response := ipc.Response{
918 Error: err,
919 EndStreamResults: true,
920 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700921 TraceResponse: traceResponse,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800922 AckBlessings: fs.ackBlessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700923 }
924 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800925 if err == io.EOF {
926 return err
927 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800928 return old_verror.BadProtocolf("ipc: response encoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700929 }
930 if response.Error != nil {
931 return response.Error
932 }
933 for ix, res := range results {
Todd Wang34ed4c62014-11-26 15:15:52 -0800934 if err := fs.encodeValueHack(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800935 if err == io.EOF {
936 return err
937 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800938 return old_verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700939 }
940 }
941 // TODO(ashankar): Should unread data from the flow be drained?
942 //
943 // Reason to do so:
944 // The common stream.Flow implementation (veyron/runtimes/google/ipc/stream/vc/reader.go)
945 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
946 // slices will not be returned to the pool leading to possibly increased memory usage.
947 //
948 // Reason to not do so:
949 // Draining here will conflict with any Reads on the flow in a separate goroutine
950 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
951 //
952 // For now, go with the reason to not do so as having unread data in the stream
953 // should be a rare case.
954 return nil
955}
956
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800957func (fs *flowServer) readIPCRequest() (*ipc.Request, old_verror.E) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700958 // Set a default timeout before reading from the flow. Without this timeout,
959 // a client that sends no request or a partial request will retain the flow
960 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -0700961 initTimer := newTimer(defaultCallTimeout)
962 defer initTimer.Stop()
963 fs.flow.SetDeadline(initTimer.C)
964
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700965 // Decode the initial request.
966 var req ipc.Request
967 if err := fs.dec.Decode(&req); err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800968 return nil, old_verror.BadProtocolf("ipc: request decoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700969 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700970 return &req, nil
971}
972
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800973func (fs *flowServer) processRequest() ([]interface{}, old_verror.E) {
Asim Shankar0cad0832014-11-04 01:27:38 -0800974 fs.starttime = time.Now()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700975 req, verr := fs.readIPCRequest()
976 if verr != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700977 // We don't know what the ipc call was supposed to be, but we'll create
978 // a placeholder span so we can capture annotations.
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800979 fs.T, _ = vtrace.SetNewSpan(fs.T, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
Matt Rosencrantz86897932014-10-02 09:34:34 -0700980 return nil, verr
981 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700982 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -0800983 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -0700984
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700985 // TODO(mattr): Currently this allows users to trigger trace collection
986 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -0800987 // results later. This might be considered a DOS vector.
988 spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800989 fs.T, _ = ivtrace.SetContinuedSpan(fs.T, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700990
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700991 var cancel context.CancelFunc
Matt Rosencrantz86897932014-10-02 09:34:34 -0700992 if req.Timeout != ipc.NoTimeout {
Matt Rosencrantz89445a42015-01-05 13:32:37 -0800993 fs.T, cancel = context.WithDeadline(fs.T, fs.starttime.Add(time.Duration(req.Timeout)))
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700994 } else {
Matt Rosencrantz89445a42015-01-05 13:32:37 -0800995 fs.T, cancel = context.WithCancel(fs.T)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700996 }
Matt Rosencrantz86897932014-10-02 09:34:34 -0700997 fs.flow.SetDeadline(fs.Done())
Todd Wang5739dda2014-11-16 22:44:02 -0800998 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -0700999
Todd Wang5739dda2014-11-16 22:44:02 -08001000 // Initialize security: blessings, discharges, etc.
1001 if verr := fs.initSecurity(req); verr != nil {
1002 return nil, verr
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001003 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001004 // Lookup the invoker.
Todd Wang5739dda2014-11-16 22:44:02 -08001005 invoker, auth, verr := fs.lookup(fs.suffix, &fs.method)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001006 if verr != nil {
1007 return nil, verr
1008 }
1009 // Prepare invoker and decode args.
1010 numArgs := int(req.NumPosArgs)
Robin Thellendb16d7162014-11-07 13:47:26 -08001011 argptrs, tags, err := invoker.Prepare(fs.method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -08001012 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001013 if err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001014 return nil, old_verror.Makef(old_verror.ErrorID(err), "%s: name: %q", err, fs.suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001015 }
1016 if len(argptrs) != numArgs {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001017 return nil, old_verror.BadProtocolf(fmt.Sprintf("ipc: wrong number of input arguments for method %q, name %q (called with %d args, expected %d)", fs.method, fs.suffix, numArgs, len(argptrs)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001018 }
1019 for ix, argptr := range argptrs {
1020 if err := fs.dec.Decode(argptr); err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001021 return nil, old_verror.BadProtocolf("ipc: arg %d decoding failed: %v", ix, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001022 }
1023 }
Todd Wang5739dda2014-11-16 22:44:02 -08001024 // Check application's authorization policy.
1025 if verr := authorize(fs, auth); verr != nil {
1026 return nil, verr
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001027 }
Todd Wang5739dda2014-11-16 22:44:02 -08001028 // Check if the caller is permitted to view debug information.
Asim Shankar68885192014-11-26 12:48:35 -08001029 // TODO(mattr): Is access.Debug the right thing to check?
Todd Wang5739dda2014-11-16 22:44:02 -08001030 fs.allowDebug = authorize(debugContext{fs}, auth) == nil
1031 // Invoke the method.
Robin Thellendb16d7162014-11-07 13:47:26 -08001032 results, err := invoker.Invoke(fs.method, fs, argptrs)
1033 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001034 return results, old_verror.Convert(err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001035}
1036
Todd Wang5739dda2014-11-16 22:44:02 -08001037func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1038 // Ensure that the context gets cancelled if the flow is closed
1039 // due to a network error, or client cancellation.
1040 select {
1041 case <-fs.flow.Closed():
1042 // Here we remove the contexts channel as a deadline to the flow.
1043 // We do this to ensure clients get a consistent error when they read/write
1044 // after the flow is closed. Since the flow is already closed, it doesn't
1045 // matter that the context is also cancelled.
1046 fs.flow.SetDeadline(nil)
1047 cancel()
1048 case <-fs.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001049 }
Todd Wang5739dda2014-11-16 22:44:02 -08001050}
1051
1052// lookup returns the invoker and authorizer responsible for serving the given
1053// name and method. The suffix is stripped of any leading slashes. If it begins
1054// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
1055// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1056// value may be modified to match the actual suffix and method to use.
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001057func (fs *flowServer) lookup(suffix string, method *string) (ipc.Invoker, security.Authorizer, old_verror.E) {
Todd Wang5739dda2014-11-16 22:44:02 -08001058 if naming.IsReserved(*method) {
1059 // All reserved methods are trapped and handled here, by removing the
1060 // reserved prefix and invoking them on reservedMethods. E.g. "__Glob"
1061 // invokes reservedMethods.Glob.
1062 *method = naming.StripReserved(*method)
1063 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
1064 }
1065 disp := fs.disp
1066 if naming.IsReserved(suffix) {
1067 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -07001068 }
1069 if disp != nil {
Robin Thellenda02fe8f2014-11-19 09:58:29 -08001070 obj, auth, err := disp.Lookup(suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001071 switch {
1072 case err != nil:
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001073 return nil, nil, old_verror.Convert(err)
Todd Wang5739dda2014-11-16 22:44:02 -08001074 case obj != nil:
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001075 invoker, err := objectToInvoker(obj)
1076 if err != nil {
1077 return nil, nil, old_verror.Internalf("ipc: invalid received object: %v", err)
1078 }
1079 return invoker, auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001080 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001081 }
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001082 return nil, nil, old_verror.NoExistf("ipc: invoker not found for %q", suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001083}
1084
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001085func objectToInvoker(obj interface{}) (ipc.Invoker, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001086 if obj == nil {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001087 return nil, errors.New("nil object")
Todd Wang5739dda2014-11-16 22:44:02 -08001088 }
1089 if invoker, ok := obj.(ipc.Invoker); ok {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001090 return invoker, nil
Todd Wang5739dda2014-11-16 22:44:02 -08001091 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001092 return ipc.ReflectInvoker(obj)
Todd Wang5739dda2014-11-16 22:44:02 -08001093}
1094
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001095func (fs *flowServer) initSecurity(req *ipc.Request) old_verror.E {
Todd Wang5739dda2014-11-16 22:44:02 -08001096 // If additional credentials are provided, make them available in the context
1097 blessings, err := security.NewBlessings(req.GrantedBlessings)
1098 if err != nil {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001099 return old_verror.BadProtocolf("ipc: failed to decode granted blessings: %v", err)
Todd Wang5739dda2014-11-16 22:44:02 -08001100 }
1101 fs.blessings = blessings
1102 // Detect unusable blessings now, rather then discovering they are unusable on
1103 // first use.
1104 //
1105 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1106 // the server's identity as the blessing. Figure out what we want to do about
1107 // this - should servers be able to assume that a blessing is something that
1108 // does not have the authorizations that the server's own identity has?
1109 if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001110 return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
Todd Wang5739dda2014-11-16 22:44:02 -08001111 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001112 fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats)
1113 if err != nil {
1114 // When the server can't access the blessings cache, the client is not following
1115 // protocol, so the server closes the VCs corresponding to the client endpoint.
1116 // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
1117 // of all VCs connected to the RemoteEndpoint.
1118 fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
1119 return old_verror.BadProtocolf("ipc: blessings cache failed: %v", err)
1120 }
1121 fs.ackBlessings = true
1122
1123 // TODO(suharshs, ataly): Make security.Discharge a vdl type.
1124 for i, d := range req.Discharges {
1125 if dis, ok := d.(security.Discharge); ok {
1126 fs.discharges[dis.ID()] = dis
1127 continue
Todd Wang5739dda2014-11-16 22:44:02 -08001128 }
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001129 if v, ok := d.(*vdl.Value); ok {
1130 return old_verror.BadProtocolf("ipc: discharge #%d of type %s isn't registered", i, v.Type())
1131 }
1132 return old_verror.BadProtocolf("ipc: discharge #%d of type %T doesn't implement security.Discharge", i, d)
Todd Wang5739dda2014-11-16 22:44:02 -08001133 }
1134 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001135}
1136
1137type acceptAllAuthorizer struct{}
1138
1139func (acceptAllAuthorizer) Authorize(security.Context) error {
1140 return nil
1141}
1142
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001143func authorize(ctx security.Context, auth security.Authorizer) old_verror.E {
Todd Wang5739dda2014-11-16 22:44:02 -08001144 if ctx.LocalPrincipal() == nil {
1145 // LocalPrincipal is nil means that the server wanted to avoid
1146 // authentication, and thus wanted to skip authorization as well.
1147 return nil
1148 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001149 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001150 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001151 }
Todd Wang5739dda2014-11-16 22:44:02 -08001152 if err := auth.Authorize(ctx); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -07001153 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001154 return old_verror.NoAccessf("ipc: not authorized to call %q.%q (%v)", ctx.Suffix(), ctx.Method(), err)
Asim Shankara5457f02014-10-24 23:23:07 -07001155 }
1156 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001157}
1158
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001159// debugContext is a context which wraps another context but always returns
Asim Shankar68885192014-11-26 12:48:35 -08001160// the debug tag.
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001161type debugContext struct {
1162 security.Context
1163}
1164
Asim Shankar68885192014-11-26 12:48:35 -08001165func (debugContext) MethodTags() []interface{} {
1166 return []interface{}{access.Debug}
1167}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001168
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001169// Send implements the ipc.Stream method.
1170func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001171 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001172 // The empty response header indicates what follows is a streaming result.
1173 if err := fs.enc.Encode(ipc.Response{}); err != nil {
1174 return err
1175 }
1176 return fs.enc.Encode(item)
1177}
1178
1179// Recv implements the ipc.Stream method.
1180func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001181 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001182 var req ipc.Request
1183 if err := fs.dec.Decode(&req); err != nil {
1184 return err
1185 }
1186 if req.EndStreamArgs {
1187 fs.endStreamArgs = true
1188 return io.EOF
1189 }
1190 return fs.dec.Decode(itemptr)
1191}
1192
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -07001193// Implementations of ipc.ServerContext methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001194
Asim Shankar2519cc12014-11-10 21:16:53 -08001195func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001196 //nologcall
1197 return fs.discharges
1198}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001199func (fs *flowServer) Server() ipc.Server {
1200 //nologcall
1201 return fs.server
1202}
Asim Shankar0cad0832014-11-04 01:27:38 -08001203func (fs *flowServer) Timestamp() time.Time {
1204 //nologcall
1205 return fs.starttime
1206}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001207func (fs *flowServer) Method() string {
1208 //nologcall
1209 return fs.method
1210}
Asim Shankar0cad0832014-11-04 01:27:38 -08001211func (fs *flowServer) MethodTags() []interface{} {
1212 //nologcall
1213 return fs.tags
1214}
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -08001215func (fs *flowServer) Context() *context.T {
Matt Rosencrantz04d197c2014-12-12 08:39:25 -08001216 return fs.T
1217}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001218
1219// TODO(cnicolaou): remove Name from ipc.ServerContext and all of
1220// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001221func (fs *flowServer) Name() string {
1222 //nologcall
1223 return fs.suffix
1224}
1225func (fs *flowServer) Suffix() string {
1226 //nologcall
1227 return fs.suffix
1228}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001229func (fs *flowServer) LocalPrincipal() security.Principal {
1230 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001231 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001232}
1233func (fs *flowServer) LocalBlessings() security.Blessings {
1234 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001235 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001236}
1237func (fs *flowServer) RemoteBlessings() security.Blessings {
1238 //nologcall
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001239 if fs.clientBlessings != nil {
1240 return fs.clientBlessings
1241 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001242 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001243}
Asim Shankar8f05c222014-10-06 22:08:19 -07001244func (fs *flowServer) Blessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001245 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001246 return fs.blessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001247}
1248func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1249 //nologcall
1250 return fs.flow.LocalEndpoint()
1251}
1252func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1253 //nologcall
1254 return fs.flow.RemoteEndpoint()
1255}