blob: b1a67393f8b16716b32660a67ef8d7992b9976bf [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package ipc
2
3import (
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08004 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "fmt"
6 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07007 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "strings"
10 "sync"
11 "time"
12
Jiri Simsa6ac95222015-02-23 16:11:49 -080013 "v.io/v23/config"
14 "v.io/v23/context"
15 "v.io/v23/ipc"
16 "v.io/v23/naming"
David Why Use Two When One Will Do Presottod424c212015-02-25 11:05:26 -080017 "v.io/v23/naming/ns"
Jiri Simsa6ac95222015-02-23 16:11:49 -080018 "v.io/v23/options"
19 "v.io/v23/security"
20 "v.io/v23/services/security/access"
21 "v.io/v23/vdl"
22 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080023 "v.io/v23/vom"
24 "v.io/v23/vtrace"
Jiri Simsa337af232015-02-27 14:36:46 -080025 "v.io/x/lib/vlog"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080026 "v.io/x/ref/profiles/internal/ipc/stream"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070027
Matt Rosencrantz9d3278a2015-03-11 14:58:34 -070028 "v.io/x/lib/netstate"
Jiri Simsaffceefa2015-02-28 11:03:34 -080029 "v.io/x/ref/lib/stats"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080030 "v.io/x/ref/profiles/internal/ipc/stream/vc"
Matt Rosencrantz86ba1a12015-03-09 13:19:02 -070031 "v.io/x/ref/profiles/internal/lib/publisher"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080032 inaming "v.io/x/ref/profiles/internal/naming"
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080033
Todd Wangff73e1f2015-02-10 21:45:52 -080034 // TODO(cnicolaou): finish verror2 -> verror transition, in particular
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080035 // for communicating from server to client.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036)
37
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080038// state for each requested listen address
39type listenState struct {
40 protocol, address string
41 ln stream.Listener
42 lep naming.Endpoint
43 lnerr, eperr error
44 roaming bool
45 // We keep track of all of the endpoints, the port and a copy of
46 // the original listen endpoint for use with roaming network changes.
47 ieps []*inaming.Endpoint // list of currently active eps
48 port string // port to use for creating new eps
49 protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
50}
51
52// state for each requested proxy
53type proxyState struct {
54 endpoint naming.Endpoint
Mike Burrowsdc6b3602015-02-05 15:52:12 -080055 err error
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080056}
57
58type dhcpState struct {
59 name string
60 publisher *config.Publisher
61 stream *config.Stream
62 ch chan config.Setting // channel to receive dhcp settings over
63 err error // error status.
64 watchers map[chan<- ipc.NetworkChange]struct{}
65}
66
Jiri Simsa5293dcb2014-05-10 09:56:38 -070067type server struct {
68 sync.Mutex
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080069 // context used by the server to make internal RPCs, error messages etc.
70 ctx *context.T
Matt Rosencrantz1094d062015-01-30 06:43:12 -080071 cancel context.CancelFunc // function to cancel the above context.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080072 state serverState // track state of the server.
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080073 streamMgr stream.Manager // stream manager to listen for new flows.
74 publisher publisher.Publisher // publisher to publish mounttable mounts.
75 listenerOpts []stream.ListenerOpt // listener opts for Listen.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080076 dhcpState *dhcpState // dhcpState, nil if not using dhcp
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -070077 principal security.Principal
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080078
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080079 // maps that contain state on listeners.
80 listenState map[*listenState]struct{}
81 listeners map[stream.Listener]struct{}
82
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080083 // state of proxies keyed by the name of the proxy
84 proxies map[string]proxyState
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080085
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080086 // all endpoints generated and returned by this server
87 endpoints []naming.Endpoint
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080088
89 disp ipc.Dispatcher // dispatcher to serve RPCs
90 dispReserved ipc.Dispatcher // dispatcher for reserved methods
91 active sync.WaitGroup // active goroutines we've spawned.
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080092 stoppedChan chan struct{} // closed when the server has been stopped.
93 preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
Jungho Ahn25545d32015-01-26 15:14:14 -080094 // We cache the IP networks on the device since it is not that cheap to read
95 // network interfaces through os syscall.
96 // TODO(jhahn): Add monitoring the network interface changes.
97 ipNets []*net.IPNet
David Why Use Two When One Will Do Presottod424c212015-02-25 11:05:26 -080098 ns ns.Namespace
Jungho Ahn25545d32015-01-26 15:14:14 -080099 servesMountTable bool
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800100
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700101 // TODO(cnicolaou): add roaming stats to ipcStats
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800102 stats *ipcStats // stats for this server.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700103}
104
// serverState identifies a state in the server's internal state machine.
type serverState int

// The internal states, numbered consecutively from zero. initialized is
// the zero value and hence the state of a freshly allocated server.
const (
	initialized serverState = iota
	listening
	serving
	publishing
	stopping
	stopped
)
115
116// Simple state machine for the server implementation.
117type next map[serverState]bool
118type transitions map[serverState]next
119
120var (
121 states = transitions{
122 initialized: next{listening: true, stopping: true},
123 listening: next{listening: true, serving: true, stopping: true},
124 serving: next{publishing: true, stopping: true},
125 publishing: next{publishing: true, stopping: true},
126 stopping: next{},
127 stopped: next{},
128 }
129
130 externalStates = map[serverState]ipc.ServerState{
131 initialized: ipc.ServerInit,
132 listening: ipc.ServerActive,
133 serving: ipc.ServerActive,
134 publishing: ipc.ServerActive,
135 stopping: ipc.ServerStopping,
136 stopped: ipc.ServerStopped,
137 }
138)
139
140func (s *server) allowed(next serverState, method string) error {
141 if states[s.state][next] {
142 s.state = next
143 return nil
144 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800145 return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800146}
147
148func (s *server) isStopState() bool {
149 return s.state == stopping || s.state == stopped
150}
151
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700152var _ ipc.Server = (*server)(nil)
153
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700154func InternalNewServer(ctx *context.T, streamMgr stream.Manager, ns ns.Namespace, client ipc.Client, principal security.Principal, opts ...ipc.ServerOpt) (ipc.Server, error) {
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800155 ctx, cancel := context.WithRootCancel(ctx)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800156 ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
Bogdan Capritae7376312014-11-10 13:13:17 -0800157 statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700158 s := &server{
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800159 ctx: ctx,
160 cancel: cancel,
161 streamMgr: streamMgr,
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700162 principal: principal,
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800163 publisher: publisher.New(ctx, ns, publishPeriod),
164 listenState: make(map[*listenState]struct{}),
165 listeners: make(map[stream.Listener]struct{}),
166 proxies: make(map[string]proxyState),
167 stoppedChan: make(chan struct{}),
168 ipNets: ipNetworks(),
169 ns: ns,
170 stats: newIPCStats(statsPrefix),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700171 }
Bogdan Capritae7376312014-11-10 13:13:17 -0800172 var (
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800173 blessings security.Blessings
174 dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
Bogdan Capritae7376312014-11-10 13:13:17 -0800175 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700176 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700177 switch opt := opt.(type) {
178 case stream.ListenerOpt:
179 // Collect all ServerOpts that are also ListenerOpts.
180 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800181 switch opt := opt.(type) {
Bogdan Capritae7376312014-11-10 13:13:17 -0800182 case options.ServerBlessings:
183 blessings = opt.Blessings
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800184 case vc.DischargeExpiryBuffer:
185 dischargeExpiryBuffer = time.Duration(opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800186 }
Asim Shankarcc044212014-10-15 23:25:26 -0700187 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700188 s.servesMountTable = bool(opt)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800189 case ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800190 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800191 case PreferredServerResolveProtocols:
192 s.preferredProtocols = []string(opt)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700193 }
194 }
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800195 // Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
196 // the discharges by the time the VC asks for them.`
197 dc := InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800198 s.listenerOpts = append(s.listenerOpts, dc)
Benjamin Prosnitz9284a002015-02-23 14:57:25 -0800199 s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
Bogdan Capritae7376312014-11-10 13:13:17 -0800200 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
Asim Shankar2bf7b1e2015-02-27 00:45:12 -0800201 // TODO(caprita): revist printing the blessings with %s, and
202 // instead expose them as a list.
203 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", blessings))
204 if principal != nil { // principal should have been passed in, but just in case.
Bogdan Capritae7376312014-11-10 13:13:17 -0800205 stats.NewStringFunc(blessingsStatsName, func() string {
206 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
207 })
208 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700209 return s, nil
210}
211
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800212func (s *server) Status() ipc.ServerStatus {
213 status := ipc.ServerStatus{}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700214 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700215 s.Lock()
216 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800217 status.State = externalStates[s.state]
218 status.ServesMountTable = s.servesMountTable
219 status.Mounts = s.publisher.Status()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800220 status.Endpoints = []naming.Endpoint{}
221 for ls, _ := range s.listenState {
222 if ls.eperr != nil {
223 status.Errors = append(status.Errors, ls.eperr)
224 }
225 if ls.lnerr != nil {
226 status.Errors = append(status.Errors, ls.lnerr)
227 }
228 for _, iep := range ls.ieps {
229 status.Endpoints = append(status.Endpoints, iep)
230 }
231 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800232 status.Proxies = make([]ipc.ProxyStatus, 0, len(s.proxies))
233 for k, v := range s.proxies {
234 status.Proxies = append(status.Proxies, ipc.ProxyStatus{k, v.endpoint, v.err})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700235 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800236 return status
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700237}
238
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800239func (s *server) WatchNetwork(ch chan<- ipc.NetworkChange) {
240 defer vlog.LogCall()()
241 s.Lock()
242 defer s.Unlock()
243 if s.dhcpState != nil {
244 s.dhcpState.watchers[ch] = struct{}{}
245 }
246}
247
248func (s *server) UnwatchNetwork(ch chan<- ipc.NetworkChange) {
249 defer vlog.LogCall()()
250 s.Lock()
251 defer s.Unlock()
252 if s.dhcpState != nil {
253 delete(s.dhcpState.watchers, ch)
254 }
255}
256
Robin Thellend92b65a42014-12-17 14:30:16 -0800257// resolveToEndpoint resolves an object name or address to an endpoint.
258func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800259 var resolved *naming.MountEntry
260 var err error
Asim Shankardee311d2014-08-01 17:41:31 -0700261 if s.ns != nil {
Asim Shankaraae31802015-01-22 11:59:42 -0800262 if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700263 return "", err
264 }
265 } else {
Asim Shankaraae31802015-01-22 11:59:42 -0800266 // Fake a namespace resolution
267 resolved = &naming.MountEntry{Servers: []naming.MountedServer{
268 {Server: address},
269 }}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700270 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800271 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800272 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800273 return "", err
274 }
Asim Shankaraae31802015-01-22 11:59:42 -0800275 for _, n := range resolved.Names() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700276 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800277 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700278 continue
279 }
Asim Shankaraae31802015-01-22 11:59:42 -0800280 if ep, err := inaming.NewEndpoint(address); err == nil {
281 return ep.String(), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700282 }
283 }
Asim Shankardee311d2014-08-01 17:41:31 -0700284 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700285}
286
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800287// getPossbileAddrs returns an appropriate set of addresses that could be used
288// to contact the supplied protocol, host, port parameters using the supplied
289// chooser function. It returns an indication of whether the supplied address
290// was fully specified or not, returning false if the address was fully
291// specified, and true if it was not.
292func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800293
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800294 ip := net.ParseIP(host)
295 if ip == nil {
296 return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
297 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800298
299 addrFromIP := func(ip net.IP) ipc.Address {
300 return &netstate.AddrIfc{
301 Addr: &net.IPAddr{IP: ip},
302 }
303 }
304
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800305 if ip.IsUnspecified() {
306 if chooser != nil {
307 // Need to find a usable IP address since the call to listen
308 // didn't specify one.
309 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800310 a, err := chooser(protocol, addrs)
311 if err == nil && len(a) > 0 {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800312 return a, true, nil
313 }
314 }
315 }
316 // We don't have a chooser, so we just return the address the
317 // underlying system has chosen.
318 return []ipc.Address{addrFromIP(ip)}, true, nil
319 }
320 return []ipc.Address{addrFromIP(ip)}, false, nil
321}
322
323// createEndpoints creates appropriate inaming.Endpoint instances for
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800324// all of the externally accessible network addresses that can be used
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800325// to reach this server.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800326func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800327 iep, ok := lep.(*inaming.Endpoint)
328 if !ok {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800329 return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800330 }
331 if !strings.HasPrefix(iep.Protocol, "tcp") &&
332 !strings.HasPrefix(iep.Protocol, "ws") {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800333 // If not tcp, ws, or wsh, just return the endpoint we were given.
334 return []*inaming.Endpoint{iep}, "", false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800335 }
336
337 host, port, err := net.SplitHostPort(iep.Address)
338 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800339 return nil, "", false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800340 }
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800341 addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800342 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800343 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800344 }
345 ieps := make([]*inaming.Endpoint, 0, len(addrs))
346 for _, addr := range addrs {
347 n, err := inaming.NewEndpoint(lep.String())
348 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800349 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800350 }
351 n.IsMountTable = s.servesMountTable
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800352 n.Address = net.JoinHostPort(addr.Address().String(), port)
353 ieps = append(ieps, n)
354 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800355 return ieps, port, unspecified, nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800356}
357
358func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700359 defer vlog.LogCall()()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800360 useProxy := len(listenSpec.Proxy) > 0
361 if !useProxy && len(listenSpec.Addrs) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800362 return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800363 }
364
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700365 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800366 defer s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800367
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800368 if err := s.allowed(listening, "Listen"); err != nil {
369 return nil, err
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700370 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700371
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800372 // Start the proxy as early as possible, ignore duplicate requests
373 // for the same proxy.
374 if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800375 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800376 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800377 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800378 s.proxyListenLoop(listenSpec.Proxy)
379 s.active.Done()
380 }()
381 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700382
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800383 roaming := false
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800384 lnState := make([]*listenState, 0, len(listenSpec.Addrs))
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800385 for _, addr := range listenSpec.Addrs {
386 if len(addr.Address) > 0 {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800387 // Listen if we have a local address to listen on.
388 ls := &listenState{
389 protocol: addr.Protocol,
390 address: addr.Address,
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800391 }
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700392 ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.principal, s.listenerOpts...)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800393 lnState = append(lnState, ls)
394 if ls.lnerr != nil {
Asim Shankar7171a252015-03-07 14:41:40 -0800395 vlog.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800396 continue
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800397 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800398 ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
399 if ls.roaming && ls.eperr == nil {
400 ls.protoIEP = *ls.lep.(*inaming.Endpoint)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800401 roaming = true
402 }
403 }
404 }
405
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800406 found := false
407 for _, ls := range lnState {
408 if ls.ln != nil {
409 found = true
410 break
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800411 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800412 }
413 if !found && !useProxy {
Jiri Simsa074bf362015-02-17 09:29:45 -0800414 return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800415 }
416
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800417 if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
418 // Create a dhcp listener if we haven't already done so.
419 dhcp := &dhcpState{
420 name: listenSpec.StreamName,
421 publisher: listenSpec.StreamPublisher,
422 watchers: make(map[chan<- ipc.NetworkChange]struct{}),
423 }
424 s.dhcpState = dhcp
425 dhcp.ch = make(chan config.Setting, 10)
426 dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
427 if dhcp.err == nil {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800428 // We have a goroutine to listen for dhcp changes.
429 s.active.Add(1)
430 go func() {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800431 s.dhcpLoop(dhcp.ch)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800432 s.active.Done()
433 }()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800434 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800435 }
436
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800437 eps := make([]naming.Endpoint, 0, 10)
438 for _, ls := range lnState {
439 s.listenState[ls] = struct{}{}
440 if ls.ln != nil {
441 // We have a goroutine per listener to accept new flows.
442 // Each flow is served from its own goroutine.
443 s.active.Add(1)
444 go func(ln stream.Listener, ep naming.Endpoint) {
445 s.listenLoop(ln, ep)
446 s.active.Done()
447 }(ls.ln, ls.lep)
448 }
449
450 for _, iep := range ls.ieps {
451 s.publisher.AddServer(iep.String(), s.servesMountTable)
452 eps = append(eps, iep)
453 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800454 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800455
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800456 return eps, nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700457}
458
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800459func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800460 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800461 if err != nil {
462 return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
463 }
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700464 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.listenerOpts...)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800465 if err != nil {
466 return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
467 }
468 iep, ok := ep.(*inaming.Endpoint)
469 if !ok {
470 ln.Close()
471 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
472 }
473 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800474 s.proxies[proxy] = proxyState{iep, nil}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800475 s.Unlock()
Robin Thellende22920e2015-02-05 17:15:50 -0800476 iep.IsMountTable = s.servesMountTable
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800477 s.publisher.AddServer(iep.String(), s.servesMountTable)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800478 return iep, ln, nil
479}
480
481func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700482 const (
483 min = 5 * time.Millisecond
484 max = 5 * time.Minute
485 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800486
487 iep, ln, err := s.reconnectAndPublishProxy(proxy)
488 if err != nil {
489 vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
490 }
491 // the initial connection maybe have failed, but we enter the retry
492 // loop anyway so that we will continue to try and connect to the
493 // proxy.
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800494 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800495 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800496 s.Unlock()
497 return
498 }
499 s.Unlock()
500
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700501 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800502 if ln != nil && iep != nil {
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800503 err := s.listenLoop(ln, iep)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800504 // The listener is done, so:
505 // (1) Unpublish its name
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800506 s.publisher.RemoveServer(iep.String())
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800507 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800508 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800509 s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800510 } else {
Asim Shankar7171a252015-03-07 14:41:40 -0800511 // err will be nil if we're stopping.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800512 s.proxies[proxy] = proxyState{iep, nil}
513 s.Unlock()
514 return
515 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800516 s.Unlock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800517 }
518
519 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800520 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800521 s.Unlock()
522 return
523 }
524 s.Unlock()
525
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700526 // (2) Reconnect to the proxy unless the server has been stopped
527 backoff := min
528 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800529 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700530 select {
531 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700532 if backoff = backoff * 2; backoff > max {
533 backoff = max
534 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700535 case <-s.stoppedChan:
536 return
537 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800538 // (3) reconnect, publish new address
539 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
540 vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
541 } else {
542 vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
543 break
544 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700545 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700546 }
547}
548
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800549// addListener adds the supplied listener taking care to
550// check to see if we're already stopping. It returns true
551// if the listener was added.
552func (s *server) addListener(ln stream.Listener) bool {
553 s.Lock()
554 defer s.Unlock()
555 if s.isStopState() {
556 return false
557 }
558 s.listeners[ln] = struct{}{}
559 return true
560}
561
562// rmListener removes the supplied listener taking care to
563// check if we're already stopping. It returns true if the
564// listener was removed.
565func (s *server) rmListener(ln stream.Listener) bool {
566 s.Lock()
567 defer s.Unlock()
568 if s.isStopState() {
569 return false
570 }
571 delete(s.listeners, ln)
572 return true
573}
574
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800575func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800576 defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800577 var calls sync.WaitGroup
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800578
579 if !s.addListener(ln) {
580 // We're stopping.
581 return nil
582 }
583
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700584 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800585 calls.Wait()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800586 s.rmListener(ln)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700587 }()
588 for {
589 flow, err := ln.Accept()
590 if err != nil {
Todd Wang03fee962014-12-08 19:33:10 -0800591 vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ep, err)
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800592 return err
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700593 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800594 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700595 go func(flow stream.Flow) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800596 defer calls.Done()
597 fs, err := newFlowServer(flow, s)
598 if err != nil {
Todd Wang03fee962014-12-08 19:33:10 -0800599 vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
Todd Wang34ed4c62014-11-26 15:15:52 -0800600 return
601 }
602 if err := fs.serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800603 // TODO(caprita): Logging errors here is too spammy. For example, "not
604 // authorized" errors shouldn't be logged as server errors.
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800605 // TODO(cnicolaou): revisit this when verror2 transition is
606 // done.
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800607 if err != io.EOF {
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800608 vlog.VI(2).Infof("Flow serve on %v failed: %v", ep, err)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800609 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700610 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700611 }(flow)
612 }
613}
614
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800615func (s *server) dhcpLoop(ch chan config.Setting) {
616 defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes")
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700617 vlog.VI(2).Infof("ipc: dhcp loop")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800618 for setting := range ch {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700619 if setting == nil {
620 return
621 }
622 switch v := setting.Value().(type) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800623 case []ipc.Address:
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700624 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800625 if s.isStopState() {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700626 s.Unlock()
627 return
628 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800629 var err error
630 var changed []naming.Endpoint
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700631 switch setting.Name() {
632 case ipc.NewAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800633 changed = s.addAddresses(v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700634 case ipc.RmAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800635 changed, err = s.removeAddresses(v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700636 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800637 change := ipc.NetworkChange{
638 Time: time.Now(),
639 State: externalStates[s.state],
640 Setting: setting,
641 Changed: changed,
642 Error: err,
643 }
644 vlog.VI(2).Infof("ipc: dhcp: change %v", change)
645 for ch, _ := range s.dhcpState.watchers {
646 select {
647 case ch <- change:
648 default:
649 }
650 }
651 s.Unlock()
652 default:
653 vlog.Errorf("ipc: dhcpLoop: unhandled setting type %T", v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700654 }
655 }
656}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800657
658func getHost(address ipc.Address) string {
659 host, _, err := net.SplitHostPort(address.Address().String())
660 if err == nil {
661 return host
662 }
663 return address.Address().String()
664
665}
666
667// Remove all endpoints that have the same host address as the supplied
668// address parameter.
669func (s *server) removeAddresses(addresses []ipc.Address) ([]naming.Endpoint, error) {
670 var removed []naming.Endpoint
671 for _, address := range addresses {
672 host := getHost(address)
673 for ls, _ := range s.listenState {
674 if ls != nil && ls.roaming && len(ls.ieps) > 0 {
675 remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
676 for _, iep := range ls.ieps {
677 lnHost, _, err := net.SplitHostPort(iep.Address)
678 if err != nil {
679 lnHost = iep.Address
680 }
681 if lnHost == host {
682 vlog.VI(2).Infof("ipc: dhcp removing: %s", iep)
683 removed = append(removed, iep)
684 s.publisher.RemoveServer(iep.String())
685 continue
686 }
687 remaining = append(remaining, iep)
688 }
689 ls.ieps = remaining
690 }
691 }
692 }
693 return removed, nil
694}
695
696// Add new endpoints for the new address. There is no way to know with
697// 100% confidence which new endpoints to publish without shutting down
698// all network connections and reinitializing everything from scratch.
699// Instead, we find all roaming listeners with at least one endpoint
700// and create a new endpoint with the same port as the existing ones
701// but with the new address supplied to us to by the dhcp code. As
702// an additional safeguard we reject the new address if it is not
703// externally accessible.
704// This places the onus on the dhcp/roaming code that sends us addresses
705// to ensure that those addresses are externally reachable.
706func (s *server) addAddresses(addresses []ipc.Address) []naming.Endpoint {
707 var added []naming.Endpoint
708 for _, address := range addresses {
709 if !netstate.IsAccessibleIP(address) {
710 return added
711 }
712 host := getHost(address)
713 for ls, _ := range s.listenState {
714 if ls != nil && ls.roaming {
715 niep := ls.protoIEP
716 niep.Address = net.JoinHostPort(host, ls.port)
717 ls.ieps = append(ls.ieps, &niep)
718 vlog.VI(2).Infof("ipc: dhcp adding: %s", niep)
719 s.publisher.AddServer(niep.String(), s.servesMountTable)
720 added = append(added, &niep)
721 }
722 }
723 }
724 return added
725}
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700726
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800727type leafDispatcher struct {
728 invoker ipc.Invoker
729 auth security.Authorizer
730}
731
732func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
733 if suffix != "" {
Todd Wangff73e1f2015-02-10 21:45:52 -0800734 return nil, nil, ipc.NewErrUnknownSuffix(nil, suffix)
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800735 }
736 return d.invoker, d.auth, nil
737}
738
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800739func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800740 defer vlog.LogCall()()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800741 if obj == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800742 return verror.New(verror.ErrBadArg, s.ctx, "nil object")
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800743 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800744 invoker, err := objectToInvoker(obj)
745 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800746 return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800747 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800748 return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800749}
750
751func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800752 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800753 if disp == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800754 return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700755 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800756 s.Lock()
757 defer s.Unlock()
758 if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
759 return err
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700760 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800761 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800762 s.disp = disp
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700763 if len(name) > 0 {
764 s.publisher.AddName(name)
765 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700766 return nil
767}
768
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800769func (s *server) AddName(name string) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800770 defer vlog.LogCall()()
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800771 if len(name) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800772 return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800773 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800774 s.Lock()
775 defer s.Unlock()
776 if err := s.allowed(publishing, "AddName"); err != nil {
777 return err
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800778 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800779 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800780 s.publisher.AddName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800781 return nil
782}
783
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800784func (s *server) RemoveName(name string) {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800785 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800786 s.Lock()
787 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800788 if err := s.allowed(publishing, "RemoveName"); err != nil {
789 return
790 }
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800791 vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800792 s.publisher.RemoveName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800793}
794
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700795func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700796 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700797 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800798 if s.isStopState() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700799 s.Unlock()
800 return nil
801 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800802 s.state = stopping
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700803 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700804 s.Unlock()
805
Robin Thellenddf428232014-10-06 12:50:44 -0700806 // Delete the stats object.
807 s.stats.stop()
808
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700809 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
810 // server lock, since publisher is safe for concurrent access.
811
812 // Stop the publisher, which triggers unmounting of published names.
813 s.publisher.Stop()
814 // Wait for the publisher to be done unmounting before we can proceed to
815 // close the listeners (to minimize the number of mounted names pointing
816 // to endpoint that are no longer serving).
817 //
818 // TODO(caprita): See if make sense to fail fast on rejecting
819 // connections once listeners are closed, and parallelize the publisher
820 // and listener shutdown.
821 s.publisher.WaitForStop()
822
823 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800824
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700825 // Close all listeners. No new flows will be accepted, while in-flight
826 // flows will continue until they terminate naturally.
827 nListeners := len(s.listeners)
828 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700829
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800830 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700831 go func(ln stream.Listener) {
832 errCh <- ln.Close()
833 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800834 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800835
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800836 drain := func(ch chan config.Setting) {
837 for {
838 select {
839 case v := <-ch:
840 if v == nil {
841 return
842 }
843 default:
844 close(ch)
845 return
846 }
847 }
848 }
849
850 if dhcp := s.dhcpState; dhcp != nil {
Cosmos Nicolaouaceb8d92015-02-05 20:44:02 -0800851 // TODO(cnicolaou,caprita): investigate not having to close and drain
852 // the channel here. It's a little awkward right now since we have to
853 // be careful to not close the channel in two places, i.e. here and
854 // and from the publisher's Shutdown method.
855 if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
856 drain(dhcp.ch)
857 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700858 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800859
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700860 s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800861
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700862 var firstErr error
863 for i := 0; i < nListeners; i++ {
864 if err := <-errCh; err != nil && firstErr == nil {
865 firstErr = err
866 }
867 }
868 // At this point, we are guaranteed that no new requests are going to be
869 // accepted.
870
871 // Wait for the publisher and active listener + flows to finish.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800872 done := make(chan struct{}, 1)
873 go func() { s.active.Wait(); done <- struct{}{} }()
874
875 select {
876 case <-done:
877 case <-time.After(5 * time.Minute):
878 vlog.Errorf("Listener Close Error: %v", firstErr)
879 vlog.Errorf("Timedout waiting for goroutines to stop: listeners: %d", nListeners, len(s.listeners))
880 for ln, _ := range s.listeners {
881 vlog.Errorf("Listener: %p", ln)
882 }
883 for ls, _ := range s.listenState {
884 vlog.Errorf("ListenState: %v", ls)
885 }
886 <-done
887 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800888
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700889 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800890 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700891 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800892 if firstErr != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800893 return verror.New(verror.ErrInternal, s.ctx, firstErr)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800894 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800895 s.state = stopped
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800896 s.cancel()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800897 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700898}
899
900// flowServer implements the RPC server-side protocol for a single RPC, over a
901// flow that's already connected to the client.
902type flowServer struct {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800903 *context.T
Todd Wang5739dda2014-11-16 22:44:02 -0800904 server *server // ipc.Server that this flow server belongs to
905 disp ipc.Dispatcher // ipc.Dispatcher that will serve RPCs on this flow
Todd Wang3425a902015-01-21 18:43:59 -0800906 dec *vom.Decoder // to decode requests and args from the client
907 enc *vom.Encoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -0800908 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700909
Asim Shankar220a0152014-10-30 21:21:09 -0700910 // Fields filled in during the server invocation.
Suharsh Sivakumar380bf342015-02-27 15:38:27 -0800911 clientBlessings security.Blessings
912 ackBlessings bool
913 grantedBlessings security.Blessings
914 method, suffix string
915 tags []*vdl.Value
916 discharges map[string]security.Discharge
917 starttime time.Time
918 endStreamArgs bool // are the stream args at EOF?
919 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700920}
921
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700922var _ ipc.Stream = (*flowServer)(nil)
923
Todd Wang34ed4c62014-11-26 15:15:52 -0800924func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700925 server.Lock()
926 disp := server.disp
927 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700928
Todd Wang34ed4c62014-11-26 15:15:52 -0800929 fs := &flowServer{
930 T: server.ctx,
931 server: server,
932 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -0800933 flow: flow,
934 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700935 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800936 var err error
Todd Wang3425a902015-01-21 18:43:59 -0800937 if fs.dec, err = vom.NewDecoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800938 flow.Close()
939 return nil, err
940 }
Todd Wang8e17bff2015-02-18 11:18:56 -0800941 if fs.enc, err = vom.NewEncoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800942 flow.Close()
943 return nil, err
Todd Wang34ed4c62014-11-26 15:15:52 -0800944 }
945 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700946}
947
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700948func (fs *flowServer) serve() error {
949 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700950
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700951 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700952
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800953 vtrace.GetSpan(fs.T).Finish()
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700954
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700955 var traceResponse vtrace.Response
956 if fs.allowDebug {
Matt Rosencrantz2803fe92015-03-09 15:26:32 -0700957 traceResponse = vtrace.GetResponse(fs.T)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700958 }
959
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700960 // Respond to the client with the response header and positional results.
961 response := ipc.Response{
962 Error: err,
963 EndStreamResults: true,
964 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700965 TraceResponse: traceResponse,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800966 AckBlessings: fs.ackBlessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700967 }
968 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800969 if err == io.EOF {
970 return err
971 }
Todd Wang9548d852015-02-10 16:15:59 -0800972 return fmt.Errorf("ipc: response encoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700973 }
974 if response.Error != nil {
975 return response.Error
976 }
977 for ix, res := range results {
Todd Wangf519f8f2015-01-21 10:07:41 -0800978 if err := fs.enc.Encode(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800979 if err == io.EOF {
980 return err
981 }
Todd Wang9548d852015-02-10 16:15:59 -0800982 return fmt.Errorf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700983 }
984 }
985 // TODO(ashankar): Should unread data from the flow be drained?
986 //
987 // Reason to do so:
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -0800988 // The common stream.Flow implementation (veyron/profiles/internal/ipc/stream/vc/reader.go)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700989 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
990 // slices will not be returned to the pool leading to possibly increased memory usage.
991 //
992 // Reason to not do so:
993 // Draining here will conflict with any Reads on the flow in a separate goroutine
994 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
995 //
996 // For now, go with the reason to not do so as having unread data in the stream
997 // should be a rare case.
998 return nil
999}
1000
Todd Wang9548d852015-02-10 16:15:59 -08001001func (fs *flowServer) readIPCRequest() (*ipc.Request, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001002 // Set a default timeout before reading from the flow. Without this timeout,
1003 // a client that sends no request or a partial request will retain the flow
1004 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -07001005 initTimer := newTimer(defaultCallTimeout)
1006 defer initTimer.Stop()
1007 fs.flow.SetDeadline(initTimer.C)
1008
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001009 // Decode the initial request.
1010 var req ipc.Request
1011 if err := fs.dec.Decode(&req); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001012 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadRequest(fs.T, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001013 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001014 return &req, nil
1015}
1016
Todd Wang9548d852015-02-10 16:15:59 -08001017func (fs *flowServer) processRequest() ([]interface{}, error) {
Asim Shankar0cad0832014-11-04 01:27:38 -08001018 fs.starttime = time.Now()
Todd Wang9548d852015-02-10 16:15:59 -08001019 req, err := fs.readIPCRequest()
1020 if err != nil {
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001021 // We don't know what the ipc call was supposed to be, but we'll create
1022 // a placeholder span so we can capture annotations.
Matt Rosencrantz5f98d942015-01-08 13:48:30 -08001023 fs.T, _ = vtrace.SetNewSpan(fs.T, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
Todd Wang9548d852015-02-10 16:15:59 -08001024 return nil, err
Matt Rosencrantz86897932014-10-02 09:34:34 -07001025 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001026 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -08001027 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -07001028
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001029 // TODO(mattr): Currently this allows users to trigger trace collection
1030 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -08001031 // results later. This might be considered a DOS vector.
1032 spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
Matt Rosencrantz18da0372015-02-12 16:04:56 -08001033 fs.T, _ = vtrace.SetContinuedTrace(fs.T, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001034
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001035 var cancel context.CancelFunc
Todd Wangf6a06882015-02-27 17:38:01 -08001036 if !req.Deadline.IsZero() {
1037 fs.T, cancel = context.WithDeadline(fs.T, req.Deadline.Time)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001038 } else {
Matt Rosencrantz89445a42015-01-05 13:32:37 -08001039 fs.T, cancel = context.WithCancel(fs.T)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001040 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001041 fs.flow.SetDeadline(fs.Done())
Todd Wang5739dda2014-11-16 22:44:02 -08001042 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001043
Todd Wang5739dda2014-11-16 22:44:02 -08001044 // Initialize security: blessings, discharges, etc.
Todd Wang9548d852015-02-10 16:15:59 -08001045 if err := fs.initSecurity(req); err != nil {
1046 return nil, err
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001047 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001048 // Lookup the invoker.
Todd Wangebb3b012015-02-09 21:59:05 -08001049 invoker, auth, err := fs.lookup(fs.suffix, &fs.method)
1050 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001051 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001052 }
1053 // Prepare invoker and decode args.
1054 numArgs := int(req.NumPosArgs)
Robin Thellendb16d7162014-11-07 13:47:26 -08001055 argptrs, tags, err := invoker.Prepare(fs.method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -08001056 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001057 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001058 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001059 }
Todd Wang9548d852015-02-10 16:15:59 -08001060 if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
Jiri Simsa074bf362015-02-17 09:29:45 -08001061 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001062 }
1063 for ix, argptr := range argptrs {
1064 if err := fs.dec.Decode(argptr); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001065 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001066 }
1067 }
Todd Wang5739dda2014-11-16 22:44:02 -08001068 // Check application's authorization policy.
Todd Wang9548d852015-02-10 16:15:59 -08001069 if err := authorize(fs, auth); err != nil {
1070 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001071 }
Todd Wang5739dda2014-11-16 22:44:02 -08001072 // Check if the caller is permitted to view debug information.
Asim Shankar68885192014-11-26 12:48:35 -08001073 // TODO(mattr): Is access.Debug the right thing to check?
Todd Wang5739dda2014-11-16 22:44:02 -08001074 fs.allowDebug = authorize(debugContext{fs}, auth) == nil
1075 // Invoke the method.
Robin Thellendb16d7162014-11-07 13:47:26 -08001076 results, err := invoker.Invoke(fs.method, fs, argptrs)
1077 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Todd Wang9548d852015-02-10 16:15:59 -08001078 return results, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001079}
1080
Todd Wang5739dda2014-11-16 22:44:02 -08001081func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1082 // Ensure that the context gets cancelled if the flow is closed
1083 // due to a network error, or client cancellation.
1084 select {
1085 case <-fs.flow.Closed():
1086 // Here we remove the contexts channel as a deadline to the flow.
1087 // We do this to ensure clients get a consistent error when they read/write
1088 // after the flow is closed. Since the flow is already closed, it doesn't
1089 // matter that the context is also cancelled.
1090 fs.flow.SetDeadline(nil)
1091 cancel()
1092 case <-fs.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001093 }
Todd Wang5739dda2014-11-16 22:44:02 -08001094}
1095
1096// lookup returns the invoker and authorizer responsible for serving the given
1097// name and method. The suffix is stripped of any leading slashes. If it begins
1098// with ipc.DebugKeyword, we use the internal debug dispatcher to look up the
1099// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1100// value may be modified to match the actual suffix and method to use.
Todd Wangebb3b012015-02-09 21:59:05 -08001101func (fs *flowServer) lookup(suffix string, method *string) (ipc.Invoker, security.Authorizer, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001102 if naming.IsReserved(*method) {
1103 // All reserved methods are trapped and handled here, by removing the
1104 // reserved prefix and invoking them on reservedMethods. E.g. "__Glob"
1105 // invokes reservedMethods.Glob.
1106 *method = naming.StripReserved(*method)
1107 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
1108 }
1109 disp := fs.disp
1110 if naming.IsReserved(suffix) {
1111 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -07001112 }
1113 if disp != nil {
Robin Thellenda02fe8f2014-11-19 09:58:29 -08001114 obj, auth, err := disp.Lookup(suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001115 switch {
1116 case err != nil:
Todd Wang9548d852015-02-10 16:15:59 -08001117 return nil, nil, err
Todd Wang5739dda2014-11-16 22:44:02 -08001118 case obj != nil:
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001119 invoker, err := objectToInvoker(obj)
1120 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001121 return nil, nil, verror.New(verror.ErrInternal, fs.T, "invalid received object", err)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001122 }
1123 return invoker, auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001124 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001125 }
Todd Wangff73e1f2015-02-10 21:45:52 -08001126 return nil, nil, ipc.NewErrUnknownSuffix(nil, suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001127}
1128
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001129func objectToInvoker(obj interface{}) (ipc.Invoker, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001130 if obj == nil {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001131 return nil, errors.New("nil object")
Todd Wang5739dda2014-11-16 22:44:02 -08001132 }
1133 if invoker, ok := obj.(ipc.Invoker); ok {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001134 return invoker, nil
Todd Wang5739dda2014-11-16 22:44:02 -08001135 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001136 return ipc.ReflectInvoker(obj)
Todd Wang5739dda2014-11-16 22:44:02 -08001137}
1138
Todd Wang9548d852015-02-10 16:15:59 -08001139func (fs *flowServer) initSecurity(req *ipc.Request) error {
Ankurb905dae2015-03-04 12:38:20 -08001140 // LocalPrincipal is nil which means we are operating under
1141 // VCSecurityNone.
1142 if fs.flow.LocalPrincipal() == nil {
1143 return nil
1144 }
1145
Todd Wang5739dda2014-11-16 22:44:02 -08001146 // If additional credentials are provided, make them available in the context
Todd Wang5739dda2014-11-16 22:44:02 -08001147 // Detect unusable blessings now, rather then discovering they are unusable on
1148 // first use.
1149 //
1150 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1151 // the server's identity as the blessing. Figure out what we want to do about
1152 // this - should servers be able to assume that a blessing is something that
1153 // does not have the authorizations that the server's own identity has?
Ankurb905dae2015-03-04 12:38:20 -08001154 if got, want := req.GrantedBlessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1155 return verror.New(verror.ErrNoAccess, fs.T, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
Todd Wang5739dda2014-11-16 22:44:02 -08001156 }
Asim Shankarb07ec692015-02-27 23:40:44 -08001157 fs.grantedBlessings = req.GrantedBlessings
Ankurb905dae2015-03-04 12:38:20 -08001158
Asim Shankarb07ec692015-02-27 23:40:44 -08001159 var err error
1160 if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001161 // When the server can't access the blessings cache, the client is not following
1162 // protocol, so the server closes the VCs corresponding to the client endpoint.
1163 // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
1164 // of all VCs connected to the RemoteEndpoint.
1165 fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
Jiri Simsa074bf362015-02-17 09:29:45 -08001166 return verror.New(verror.ErrBadProtocol, fs.T, newErrBadBlessingsCache(fs.T, err))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001167 }
Ankurb905dae2015-03-04 12:38:20 -08001168 // Verify that the blessings sent by the client in the request have the same public
1169 // key as those sent by the client during VC establishment.
1170 if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1171 return verror.New(verror.ErrNoAccess, fs.T, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
1172 }
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001173 fs.ackBlessings = true
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001174
Asim Shankar3ad0b8a2015-02-25 00:37:21 -08001175 for _, d := range req.Discharges {
Asim Shankar08642822015-03-02 21:21:09 -08001176 fs.discharges[d.ID()] = d
Todd Wang5739dda2014-11-16 22:44:02 -08001177 }
1178 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001179}
1180
1181type acceptAllAuthorizer struct{}
1182
Matt Rosencrantz5c7ed212015-02-27 22:42:35 -08001183func (acceptAllAuthorizer) Authorize(security.Call) error {
Robin Thellendc26c32e2014-10-06 17:44:04 -07001184 return nil
1185}
1186
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001187func authorize(call ipc.ServerCall, auth security.Authorizer) error {
1188 if call.LocalPrincipal() == nil {
Todd Wang5739dda2014-11-16 22:44:02 -08001189 // LocalPrincipal is nil means that the server wanted to avoid
1190 // authentication, and thus wanted to skip authorization as well.
1191 return nil
1192 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001193 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001194 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001195 }
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001196 if err := auth.Authorize(call); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -07001197 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001198 return verror.New(verror.ErrNoAccess, call.Context(), newErrBadAuth(call.Context(), call.Suffix(), call.Method(), err))
Asim Shankara5457f02014-10-24 23:23:07 -07001199 }
1200 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001201}
1202
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001203// debugContext is a context which wraps another context but always returns
Asim Shankar68885192014-11-26 12:48:35 -08001204// the debug tag.
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001205type debugContext struct {
Matt Rosencrantz5c7ed212015-02-27 22:42:35 -08001206 ipc.ServerCall
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001207}
1208
Todd Wangb31da592015-02-20 12:50:39 -08001209func (debugContext) MethodTags() []*vdl.Value {
1210 return []*vdl.Value{vdl.ValueOf(access.Debug)}
Asim Shankar68885192014-11-26 12:48:35 -08001211}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001212
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001213// Send implements the ipc.Stream method.
1214func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001215 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001216 // The empty response header indicates what follows is a streaming result.
1217 if err := fs.enc.Encode(ipc.Response{}); err != nil {
1218 return err
1219 }
1220 return fs.enc.Encode(item)
1221}
1222
1223// Recv implements the ipc.Stream method.
1224func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001225 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001226 var req ipc.Request
1227 if err := fs.dec.Decode(&req); err != nil {
1228 return err
1229 }
1230 if req.EndStreamArgs {
1231 fs.endStreamArgs = true
1232 return io.EOF
1233 }
1234 return fs.dec.Decode(itemptr)
1235}
1236
Matt Rosencrantz5c7ed212015-02-27 22:42:35 -08001237// Implementations of ipc.ServerCall methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001238
Ankuredd74ee2015-03-04 16:38:45 -08001239func (fs *flowServer) LocalDischarges() map[string]security.Discharge {
1240 //nologcall
1241 return fs.flow.LocalDischarges()
1242}
Asim Shankar2519cc12014-11-10 21:16:53 -08001243func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001244 //nologcall
1245 return fs.discharges
1246}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001247func (fs *flowServer) Server() ipc.Server {
1248 //nologcall
1249 return fs.server
1250}
Asim Shankar0cad0832014-11-04 01:27:38 -08001251func (fs *flowServer) Timestamp() time.Time {
1252 //nologcall
1253 return fs.starttime
1254}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001255func (fs *flowServer) Method() string {
1256 //nologcall
1257 return fs.method
1258}
Todd Wangb31da592015-02-20 12:50:39 -08001259func (fs *flowServer) MethodTags() []*vdl.Value {
Asim Shankar0cad0832014-11-04 01:27:38 -08001260 //nologcall
1261 return fs.tags
1262}
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -08001263func (fs *flowServer) Context() *context.T {
Matt Rosencrantz04d197c2014-12-12 08:39:25 -08001264 return fs.T
1265}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001266
Benjamin Prosnitz9284a002015-02-23 14:57:25 -08001267func (fs *flowServer) VanadiumContext() *context.T {
1268 return fs.T
1269}
1270
Matt Rosencrantz5c7ed212015-02-27 22:42:35 -08001271// TODO(cnicolaou): remove Name from ipc.ServerCall and all of
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001272// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001273func (fs *flowServer) Name() string {
1274 //nologcall
1275 return fs.suffix
1276}
1277func (fs *flowServer) Suffix() string {
1278 //nologcall
1279 return fs.suffix
1280}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001281func (fs *flowServer) LocalPrincipal() security.Principal {
1282 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001283 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001284}
1285func (fs *flowServer) LocalBlessings() security.Blessings {
1286 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001287 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001288}
1289func (fs *flowServer) RemoteBlessings() security.Blessings {
1290 //nologcall
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001291 if !fs.clientBlessings.IsZero() {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001292 return fs.clientBlessings
1293 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001294 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001295}
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001296func (fs *flowServer) GrantedBlessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001297 //nologcall
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001298 return fs.grantedBlessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001299}
1300func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1301 //nologcall
1302 return fs.flow.LocalEndpoint()
1303}
1304func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1305 //nologcall
1306 return fs.flow.RemoteEndpoint()
1307}