blob: 3e9b57f892aaf51a172542ebb7e726ca6eb39039 [file] [log] [blame]
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07002
3import (
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08004 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005 "fmt"
6 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -07007 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -07008 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "strings"
10 "sync"
11 "time"
12
Jiri Simsa6ac95222015-02-23 16:11:49 -080013 "v.io/v23/config"
14 "v.io/v23/context"
Jiri Simsa6ac95222015-02-23 16:11:49 -080015 "v.io/v23/naming"
David Why Use Two When One Will Do Presottod424c212015-02-25 11:05:26 -080016 "v.io/v23/naming/ns"
Jiri Simsa6ac95222015-02-23 16:11:49 -080017 "v.io/v23/options"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070018 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080019 "v.io/v23/security"
20 "v.io/v23/services/security/access"
21 "v.io/v23/vdl"
22 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080023 "v.io/v23/vom"
24 "v.io/v23/vtrace"
Jiri Simsa337af232015-02-27 14:36:46 -080025 "v.io/x/lib/vlog"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070026 "v.io/x/ref/profiles/internal/rpc/stream"
Cosmos Nicolaouf889c732014-10-16 20:46:54 -070027
Matt Rosencrantz9d3278a2015-03-11 14:58:34 -070028 "v.io/x/lib/netstate"
Jiri Simsaffceefa2015-02-28 11:03:34 -080029 "v.io/x/ref/lib/stats"
Matt Rosencrantz86ba1a12015-03-09 13:19:02 -070030 "v.io/x/ref/profiles/internal/lib/publisher"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080031 inaming "v.io/x/ref/profiles/internal/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070032 "v.io/x/ref/profiles/internal/rpc/stream/vc"
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080033
Todd Wangff73e1f2015-02-10 21:45:52 -080034 // TODO(cnicolaou): finish verror2 -> verror transition, in particular
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080035 // for communicating from server to client.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036)
37
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080038// state for each requested listen address
39type listenState struct {
40 protocol, address string
41 ln stream.Listener
42 lep naming.Endpoint
43 lnerr, eperr error
44 roaming bool
45 // We keep track of all of the endpoints, the port and a copy of
46 // the original listen endpoint for use with roaming network changes.
47 ieps []*inaming.Endpoint // list of currently active eps
48 port string // port to use for creating new eps
49 protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
50}
51
52// state for each requested proxy
53type proxyState struct {
54 endpoint naming.Endpoint
Mike Burrowsdc6b3602015-02-05 15:52:12 -080055 err error
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080056}
57
58type dhcpState struct {
59 name string
60 publisher *config.Publisher
61 stream *config.Stream
62 ch chan config.Setting // channel to receive dhcp settings over
63 err error // error status.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070064 watchers map[chan<- rpc.NetworkChange]struct{}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080065}
66
Jiri Simsa5293dcb2014-05-10 09:56:38 -070067type server struct {
68 sync.Mutex
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080069 // context used by the server to make internal RPCs, error messages etc.
70 ctx *context.T
Matt Rosencrantz1094d062015-01-30 06:43:12 -080071 cancel context.CancelFunc // function to cancel the above context.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080072 state serverState // track state of the server.
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080073 streamMgr stream.Manager // stream manager to listen for new flows.
74 publisher publisher.Publisher // publisher to publish mounttable mounts.
75 listenerOpts []stream.ListenerOpt // listener opts for Listen.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080076 dhcpState *dhcpState // dhcpState, nil if not using dhcp
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -070077 principal security.Principal
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -070078 blessings security.Blessings
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080079
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080080 // maps that contain state on listeners.
81 listenState map[*listenState]struct{}
82 listeners map[stream.Listener]struct{}
83
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080084 // state of proxies keyed by the name of the proxy
85 proxies map[string]proxyState
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080086
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080087 // all endpoints generated and returned by this server
88 endpoints []naming.Endpoint
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080089
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070090 disp rpc.Dispatcher // dispatcher to serve RPCs
91 dispReserved rpc.Dispatcher // dispatcher for reserved methods
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080092 active sync.WaitGroup // active goroutines we've spawned.
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -080093 stoppedChan chan struct{} // closed when the server has been stopped.
94 preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
Jungho Ahn25545d32015-01-26 15:14:14 -080095 // We cache the IP networks on the device since it is not that cheap to read
96 // network interfaces through os syscall.
97 // TODO(jhahn): Add monitoring the network interface changes.
98 ipNets []*net.IPNet
David Why Use Two When One Will Do Presottod424c212015-02-25 11:05:26 -080099 ns ns.Namespace
Jungho Ahn25545d32015-01-26 15:14:14 -0800100 servesMountTable bool
Robin Thellend89e95232015-03-24 13:48:48 -0700101 isLeaf bool
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800102
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700103 // TODO(cnicolaou): add roaming stats to rpcStats
104 stats *rpcStats // stats for this server.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700105}
106
// serverState identifies a node in the server's internal state machine.
type serverState int

// The internal server states, numbered consecutively from zero in the
// order listed.
const (
	initialized serverState = iota
	listening
	serving
	publishing
	stopping
	stopped
)
117
118// Simple state machine for the server implementation.
119type next map[serverState]bool
120type transitions map[serverState]next
121
122var (
123 states = transitions{
124 initialized: next{listening: true, stopping: true},
125 listening: next{listening: true, serving: true, stopping: true},
126 serving: next{publishing: true, stopping: true},
127 publishing: next{publishing: true, stopping: true},
128 stopping: next{},
129 stopped: next{},
130 }
131
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700132 externalStates = map[serverState]rpc.ServerState{
133 initialized: rpc.ServerInit,
134 listening: rpc.ServerActive,
135 serving: rpc.ServerActive,
136 publishing: rpc.ServerActive,
137 stopping: rpc.ServerStopping,
138 stopped: rpc.ServerStopped,
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800139 }
140)
141
142func (s *server) allowed(next serverState, method string) error {
143 if states[s.state][next] {
144 s.state = next
145 return nil
146 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800147 return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800148}
149
150func (s *server) isStopState() bool {
151 return s.state == stopping || s.state == stopped
152}
153
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700154var _ rpc.Server = (*server)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700155
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700156func InternalNewServer(ctx *context.T, streamMgr stream.Manager, ns ns.Namespace, client rpc.Client, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800157 ctx, cancel := context.WithRootCancel(ctx)
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800158 ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700159 statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700160 s := &server{
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800161 ctx: ctx,
162 cancel: cancel,
163 streamMgr: streamMgr,
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700164 principal: principal,
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800165 publisher: publisher.New(ctx, ns, publishPeriod),
166 listenState: make(map[*listenState]struct{}),
167 listeners: make(map[stream.Listener]struct{}),
168 proxies: make(map[string]proxyState),
169 stoppedChan: make(chan struct{}),
170 ipNets: ipNetworks(),
171 ns: ns,
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700172 stats: newRPCStats(statsPrefix),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700173 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700174 var (
175 dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
176 securityLevel options.SecurityLevel
177 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700178 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700179 switch opt := opt.(type) {
180 case stream.ListenerOpt:
181 // Collect all ServerOpts that are also ListenerOpts.
182 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800183 switch opt := opt.(type) {
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800184 case vc.DischargeExpiryBuffer:
185 dischargeExpiryBuffer = time.Duration(opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800186 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700187 case options.ServerBlessings:
188 s.blessings = opt.Blessings
Asim Shankarcc044212014-10-15 23:25:26 -0700189 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700190 s.servesMountTable = bool(opt)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800191 case ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800192 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800193 case PreferredServerResolveProtocols:
194 s.preferredProtocols = []string(opt)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700195 case options.SecurityLevel:
196 securityLevel = opt
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700197 }
198 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700199 if s.blessings.IsZero() && principal != nil {
200 s.blessings = principal.BlessingStore().Default()
201 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700202 if securityLevel == options.SecurityNone {
203 s.principal = nil
204 s.blessings = security.Blessings{}
205 s.dispReserved = nil
206 }
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800207 // Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
208 // the discharges by the time the VC asks for them.`
209 dc := InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800210 s.listenerOpts = append(s.listenerOpts, dc)
Benjamin Prosnitz9284a002015-02-23 14:57:25 -0800211 s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
Bogdan Capritae7376312014-11-10 13:13:17 -0800212 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
Asim Shankar2bf7b1e2015-02-27 00:45:12 -0800213 // TODO(caprita): revist printing the blessings with %s, and
214 // instead expose them as a list.
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700215 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700216 if principal != nil {
Bogdan Capritae7376312014-11-10 13:13:17 -0800217 stats.NewStringFunc(blessingsStatsName, func() string {
218 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
219 })
220 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700221 return s, nil
222}
223
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700224func (s *server) Status() rpc.ServerStatus {
225 status := rpc.ServerStatus{}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700226 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700227 s.Lock()
228 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800229 status.State = externalStates[s.state]
230 status.ServesMountTable = s.servesMountTable
231 status.Mounts = s.publisher.Status()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800232 status.Endpoints = []naming.Endpoint{}
233 for ls, _ := range s.listenState {
234 if ls.eperr != nil {
235 status.Errors = append(status.Errors, ls.eperr)
236 }
237 if ls.lnerr != nil {
238 status.Errors = append(status.Errors, ls.lnerr)
239 }
240 for _, iep := range ls.ieps {
241 status.Endpoints = append(status.Endpoints, iep)
242 }
243 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700244 status.Proxies = make([]rpc.ProxyStatus, 0, len(s.proxies))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800245 for k, v := range s.proxies {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700246 status.Proxies = append(status.Proxies, rpc.ProxyStatus{k, v.endpoint, v.err})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700247 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800248 return status
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700249}
250
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700251func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800252 defer vlog.LogCall()()
253 s.Lock()
254 defer s.Unlock()
255 if s.dhcpState != nil {
256 s.dhcpState.watchers[ch] = struct{}{}
257 }
258}
259
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700260func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800261 defer vlog.LogCall()()
262 s.Lock()
263 defer s.Unlock()
264 if s.dhcpState != nil {
265 delete(s.dhcpState.watchers, ch)
266 }
267}
268
Robin Thellend92b65a42014-12-17 14:30:16 -0800269// resolveToEndpoint resolves an object name or address to an endpoint.
270func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800271 var resolved *naming.MountEntry
272 var err error
Asim Shankardee311d2014-08-01 17:41:31 -0700273 if s.ns != nil {
Asim Shankaraae31802015-01-22 11:59:42 -0800274 if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700275 return "", err
276 }
277 } else {
Asim Shankaraae31802015-01-22 11:59:42 -0800278 // Fake a namespace resolution
279 resolved = &naming.MountEntry{Servers: []naming.MountedServer{
280 {Server: address},
281 }}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700282 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800283 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800284 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800285 return "", err
286 }
Asim Shankaraae31802015-01-22 11:59:42 -0800287 for _, n := range resolved.Names() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700288 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800289 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700290 continue
291 }
Asim Shankaraae31802015-01-22 11:59:42 -0800292 if ep, err := inaming.NewEndpoint(address); err == nil {
293 return ep.String(), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700294 }
295 }
Asim Shankardee311d2014-08-01 17:41:31 -0700296 return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700297}
298
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800299// getPossbileAddrs returns an appropriate set of addresses that could be used
300// to contact the supplied protocol, host, port parameters using the supplied
301// chooser function. It returns an indication of whether the supplied address
302// was fully specified or not, returning false if the address was fully
303// specified, and true if it was not.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700304func getPossibleAddrs(protocol, host, port string, chooser rpc.AddressChooser) ([]rpc.Address, bool, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800305
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800306 ip := net.ParseIP(host)
307 if ip == nil {
308 return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
309 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800310
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700311 addrFromIP := func(ip net.IP) rpc.Address {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800312 return &netstate.AddrIfc{
313 Addr: &net.IPAddr{IP: ip},
314 }
315 }
316
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800317 if ip.IsUnspecified() {
318 if chooser != nil {
319 // Need to find a usable IP address since the call to listen
320 // didn't specify one.
321 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800322 a, err := chooser(protocol, addrs)
323 if err == nil && len(a) > 0 {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800324 return a, true, nil
325 }
326 }
327 }
328 // We don't have a chooser, so we just return the address the
329 // underlying system has chosen.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700330 return []rpc.Address{addrFromIP(ip)}, true, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800331 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700332 return []rpc.Address{addrFromIP(ip)}, false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800333}
334
335// createEndpoints creates appropriate inaming.Endpoint instances for
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800336// all of the externally accessible network addresses that can be used
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800337// to reach this server.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700338func (s *server) createEndpoints(lep naming.Endpoint, chooser rpc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800339 iep, ok := lep.(*inaming.Endpoint)
340 if !ok {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800341 return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800342 }
343 if !strings.HasPrefix(iep.Protocol, "tcp") &&
344 !strings.HasPrefix(iep.Protocol, "ws") {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800345 // If not tcp, ws, or wsh, just return the endpoint we were given.
346 return []*inaming.Endpoint{iep}, "", false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800347 }
348
349 host, port, err := net.SplitHostPort(iep.Address)
350 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800351 return nil, "", false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800352 }
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800353 addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800354 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800355 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800356 }
357 ieps := make([]*inaming.Endpoint, 0, len(addrs))
358 for _, addr := range addrs {
359 n, err := inaming.NewEndpoint(lep.String())
360 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800361 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800362 }
363 n.IsMountTable = s.servesMountTable
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800364 n.Address = net.JoinHostPort(addr.Address().String(), port)
365 ieps = append(ieps, n)
366 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800367 return ieps, port, unspecified, nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800368}
369
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700370func (s *server) Listen(listenSpec rpc.ListenSpec) ([]naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700371 defer vlog.LogCall()()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800372 useProxy := len(listenSpec.Proxy) > 0
373 if !useProxy && len(listenSpec.Addrs) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800374 return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800375 }
376
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700377 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800378 defer s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800379
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800380 if err := s.allowed(listening, "Listen"); err != nil {
381 return nil, err
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700382 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700383
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800384 // Start the proxy as early as possible, ignore duplicate requests
385 // for the same proxy.
386 if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800387 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800388 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800389 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800390 s.proxyListenLoop(listenSpec.Proxy)
391 s.active.Done()
392 }()
393 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700394
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800395 roaming := false
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800396 lnState := make([]*listenState, 0, len(listenSpec.Addrs))
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800397 for _, addr := range listenSpec.Addrs {
398 if len(addr.Address) > 0 {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800399 // Listen if we have a local address to listen on.
400 ls := &listenState{
401 protocol: addr.Protocol,
402 address: addr.Address,
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800403 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700404 ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.principal, s.blessings, s.listenerOpts...)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800405 lnState = append(lnState, ls)
406 if ls.lnerr != nil {
Asim Shankar7171a252015-03-07 14:41:40 -0800407 vlog.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800408 continue
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800409 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800410 ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
411 if ls.roaming && ls.eperr == nil {
412 ls.protoIEP = *ls.lep.(*inaming.Endpoint)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800413 roaming = true
414 }
415 }
416 }
417
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800418 found := false
419 for _, ls := range lnState {
420 if ls.ln != nil {
421 found = true
422 break
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800423 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800424 }
425 if !found && !useProxy {
Jiri Simsa074bf362015-02-17 09:29:45 -0800426 return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800427 }
428
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800429 if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
430 // Create a dhcp listener if we haven't already done so.
431 dhcp := &dhcpState{
432 name: listenSpec.StreamName,
433 publisher: listenSpec.StreamPublisher,
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700434 watchers: make(map[chan<- rpc.NetworkChange]struct{}),
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800435 }
436 s.dhcpState = dhcp
437 dhcp.ch = make(chan config.Setting, 10)
438 dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
439 if dhcp.err == nil {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800440 // We have a goroutine to listen for dhcp changes.
441 s.active.Add(1)
442 go func() {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800443 s.dhcpLoop(dhcp.ch)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800444 s.active.Done()
445 }()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800446 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800447 }
448
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800449 eps := make([]naming.Endpoint, 0, 10)
450 for _, ls := range lnState {
451 s.listenState[ls] = struct{}{}
452 if ls.ln != nil {
453 // We have a goroutine per listener to accept new flows.
454 // Each flow is served from its own goroutine.
455 s.active.Add(1)
456 go func(ln stream.Listener, ep naming.Endpoint) {
457 s.listenLoop(ln, ep)
458 s.active.Done()
459 }(ls.ln, ls.lep)
460 }
461
462 for _, iep := range ls.ieps {
Robin Thellend89e95232015-03-24 13:48:48 -0700463 s.publisher.AddServer(iep.String())
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800464 eps = append(eps, iep)
465 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800466 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800467
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800468 return eps, nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700469}
470
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800471func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800472 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800473 if err != nil {
474 return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
475 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700476 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, s.listenerOpts...)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800477 if err != nil {
478 return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
479 }
480 iep, ok := ep.(*inaming.Endpoint)
481 if !ok {
482 ln.Close()
483 return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
484 }
485 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800486 s.proxies[proxy] = proxyState{iep, nil}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800487 s.Unlock()
Robin Thellende22920e2015-02-05 17:15:50 -0800488 iep.IsMountTable = s.servesMountTable
Robin Thellend89e95232015-03-24 13:48:48 -0700489 s.publisher.AddServer(iep.String())
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800490 return iep, ln, nil
491}
492
493func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700494 const (
495 min = 5 * time.Millisecond
496 max = 5 * time.Minute
497 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800498
499 iep, ln, err := s.reconnectAndPublishProxy(proxy)
500 if err != nil {
501 vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
502 }
503 // the initial connection maybe have failed, but we enter the retry
504 // loop anyway so that we will continue to try and connect to the
505 // proxy.
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800506 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800507 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800508 s.Unlock()
509 return
510 }
511 s.Unlock()
512
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700513 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800514 if ln != nil && iep != nil {
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800515 err := s.listenLoop(ln, iep)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800516 // The listener is done, so:
517 // (1) Unpublish its name
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800518 s.publisher.RemoveServer(iep.String())
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800519 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800520 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800521 s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800522 } else {
Asim Shankar7171a252015-03-07 14:41:40 -0800523 // err will be nil if we're stopping.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800524 s.proxies[proxy] = proxyState{iep, nil}
525 s.Unlock()
526 return
527 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800528 s.Unlock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800529 }
530
531 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800532 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800533 s.Unlock()
534 return
535 }
536 s.Unlock()
537
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700538 // (2) Reconnect to the proxy unless the server has been stopped
539 backoff := min
540 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800541 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700542 select {
543 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700544 if backoff = backoff * 2; backoff > max {
545 backoff = max
546 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700547 case <-s.stoppedChan:
548 return
549 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800550 // (3) reconnect, publish new address
551 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
552 vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
553 } else {
554 vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
555 break
556 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700557 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700558 }
559}
560
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800561// addListener adds the supplied listener taking care to
562// check to see if we're already stopping. It returns true
563// if the listener was added.
564func (s *server) addListener(ln stream.Listener) bool {
565 s.Lock()
566 defer s.Unlock()
567 if s.isStopState() {
568 return false
569 }
570 s.listeners[ln] = struct{}{}
571 return true
572}
573
574// rmListener removes the supplied listener taking care to
575// check if we're already stopping. It returns true if the
576// listener was removed.
577func (s *server) rmListener(ln stream.Listener) bool {
578 s.Lock()
579 defer s.Unlock()
580 if s.isStopState() {
581 return false
582 }
583 delete(s.listeners, ln)
584 return true
585}
586
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800587func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700588 defer vlog.VI(1).Infof("rpc: Stopped listening on %s", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800589 var calls sync.WaitGroup
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800590
591 if !s.addListener(ln) {
592 // We're stopping.
593 return nil
594 }
595
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700596 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800597 calls.Wait()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800598 s.rmListener(ln)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700599 }()
600 for {
601 flow, err := ln.Accept()
602 if err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700603 vlog.VI(10).Infof("rpc: Accept on %v failed: %v", ep, err)
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800604 return err
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700605 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800606 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700607 go func(flow stream.Flow) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800608 defer calls.Done()
609 fs, err := newFlowServer(flow, s)
610 if err != nil {
Todd Wang03fee962014-12-08 19:33:10 -0800611 vlog.Errorf("newFlowServer on %v failed: %v", ep, err)
Todd Wang34ed4c62014-11-26 15:15:52 -0800612 return
613 }
614 if err := fs.serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800615 // TODO(caprita): Logging errors here is too spammy. For example, "not
616 // authorized" errors shouldn't be logged as server errors.
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800617 // TODO(cnicolaou): revisit this when verror2 transition is
618 // done.
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800619 if err != io.EOF {
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800620 vlog.VI(2).Infof("Flow serve on %v failed: %v", ep, err)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800621 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700622 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700623 }(flow)
624 }
625}
626
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800627func (s *server) dhcpLoop(ch chan config.Setting) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700628 defer vlog.VI(1).Infof("rpc: Stopped listen for dhcp changes")
629 vlog.VI(2).Infof("rpc: dhcp loop")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800630 for setting := range ch {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700631 if setting == nil {
632 return
633 }
634 switch v := setting.Value().(type) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700635 case []rpc.Address:
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700636 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800637 if s.isStopState() {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700638 s.Unlock()
639 return
640 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800641 var err error
642 var changed []naming.Endpoint
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700643 switch setting.Name() {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700644 case rpc.NewAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800645 changed = s.addAddresses(v)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700646 case rpc.RmAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800647 changed, err = s.removeAddresses(v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700648 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700649 change := rpc.NetworkChange{
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800650 Time: time.Now(),
651 State: externalStates[s.state],
652 Setting: setting,
653 Changed: changed,
654 Error: err,
655 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700656 vlog.VI(2).Infof("rpc: dhcp: change %v", change)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800657 for ch, _ := range s.dhcpState.watchers {
658 select {
659 case ch <- change:
660 default:
661 }
662 }
663 s.Unlock()
664 default:
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700665 vlog.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700666 }
667 }
668}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800669
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700670func getHost(address rpc.Address) string {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800671 host, _, err := net.SplitHostPort(address.Address().String())
672 if err == nil {
673 return host
674 }
675 return address.Address().String()
676
677}
678
679// Remove all endpoints that have the same host address as the supplied
680// address parameter.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700681func (s *server) removeAddresses(addresses []rpc.Address) ([]naming.Endpoint, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800682 var removed []naming.Endpoint
683 for _, address := range addresses {
684 host := getHost(address)
685 for ls, _ := range s.listenState {
686 if ls != nil && ls.roaming && len(ls.ieps) > 0 {
687 remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
688 for _, iep := range ls.ieps {
689 lnHost, _, err := net.SplitHostPort(iep.Address)
690 if err != nil {
691 lnHost = iep.Address
692 }
693 if lnHost == host {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700694 vlog.VI(2).Infof("rpc: dhcp removing: %s", iep)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800695 removed = append(removed, iep)
696 s.publisher.RemoveServer(iep.String())
697 continue
698 }
699 remaining = append(remaining, iep)
700 }
701 ls.ieps = remaining
702 }
703 }
704 }
705 return removed, nil
706}
707
708// Add new endpoints for the new address. There is no way to know with
709// 100% confidence which new endpoints to publish without shutting down
710// all network connections and reinitializing everything from scratch.
711// Instead, we find all roaming listeners with at least one endpoint
712// and create a new endpoint with the same port as the existing ones
713// but with the new address supplied to us to by the dhcp code. As
714// an additional safeguard we reject the new address if it is not
715// externally accessible.
716// This places the onus on the dhcp/roaming code that sends us addresses
717// to ensure that those addresses are externally reachable.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700718func (s *server) addAddresses(addresses []rpc.Address) []naming.Endpoint {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800719 var added []naming.Endpoint
720 for _, address := range addresses {
721 if !netstate.IsAccessibleIP(address) {
722 return added
723 }
724 host := getHost(address)
725 for ls, _ := range s.listenState {
726 if ls != nil && ls.roaming {
727 niep := ls.protoIEP
728 niep.Address = net.JoinHostPort(host, ls.port)
729 ls.ieps = append(ls.ieps, &niep)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700730 vlog.VI(2).Infof("rpc: dhcp adding: %s", niep)
Robin Thellend89e95232015-03-24 13:48:48 -0700731 s.publisher.AddServer(niep.String())
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800732 added = append(added, &niep)
733 }
734 }
735 }
736 return added
737}
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700738
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800739type leafDispatcher struct {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700740 invoker rpc.Invoker
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800741 auth security.Authorizer
742}
743
744func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
745 if suffix != "" {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700746 return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800747 }
748 return d.invoker, d.auth, nil
749}
750
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800751func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800752 defer vlog.LogCall()()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800753 if obj == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800754 return verror.New(verror.ErrBadArg, s.ctx, "nil object")
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800755 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800756 invoker, err := objectToInvoker(obj)
757 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800758 return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800759 }
Robin Thellend89e95232015-03-24 13:48:48 -0700760 s.isLeaf = true
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800761 return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800762}
763
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700764func (s *server) ServeDispatcher(name string, disp rpc.Dispatcher) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800765 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800766 if disp == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800767 return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700768 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800769 s.Lock()
770 defer s.Unlock()
771 if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
772 return err
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700773 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800774 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800775 s.disp = disp
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700776 if len(name) > 0 {
Robin Thellend89e95232015-03-24 13:48:48 -0700777 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700778 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700779 return nil
780}
781
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800782func (s *server) AddName(name string) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800783 defer vlog.LogCall()()
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800784 if len(name) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800785 return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800786 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800787 s.Lock()
788 defer s.Unlock()
789 if err := s.allowed(publishing, "AddName"); err != nil {
790 return err
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800791 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800792 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Robin Thellend89e95232015-03-24 13:48:48 -0700793 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800794 return nil
795}
796
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800797func (s *server) RemoveName(name string) {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800798 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800799 s.Lock()
800 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800801 if err := s.allowed(publishing, "RemoveName"); err != nil {
802 return
803 }
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800804 vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800805 s.publisher.RemoveName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800806}
807
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700808func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700809 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700810 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800811 if s.isStopState() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700812 s.Unlock()
813 return nil
814 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800815 s.state = stopping
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700816 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817 s.Unlock()
818
Robin Thellenddf428232014-10-06 12:50:44 -0700819 // Delete the stats object.
820 s.stats.stop()
821
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700822 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
823 // server lock, since publisher is safe for concurrent access.
824
825 // Stop the publisher, which triggers unmounting of published names.
826 s.publisher.Stop()
827 // Wait for the publisher to be done unmounting before we can proceed to
828 // close the listeners (to minimize the number of mounted names pointing
829 // to endpoint that are no longer serving).
830 //
831 // TODO(caprita): See if make sense to fail fast on rejecting
832 // connections once listeners are closed, and parallelize the publisher
833 // and listener shutdown.
834 s.publisher.WaitForStop()
835
836 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800837
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700838 // Close all listeners. No new flows will be accepted, while in-flight
839 // flows will continue until they terminate naturally.
840 nListeners := len(s.listeners)
841 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700842
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800843 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700844 go func(ln stream.Listener) {
845 errCh <- ln.Close()
846 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800847 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800848
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800849 drain := func(ch chan config.Setting) {
850 for {
851 select {
852 case v := <-ch:
853 if v == nil {
854 return
855 }
856 default:
857 close(ch)
858 return
859 }
860 }
861 }
862
863 if dhcp := s.dhcpState; dhcp != nil {
Cosmos Nicolaouaceb8d92015-02-05 20:44:02 -0800864 // TODO(cnicolaou,caprita): investigate not having to close and drain
865 // the channel here. It's a little awkward right now since we have to
866 // be careful to not close the channel in two places, i.e. here and
867 // and from the publisher's Shutdown method.
868 if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
869 drain(dhcp.ch)
870 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700871 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800872
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700873 s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800874
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700875 var firstErr error
876 for i := 0; i < nListeners; i++ {
877 if err := <-errCh; err != nil && firstErr == nil {
878 firstErr = err
879 }
880 }
881 // At this point, we are guaranteed that no new requests are going to be
882 // accepted.
883
884 // Wait for the publisher and active listener + flows to finish.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800885 done := make(chan struct{}, 1)
886 go func() { s.active.Wait(); done <- struct{}{} }()
887
888 select {
889 case <-done:
890 case <-time.After(5 * time.Minute):
891 vlog.Errorf("Listener Close Error: %v", firstErr)
Bogdan Caprita2d04f0e2015-03-13 15:39:13 -0700892 vlog.Errorf("Timedout waiting for goroutines to stop: listeners: %d (currently: %d)", nListeners, len(s.listeners))
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800893 for ln, _ := range s.listeners {
894 vlog.Errorf("Listener: %p", ln)
895 }
896 for ls, _ := range s.listenState {
897 vlog.Errorf("ListenState: %v", ls)
898 }
899 <-done
900 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800901
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700902 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800903 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700904 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800905 if firstErr != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800906 return verror.New(verror.ErrInternal, s.ctx, firstErr)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800907 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800908 s.state = stopped
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800909 s.cancel()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800910 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700911}
912
913// flowServer implements the RPC server-side protocol for a single RPC, over a
914// flow that's already connected to the client.
915type flowServer struct {
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -0800916 *context.T
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700917 server *server // rpc.Server that this flow server belongs to
918 disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
Todd Wang3425a902015-01-21 18:43:59 -0800919 dec *vom.Decoder // to decode requests and args from the client
920 enc *vom.Encoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -0800921 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700922
Asim Shankar220a0152014-10-30 21:21:09 -0700923 // Fields filled in during the server invocation.
Suharsh Sivakumar380bf342015-02-27 15:38:27 -0800924 clientBlessings security.Blessings
925 ackBlessings bool
926 grantedBlessings security.Blessings
927 method, suffix string
928 tags []*vdl.Value
929 discharges map[string]security.Discharge
930 starttime time.Time
931 endStreamArgs bool // are the stream args at EOF?
932 allowDebug bool // true if the caller is permitted to view debug information.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700933}
934
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700935var _ rpc.Stream = (*flowServer)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700936
Todd Wang34ed4c62014-11-26 15:15:52 -0800937func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700938 server.Lock()
939 disp := server.disp
940 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700941
Todd Wang34ed4c62014-11-26 15:15:52 -0800942 fs := &flowServer{
943 T: server.ctx,
944 server: server,
945 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -0800946 flow: flow,
947 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700948 }
Matt Rosencrantz232c1ba2015-03-18 10:57:41 -0700949 // Attach the flow server to fs.T (the embedded *context.T) to act
950 // as a security.Call.
Matt Rosencrantz250558f2015-03-17 11:37:31 -0700951 fs.T = security.SetCall(fs.T, fs)
Todd Wangf519f8f2015-01-21 10:07:41 -0800952 var err error
Todd Wang3425a902015-01-21 18:43:59 -0800953 if fs.dec, err = vom.NewDecoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800954 flow.Close()
955 return nil, err
956 }
Todd Wang8e17bff2015-02-18 11:18:56 -0800957 if fs.enc, err = vom.NewEncoder(flow); err != nil {
Todd Wangf519f8f2015-01-21 10:07:41 -0800958 flow.Close()
959 return nil, err
Todd Wang34ed4c62014-11-26 15:15:52 -0800960 }
961 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700962}
963
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700964func (fs *flowServer) serve() error {
965 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -0700966
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700967 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700968
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800969 vtrace.GetSpan(fs.T).Finish()
Matt Rosencrantz1fa32772014-10-28 11:31:46 -0700970
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700971 var traceResponse vtrace.Response
972 if fs.allowDebug {
Matt Rosencrantz2803fe92015-03-09 15:26:32 -0700973 traceResponse = vtrace.GetResponse(fs.T)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700974 }
975
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700976 // Respond to the client with the response header and positional results.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700977 response := rpc.Response{
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700978 Error: err,
979 EndStreamResults: true,
980 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700981 TraceResponse: traceResponse,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -0800982 AckBlessings: fs.ackBlessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700983 }
984 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800985 if err == io.EOF {
986 return err
987 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700988 return fmt.Errorf("rpc: response encoding failed: %v", err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700989 }
990 if response.Error != nil {
991 return response.Error
992 }
993 for ix, res := range results {
Todd Wangf519f8f2015-01-21 10:07:41 -0800994 if err := fs.enc.Encode(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800995 if err == io.EOF {
996 return err
997 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700998 return fmt.Errorf("rpc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700999 }
1000 }
1001 // TODO(ashankar): Should unread data from the flow be drained?
1002 //
1003 // Reason to do so:
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -07001004 // The common stream.Flow implementation (v.io/x/ref/profiles/internal/rpc/stream/vc/reader.go)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001005 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
1006 // slices will not be returned to the pool leading to possibly increased memory usage.
1007 //
1008 // Reason to not do so:
1009 // Draining here will conflict with any Reads on the flow in a separate goroutine
1010 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
1011 //
1012 // For now, go with the reason to not do so as having unread data in the stream
1013 // should be a rare case.
1014 return nil
1015}
1016
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001017func (fs *flowServer) readRPCRequest() (*rpc.Request, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001018 // Set a default timeout before reading from the flow. Without this timeout,
1019 // a client that sends no request or a partial request will retain the flow
1020 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -07001021 initTimer := newTimer(defaultCallTimeout)
1022 defer initTimer.Stop()
1023 fs.flow.SetDeadline(initTimer.C)
1024
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001025 // Decode the initial request.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001026 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001027 if err := fs.dec.Decode(&req); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001028 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadRequest(fs.T, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001029 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001030 return &req, nil
1031}
1032
Todd Wang9548d852015-02-10 16:15:59 -08001033func (fs *flowServer) processRequest() ([]interface{}, error) {
Asim Shankar0cad0832014-11-04 01:27:38 -08001034 fs.starttime = time.Now()
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001035 req, err := fs.readRPCRequest()
Todd Wang9548d852015-02-10 16:15:59 -08001036 if err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001037 // We don't know what the rpc call was supposed to be, but we'll create
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001038 // a placeholder span so we can capture annotations.
Matt Rosencrantz5f98d942015-01-08 13:48:30 -08001039 fs.T, _ = vtrace.SetNewSpan(fs.T, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
Todd Wang9548d852015-02-10 16:15:59 -08001040 return nil, err
Matt Rosencrantz86897932014-10-02 09:34:34 -07001041 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001042 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -08001043 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -07001044
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001045 // TODO(mattr): Currently this allows users to trigger trace collection
1046 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -08001047 // results later. This might be considered a DOS vector.
1048 spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
Matt Rosencrantz18da0372015-02-12 16:04:56 -08001049 fs.T, _ = vtrace.SetContinuedTrace(fs.T, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001050
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001051 var cancel context.CancelFunc
Todd Wangf6a06882015-02-27 17:38:01 -08001052 if !req.Deadline.IsZero() {
1053 fs.T, cancel = context.WithDeadline(fs.T, req.Deadline.Time)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001054 } else {
Matt Rosencrantz89445a42015-01-05 13:32:37 -08001055 fs.T, cancel = context.WithCancel(fs.T)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001056 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001057 fs.flow.SetDeadline(fs.Done())
Todd Wang5739dda2014-11-16 22:44:02 -08001058 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001059
Todd Wang5739dda2014-11-16 22:44:02 -08001060 // Initialize security: blessings, discharges, etc.
Todd Wang9548d852015-02-10 16:15:59 -08001061 if err := fs.initSecurity(req); err != nil {
1062 return nil, err
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001063 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001064 // Lookup the invoker.
Todd Wangebb3b012015-02-09 21:59:05 -08001065 invoker, auth, err := fs.lookup(fs.suffix, &fs.method)
1066 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001067 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001068 }
1069 // Prepare invoker and decode args.
1070 numArgs := int(req.NumPosArgs)
Robin Thellendb16d7162014-11-07 13:47:26 -08001071 argptrs, tags, err := invoker.Prepare(fs.method, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -08001072 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001073 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001074 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001075 }
Todd Wang9548d852015-02-10 16:15:59 -08001076 if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
Jiri Simsa074bf362015-02-17 09:29:45 -08001077 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadNumInputArgs(fs.T, fs.suffix, fs.method, called, want))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001078 }
1079 for ix, argptr := range argptrs {
1080 if err := fs.dec.Decode(argptr); err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001081 return nil, verror.New(verror.ErrBadProtocol, fs.T, newErrBadInputArg(fs.T, fs.suffix, fs.method, uint64(ix), err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001082 }
1083 }
Todd Wang5739dda2014-11-16 22:44:02 -08001084 // Check application's authorization policy.
Todd Wang9548d852015-02-10 16:15:59 -08001085 if err := authorize(fs, auth); err != nil {
1086 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001087 }
Todd Wang5739dda2014-11-16 22:44:02 -08001088 // Check if the caller is permitted to view debug information.
Asim Shankar68885192014-11-26 12:48:35 -08001089 // TODO(mattr): Is access.Debug the right thing to check?
Todd Wang5739dda2014-11-16 22:44:02 -08001090 fs.allowDebug = authorize(debugContext{fs}, auth) == nil
1091 // Invoke the method.
Robin Thellendb16d7162014-11-07 13:47:26 -08001092 results, err := invoker.Invoke(fs.method, fs, argptrs)
1093 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Todd Wang9548d852015-02-10 16:15:59 -08001094 return results, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001095}
1096
Todd Wang5739dda2014-11-16 22:44:02 -08001097func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1098 // Ensure that the context gets cancelled if the flow is closed
1099 // due to a network error, or client cancellation.
1100 select {
1101 case <-fs.flow.Closed():
1102 // Here we remove the contexts channel as a deadline to the flow.
1103 // We do this to ensure clients get a consistent error when they read/write
1104 // after the flow is closed. Since the flow is already closed, it doesn't
1105 // matter that the context is also cancelled.
1106 fs.flow.SetDeadline(nil)
1107 cancel()
1108 case <-fs.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001109 }
Todd Wang5739dda2014-11-16 22:44:02 -08001110}
1111
1112// lookup returns the invoker and authorizer responsible for serving the given
1113// name and method. The suffix is stripped of any leading slashes. If it begins
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001114// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
Todd Wang5739dda2014-11-16 22:44:02 -08001115// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1116// value may be modified to match the actual suffix and method to use.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001117func (fs *flowServer) lookup(suffix string, method *string) (rpc.Invoker, security.Authorizer, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001118 if naming.IsReserved(*method) {
1119 // All reserved methods are trapped and handled here, by removing the
1120 // reserved prefix and invoking them on reservedMethods. E.g. "__Glob"
1121 // invokes reservedMethods.Glob.
1122 *method = naming.StripReserved(*method)
1123 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
1124 }
1125 disp := fs.disp
1126 if naming.IsReserved(suffix) {
1127 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -07001128 }
1129 if disp != nil {
Robin Thellenda02fe8f2014-11-19 09:58:29 -08001130 obj, auth, err := disp.Lookup(suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001131 switch {
1132 case err != nil:
Todd Wang9548d852015-02-10 16:15:59 -08001133 return nil, nil, err
Todd Wang5739dda2014-11-16 22:44:02 -08001134 case obj != nil:
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001135 invoker, err := objectToInvoker(obj)
1136 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001137 return nil, nil, verror.New(verror.ErrInternal, fs.T, "invalid received object", err)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001138 }
1139 return invoker, auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001140 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001141 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001142 return nil, nil, rpc.NewErrUnknownSuffix(nil, suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001143}
1144
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001145func objectToInvoker(obj interface{}) (rpc.Invoker, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001146 if obj == nil {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001147 return nil, errors.New("nil object")
Todd Wang5739dda2014-11-16 22:44:02 -08001148 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001149 if invoker, ok := obj.(rpc.Invoker); ok {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001150 return invoker, nil
Todd Wang5739dda2014-11-16 22:44:02 -08001151 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001152 return rpc.ReflectInvoker(obj)
Todd Wang5739dda2014-11-16 22:44:02 -08001153}
1154
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001155func (fs *flowServer) initSecurity(req *rpc.Request) error {
Ankurb905dae2015-03-04 12:38:20 -08001156 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001157 // SecurityNone.
Ankurb905dae2015-03-04 12:38:20 -08001158 if fs.flow.LocalPrincipal() == nil {
1159 return nil
1160 }
1161
Todd Wang5739dda2014-11-16 22:44:02 -08001162 // If additional credentials are provided, make them available in the context
Todd Wang5739dda2014-11-16 22:44:02 -08001163 // Detect unusable blessings now, rather then discovering they are unusable on
1164 // first use.
1165 //
1166 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1167 // the server's identity as the blessing. Figure out what we want to do about
1168 // this - should servers be able to assume that a blessing is something that
1169 // does not have the authorizations that the server's own identity has?
Ankurb905dae2015-03-04 12:38:20 -08001170 if got, want := req.GrantedBlessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1171 return verror.New(verror.ErrNoAccess, fs.T, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
Todd Wang5739dda2014-11-16 22:44:02 -08001172 }
Asim Shankarb07ec692015-02-27 23:40:44 -08001173 fs.grantedBlessings = req.GrantedBlessings
Ankurb905dae2015-03-04 12:38:20 -08001174
Asim Shankarb07ec692015-02-27 23:40:44 -08001175 var err error
1176 if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001177 // When the server can't access the blessings cache, the client is not following
1178 // protocol, so the server closes the VCs corresponding to the client endpoint.
1179 // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
1180 // of all VCs connected to the RemoteEndpoint.
1181 fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
Jiri Simsa074bf362015-02-17 09:29:45 -08001182 return verror.New(verror.ErrBadProtocol, fs.T, newErrBadBlessingsCache(fs.T, err))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001183 }
Ankurb905dae2015-03-04 12:38:20 -08001184 // Verify that the blessings sent by the client in the request have the same public
1185 // key as those sent by the client during VC establishment.
1186 if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1187 return verror.New(verror.ErrNoAccess, fs.T, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
1188 }
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001189 fs.ackBlessings = true
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001190
Asim Shankar3ad0b8a2015-02-25 00:37:21 -08001191 for _, d := range req.Discharges {
Asim Shankar08642822015-03-02 21:21:09 -08001192 fs.discharges[d.ID()] = d
Todd Wang5739dda2014-11-16 22:44:02 -08001193 }
1194 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001195}
1196
1197type acceptAllAuthorizer struct{}
1198
Matt Rosencrantz250558f2015-03-17 11:37:31 -07001199func (acceptAllAuthorizer) Authorize(*context.T) error {
Robin Thellendc26c32e2014-10-06 17:44:04 -07001200 return nil
1201}
1202
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001203func authorize(call rpc.ServerCall, auth security.Authorizer) error {
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001204 if call.LocalPrincipal() == nil {
Todd Wang5739dda2014-11-16 22:44:02 -08001205 // LocalPrincipal is nil means that the server wanted to avoid
1206 // authentication, and thus wanted to skip authorization as well.
1207 return nil
1208 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001209 if auth == nil {
Asim Shankar0c73fbf2014-10-31 15:34:02 -07001210 auth = defaultAuthorizer{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001211 }
Matt Rosencrantz250558f2015-03-17 11:37:31 -07001212 ctx := call.Context()
1213 if err := auth.Authorize(ctx); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -07001214 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Matt Rosencrantz250558f2015-03-17 11:37:31 -07001215 return verror.New(verror.ErrNoAccess, ctx, newErrBadAuth(ctx, call.Suffix(), call.Method(), err))
Asim Shankara5457f02014-10-24 23:23:07 -07001216 }
1217 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001218}
1219
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001220// debugContext is a context which wraps another context but always returns
Asim Shankar68885192014-11-26 12:48:35 -08001221// the debug tag.
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001222type debugContext struct {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001223 rpc.ServerCall
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001224}
1225
Todd Wangb31da592015-02-20 12:50:39 -08001226func (debugContext) MethodTags() []*vdl.Value {
1227 return []*vdl.Value{vdl.ValueOf(access.Debug)}
Asim Shankar68885192014-11-26 12:48:35 -08001228}
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001229
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001230// Send implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001231func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001232 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001233 // The empty response header indicates what follows is a streaming result.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001234 if err := fs.enc.Encode(rpc.Response{}); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001235 return err
1236 }
1237 return fs.enc.Encode(item)
1238}
1239
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001240// Recv implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001241func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001242 defer vlog.LogCall()()
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001243 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001244 if err := fs.dec.Decode(&req); err != nil {
1245 return err
1246 }
1247 if req.EndStreamArgs {
1248 fs.endStreamArgs = true
1249 return io.EOF
1250 }
1251 return fs.dec.Decode(itemptr)
1252}
1253
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001254// Implementations of rpc.ServerCall methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001255
Ankuredd74ee2015-03-04 16:38:45 -08001256func (fs *flowServer) LocalDischarges() map[string]security.Discharge {
1257 //nologcall
1258 return fs.flow.LocalDischarges()
1259}
Asim Shankar2519cc12014-11-10 21:16:53 -08001260func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001261 //nologcall
1262 return fs.discharges
1263}
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001264func (fs *flowServer) Server() rpc.Server {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001265 //nologcall
1266 return fs.server
1267}
Asim Shankar0cad0832014-11-04 01:27:38 -08001268func (fs *flowServer) Timestamp() time.Time {
1269 //nologcall
1270 return fs.starttime
1271}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001272func (fs *flowServer) Method() string {
1273 //nologcall
1274 return fs.method
1275}
Todd Wangb31da592015-02-20 12:50:39 -08001276func (fs *flowServer) MethodTags() []*vdl.Value {
Asim Shankar0cad0832014-11-04 01:27:38 -08001277 //nologcall
1278 return fs.tags
1279}
Matt Rosencrantz4f8ac602014-12-29 14:42:48 -08001280func (fs *flowServer) Context() *context.T {
Matt Rosencrantz04d197c2014-12-12 08:39:25 -08001281 return fs.T
1282}
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001283
Benjamin Prosnitz9284a002015-02-23 14:57:25 -08001284func (fs *flowServer) VanadiumContext() *context.T {
1285 return fs.T
1286}
1287
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001288// TODO(cnicolaou): remove Name from rpc.ServerCall and all of
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001289// its implementations
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001290func (fs *flowServer) Name() string {
1291 //nologcall
1292 return fs.suffix
1293}
1294func (fs *flowServer) Suffix() string {
1295 //nologcall
1296 return fs.suffix
1297}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001298func (fs *flowServer) LocalPrincipal() security.Principal {
1299 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001300 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001301}
1302func (fs *flowServer) LocalBlessings() security.Blessings {
1303 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001304 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001305}
1306func (fs *flowServer) RemoteBlessings() security.Blessings {
1307 //nologcall
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001308 if !fs.clientBlessings.IsZero() {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001309 return fs.clientBlessings
1310 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001311 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001312}
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001313func (fs *flowServer) GrantedBlessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001314 //nologcall
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001315 return fs.grantedBlessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001316}
1317func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1318 //nologcall
1319 return fs.flow.LocalEndpoint()
1320}
1321func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1322 //nologcall
1323 return fs.flow.RemoteEndpoint()
1324}