blob: 91b3a1a943950f161fb0eee412f6f4528568b607 [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07005package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006
7import (
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08008 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "fmt"
10 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -070011 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -070012 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070013 "strings"
14 "sync"
15 "time"
16
Jiri Simsa6ac95222015-02-23 16:11:49 -080017 "v.io/v23/config"
18 "v.io/v23/context"
Todd Wang5082a552015-04-02 10:56:11 -070019 "v.io/v23/namespace"
Jiri Simsa6ac95222015-02-23 16:11:49 -080020 "v.io/v23/naming"
21 "v.io/v23/options"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070022 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080023 "v.io/v23/security"
Todd Wang387d8a42015-03-30 17:09:05 -070024 "v.io/v23/security/access"
Jiri Simsa6ac95222015-02-23 16:11:49 -080025 "v.io/v23/vdl"
26 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080027 "v.io/v23/vom"
28 "v.io/v23/vtrace"
Matt Rosencrantz9d3278a2015-03-11 14:58:34 -070029 "v.io/x/lib/netstate"
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -070030 "v.io/x/lib/vlog"
Jiri Simsaffceefa2015-02-28 11:03:34 -080031 "v.io/x/ref/lib/stats"
Matt Rosencrantz86ba1a12015-03-09 13:19:02 -070032 "v.io/x/ref/profiles/internal/lib/publisher"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080033 inaming "v.io/x/ref/profiles/internal/naming"
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -070034 "v.io/x/ref/profiles/internal/rpc/stream"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070035 "v.io/x/ref/profiles/internal/rpc/stream/vc"
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070036)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080037
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070038var (
39 // These errors are intended to be used as arguments to higher
40 // level errors and hence {1}{2} is omitted from their format
41 // strings to avoid repeating these n-times in the final error
42 // message visible to the user.
43 errResponseEncoding = reg(".errResponseEncoding", "failed to encode RPC response {3} <-> {4}{:5}")
44 errResultEncoding = reg(".errResultEncoding", "failed to encode result #{3} [{4}]{:5}")
45 errFailedToResolveToEndpoint = reg(".errFailedToResolveToEndpoint", "failed to resolve {3} to an endpoint")
46 errFailedToResolveProxy = reg(".errFailedToResolveProxy", "failed to resolve proxy {3}{:4}")
47 errFailedToListenForProxy = reg(".errFailedToListenForProxy", "failed to listen on {3}{:4}")
48 errInternalTypeConversion = reg(".errInternalTypeConversion", "failed to convert {3} to v.io/x/ref/profiles/internal/naming.Endpoint")
49 errFailedToParseIP = reg(".errFailedToParseIP", "failed to parse {3} as an IP host")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070050)
51
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080052// state for each requested listen address
53type listenState struct {
54 protocol, address string
55 ln stream.Listener
56 lep naming.Endpoint
57 lnerr, eperr error
58 roaming bool
59 // We keep track of all of the endpoints, the port and a copy of
60 // the original listen endpoint for use with roaming network changes.
61 ieps []*inaming.Endpoint // list of currently active eps
62 port string // port to use for creating new eps
63 protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
64}
65
66// state for each requested proxy
67type proxyState struct {
68 endpoint naming.Endpoint
Mike Burrowsdc6b3602015-02-05 15:52:12 -080069 err error
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080070}
71
72type dhcpState struct {
73 name string
74 publisher *config.Publisher
75 stream *config.Stream
76 ch chan config.Setting // channel to receive dhcp settings over
77 err error // error status.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070078 watchers map[chan<- rpc.NetworkChange]struct{}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080079}
80
Jiri Simsa5293dcb2014-05-10 09:56:38 -070081type server struct {
82 sync.Mutex
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080083 // context used by the server to make internal RPCs, error messages etc.
84 ctx *context.T
Matt Rosencrantz1094d062015-01-30 06:43:12 -080085 cancel context.CancelFunc // function to cancel the above context.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080086 state serverState // track state of the server.
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080087 streamMgr stream.Manager // stream manager to listen for new flows.
88 publisher publisher.Publisher // publisher to publish mounttable mounts.
89 listenerOpts []stream.ListenerOpt // listener opts for Listen.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080090 dhcpState *dhcpState // dhcpState, nil if not using dhcp
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -070091 principal security.Principal
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -070092 blessings security.Blessings
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080093
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -080094 // maps that contain state on listeners.
95 listenState map[*listenState]struct{}
96 listeners map[stream.Listener]struct{}
97
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -080098 // state of proxies keyed by the name of the proxy
99 proxies map[string]proxyState
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800100
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800101 // all endpoints generated and returned by this server
102 endpoints []naming.Endpoint
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800103
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700104 disp rpc.Dispatcher // dispatcher to serve RPCs
105 dispReserved rpc.Dispatcher // dispatcher for reserved methods
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800106 active sync.WaitGroup // active goroutines we've spawned.
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800107 stoppedChan chan struct{} // closed when the server has been stopped.
108 preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
Jungho Ahn25545d32015-01-26 15:14:14 -0800109 // We cache the IP networks on the device since it is not that cheap to read
110 // network interfaces through os syscall.
111 // TODO(jhahn): Add monitoring the network interface changes.
112 ipNets []*net.IPNet
Todd Wang5082a552015-04-02 10:56:11 -0700113 ns namespace.T
Jungho Ahn25545d32015-01-26 15:14:14 -0800114 servesMountTable bool
Robin Thellend89e95232015-03-24 13:48:48 -0700115 isLeaf bool
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800116
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700117 // TODO(cnicolaou): add roaming stats to rpcStats
118 stats *rpcStats // stats for this server.
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700119}
120
// serverState enumerates the internal lifecycle states of a server.
type serverState int

// The internal states, numbered consecutively from zero in declaration order.
const (
	initialized serverState = iota
	listening
	serving
	publishing
	stopping
	stopped
)
131
132// Simple state machine for the server implementation.
133type next map[serverState]bool
134type transitions map[serverState]next
135
136var (
137 states = transitions{
138 initialized: next{listening: true, stopping: true},
139 listening: next{listening: true, serving: true, stopping: true},
140 serving: next{publishing: true, stopping: true},
141 publishing: next{publishing: true, stopping: true},
142 stopping: next{},
143 stopped: next{},
144 }
145
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700146 externalStates = map[serverState]rpc.ServerState{
147 initialized: rpc.ServerInit,
148 listening: rpc.ServerActive,
149 serving: rpc.ServerActive,
150 publishing: rpc.ServerActive,
151 stopping: rpc.ServerStopping,
152 stopped: rpc.ServerStopped,
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800153 }
154)
155
156func (s *server) allowed(next serverState, method string) error {
157 if states[s.state][next] {
158 s.state = next
159 return nil
160 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800161 return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800162}
163
164func (s *server) isStopState() bool {
165 return s.state == stopping || s.state == stopped
166}
167
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700168var _ rpc.Server = (*server)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700169
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -0700170func InternalNewServer(
171 ctx *context.T,
172 streamMgr stream.Manager,
173 ns namespace.T,
174 client rpc.Client,
175 principal security.Principal,
176 opts ...rpc.ServerOpt) (rpc.Server, error) {
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800177 ctx, cancel := context.WithRootCancel(ctx)
Todd Wangad492042015-04-17 15:58:40 -0700178 ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700179 statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700180 s := &server{
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800181 ctx: ctx,
182 cancel: cancel,
183 streamMgr: streamMgr,
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700184 principal: principal,
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800185 publisher: publisher.New(ctx, ns, publishPeriod),
186 listenState: make(map[*listenState]struct{}),
187 listeners: make(map[stream.Listener]struct{}),
188 proxies: make(map[string]proxyState),
189 stoppedChan: make(chan struct{}),
190 ipNets: ipNetworks(),
191 ns: ns,
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700192 stats: newRPCStats(statsPrefix),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700193 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700194 var (
195 dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
196 securityLevel options.SecurityLevel
197 )
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700198 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700199 switch opt := opt.(type) {
200 case stream.ListenerOpt:
201 // Collect all ServerOpts that are also ListenerOpts.
202 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800203 switch opt := opt.(type) {
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800204 case vc.DischargeExpiryBuffer:
205 dischargeExpiryBuffer = time.Duration(opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800206 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700207 case options.ServerBlessings:
208 s.blessings = opt.Blessings
Asim Shankarcc044212014-10-15 23:25:26 -0700209 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700210 s.servesMountTable = bool(opt)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800211 case ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800212 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800213 case PreferredServerResolveProtocols:
214 s.preferredProtocols = []string(opt)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700215 case options.SecurityLevel:
216 securityLevel = opt
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700217 }
218 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700219 if s.blessings.IsZero() && principal != nil {
220 s.blessings = principal.BlessingStore().Default()
221 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700222 if securityLevel == options.SecurityNone {
223 s.principal = nil
224 s.blessings = security.Blessings{}
225 s.dispReserved = nil
226 }
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800227 // Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
228 // the discharges by the time the VC asks for them.`
229 dc := InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
Suharsh Sivakumar1b6683e2014-12-30 13:00:38 -0800230 s.listenerOpts = append(s.listenerOpts, dc)
Benjamin Prosnitz9284a002015-02-23 14:57:25 -0800231 s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
Bogdan Capritae7376312014-11-10 13:13:17 -0800232 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
Asim Shankar2bf7b1e2015-02-27 00:45:12 -0800233 // TODO(caprita): revist printing the blessings with %s, and
234 // instead expose them as a list.
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700235 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700236 if principal != nil {
Bogdan Capritae7376312014-11-10 13:13:17 -0800237 stats.NewStringFunc(blessingsStatsName, func() string {
238 return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
239 })
240 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700241 return s, nil
242}
243
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700244func (s *server) Status() rpc.ServerStatus {
245 status := rpc.ServerStatus{}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700246 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700247 s.Lock()
248 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800249 status.State = externalStates[s.state]
250 status.ServesMountTable = s.servesMountTable
251 status.Mounts = s.publisher.Status()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800252 status.Endpoints = []naming.Endpoint{}
253 for ls, _ := range s.listenState {
254 if ls.eperr != nil {
255 status.Errors = append(status.Errors, ls.eperr)
256 }
257 if ls.lnerr != nil {
258 status.Errors = append(status.Errors, ls.lnerr)
259 }
260 for _, iep := range ls.ieps {
261 status.Endpoints = append(status.Endpoints, iep)
262 }
263 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700264 status.Proxies = make([]rpc.ProxyStatus, 0, len(s.proxies))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800265 for k, v := range s.proxies {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700266 status.Proxies = append(status.Proxies, rpc.ProxyStatus{k, v.endpoint, v.err})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700267 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800268 return status
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700269}
270
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700271func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800272 defer vlog.LogCall()()
273 s.Lock()
274 defer s.Unlock()
275 if s.dhcpState != nil {
276 s.dhcpState.watchers[ch] = struct{}{}
277 }
278}
279
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700280func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800281 defer vlog.LogCall()()
282 s.Lock()
283 defer s.Unlock()
284 if s.dhcpState != nil {
285 delete(s.dhcpState.watchers, ch)
286 }
287}
288
Robin Thellend92b65a42014-12-17 14:30:16 -0800289// resolveToEndpoint resolves an object name or address to an endpoint.
290func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800291 var resolved *naming.MountEntry
292 var err error
Asim Shankardee311d2014-08-01 17:41:31 -0700293 if s.ns != nil {
Asim Shankaraae31802015-01-22 11:59:42 -0800294 if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700295 return "", err
296 }
297 } else {
Asim Shankaraae31802015-01-22 11:59:42 -0800298 // Fake a namespace resolution
299 resolved = &naming.MountEntry{Servers: []naming.MountedServer{
300 {Server: address},
301 }}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700302 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800303 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800304 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800305 return "", err
306 }
Asim Shankaraae31802015-01-22 11:59:42 -0800307 for _, n := range resolved.Names() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700308 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800309 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700310 continue
311 }
Asim Shankaraae31802015-01-22 11:59:42 -0800312 if ep, err := inaming.NewEndpoint(address); err == nil {
313 return ep.String(), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700314 }
315 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700316 return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700317}
318
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800319// getPossbileAddrs returns an appropriate set of addresses that could be used
320// to contact the supplied protocol, host, port parameters using the supplied
321// chooser function. It returns an indication of whether the supplied address
322// was fully specified or not, returning false if the address was fully
323// specified, and true if it was not.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700324func getPossibleAddrs(protocol, host, port string, chooser rpc.AddressChooser) ([]rpc.Address, bool, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800325
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800326 ip := net.ParseIP(host)
327 if ip == nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700328 return nil, false, verror.New(errFailedToParseIP, nil, host)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800329 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800330
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700331 addrFromIP := func(ip net.IP) rpc.Address {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800332 return &netstate.AddrIfc{
333 Addr: &net.IPAddr{IP: ip},
334 }
335 }
336
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800337 if ip.IsUnspecified() {
338 if chooser != nil {
339 // Need to find a usable IP address since the call to listen
340 // didn't specify one.
341 if addrs, err := netstate.GetAccessibleIPs(); err == nil {
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800342 a, err := chooser(protocol, addrs)
343 if err == nil && len(a) > 0 {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800344 return a, true, nil
345 }
346 }
347 }
348 // We don't have a chooser, so we just return the address the
349 // underlying system has chosen.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700350 return []rpc.Address{addrFromIP(ip)}, true, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800351 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700352 return []rpc.Address{addrFromIP(ip)}, false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800353}
354
355// createEndpoints creates appropriate inaming.Endpoint instances for
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800356// all of the externally accessible network addresses that can be used
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800357// to reach this server.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700358func (s *server) createEndpoints(lep naming.Endpoint, chooser rpc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800359 iep, ok := lep.(*inaming.Endpoint)
360 if !ok {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700361 return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800362 }
363 if !strings.HasPrefix(iep.Protocol, "tcp") &&
364 !strings.HasPrefix(iep.Protocol, "ws") {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800365 // If not tcp, ws, or wsh, just return the endpoint we were given.
366 return []*inaming.Endpoint{iep}, "", false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800367 }
368
369 host, port, err := net.SplitHostPort(iep.Address)
370 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800371 return nil, "", false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800372 }
Cosmos Nicolaoud70e1fc2014-12-16 14:20:39 -0800373 addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800374 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800375 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800376 }
377 ieps := make([]*inaming.Endpoint, 0, len(addrs))
378 for _, addr := range addrs {
379 n, err := inaming.NewEndpoint(lep.String())
380 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800381 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800382 }
383 n.IsMountTable = s.servesMountTable
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800384 n.Address = net.JoinHostPort(addr.Address().String(), port)
385 ieps = append(ieps, n)
386 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800387 return ieps, port, unspecified, nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800388}
389
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700390func (s *server) Listen(listenSpec rpc.ListenSpec) ([]naming.Endpoint, error) {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700391 defer vlog.LogCall()()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800392 useProxy := len(listenSpec.Proxy) > 0
393 if !useProxy && len(listenSpec.Addrs) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800394 return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800395 }
396
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700397 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800398 defer s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800399
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800400 if err := s.allowed(listening, "Listen"); err != nil {
401 return nil, err
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700402 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700403
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800404 // Start the proxy as early as possible, ignore duplicate requests
405 // for the same proxy.
406 if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800407 // We have a goroutine for listening on proxy connections.
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800408 s.active.Add(1)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800409 go func() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800410 s.proxyListenLoop(listenSpec.Proxy)
411 s.active.Done()
412 }()
413 }
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700414
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800415 roaming := false
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800416 lnState := make([]*listenState, 0, len(listenSpec.Addrs))
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800417 for _, addr := range listenSpec.Addrs {
418 if len(addr.Address) > 0 {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800419 // Listen if we have a local address to listen on.
420 ls := &listenState{
421 protocol: addr.Protocol,
422 address: addr.Address,
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800423 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700424 ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.principal, s.blessings, s.listenerOpts...)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800425 lnState = append(lnState, ls)
426 if ls.lnerr != nil {
Asim Shankar7171a252015-03-07 14:41:40 -0800427 vlog.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800428 continue
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800429 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800430 ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
431 if ls.roaming && ls.eperr == nil {
432 ls.protoIEP = *ls.lep.(*inaming.Endpoint)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800433 roaming = true
434 }
435 }
436 }
437
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800438 found := false
439 for _, ls := range lnState {
440 if ls.ln != nil {
441 found = true
442 break
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800443 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800444 }
445 if !found && !useProxy {
Jiri Simsa074bf362015-02-17 09:29:45 -0800446 return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800447 }
448
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800449 if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
450 // Create a dhcp listener if we haven't already done so.
451 dhcp := &dhcpState{
452 name: listenSpec.StreamName,
453 publisher: listenSpec.StreamPublisher,
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700454 watchers: make(map[chan<- rpc.NetworkChange]struct{}),
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800455 }
456 s.dhcpState = dhcp
457 dhcp.ch = make(chan config.Setting, 10)
458 dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
459 if dhcp.err == nil {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800460 // We have a goroutine to listen for dhcp changes.
461 s.active.Add(1)
462 go func() {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800463 s.dhcpLoop(dhcp.ch)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800464 s.active.Done()
465 }()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800466 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800467 }
468
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800469 eps := make([]naming.Endpoint, 0, 10)
470 for _, ls := range lnState {
471 s.listenState[ls] = struct{}{}
472 if ls.ln != nil {
473 // We have a goroutine per listener to accept new flows.
474 // Each flow is served from its own goroutine.
475 s.active.Add(1)
476 go func(ln stream.Listener, ep naming.Endpoint) {
477 s.listenLoop(ln, ep)
478 s.active.Done()
479 }(ls.ln, ls.lep)
480 }
481
482 for _, iep := range ls.ieps {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800483 eps = append(eps, iep)
484 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800485 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800486
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800487 return eps, nil
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700488}
489
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800490func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800491 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800492 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700493 return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800494 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700495 ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, s.listenerOpts...)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800496 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700497 return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800498 }
499 iep, ok := ep.(*inaming.Endpoint)
500 if !ok {
501 ln.Close()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700502 return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800503 }
504 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800505 s.proxies[proxy] = proxyState{iep, nil}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800506 s.Unlock()
Robin Thellende22920e2015-02-05 17:15:50 -0800507 iep.IsMountTable = s.servesMountTable
Robin Thellendb457df92015-03-30 09:42:15 -0700508 iep.IsLeaf = s.isLeaf
Robin Thellend89e95232015-03-24 13:48:48 -0700509 s.publisher.AddServer(iep.String())
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800510 return iep, ln, nil
511}
512
513func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700514 const (
515 min = 5 * time.Millisecond
516 max = 5 * time.Minute
517 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800518
519 iep, ln, err := s.reconnectAndPublishProxy(proxy)
520 if err != nil {
Matt Rosencrantz7e16af42015-04-17 11:48:56 -0700521 vlog.Errorf("Failed to connect to proxy: %s", err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800522 }
523 // the initial connection maybe have failed, but we enter the retry
524 // loop anyway so that we will continue to try and connect to the
525 // proxy.
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800526 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800527 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800528 s.Unlock()
529 return
530 }
531 s.Unlock()
532
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700533 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800534 if ln != nil && iep != nil {
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800535 err := s.listenLoop(ln, iep)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800536 // The listener is done, so:
537 // (1) Unpublish its name
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800538 s.publisher.RemoveServer(iep.String())
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800539 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800540 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800541 s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800542 } else {
Asim Shankar7171a252015-03-07 14:41:40 -0800543 // err will be nil if we're stopping.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800544 s.proxies[proxy] = proxyState{iep, nil}
545 s.Unlock()
546 return
547 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800548 s.Unlock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800549 }
550
551 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800552 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800553 s.Unlock()
554 return
555 }
556 s.Unlock()
557
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700558 // (2) Reconnect to the proxy unless the server has been stopped
559 backoff := min
560 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800561 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700562 select {
563 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700564 if backoff = backoff * 2; backoff > max {
565 backoff = max
566 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700567 case <-s.stoppedChan:
568 return
569 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800570 // (3) reconnect, publish new address
571 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
Matt Rosencrantz7e16af42015-04-17 11:48:56 -0700572 vlog.Errorf("Failed to reconnect to proxy %q: %s", proxy, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800573 } else {
574 vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
575 break
576 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700577 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700578 }
579}
580
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800581// addListener adds the supplied listener taking care to
582// check to see if we're already stopping. It returns true
583// if the listener was added.
584func (s *server) addListener(ln stream.Listener) bool {
585 s.Lock()
586 defer s.Unlock()
587 if s.isStopState() {
588 return false
589 }
590 s.listeners[ln] = struct{}{}
591 return true
592}
593
594// rmListener removes the supplied listener taking care to
595// check if we're already stopping. It returns true if the
596// listener was removed.
597func (s *server) rmListener(ln stream.Listener) bool {
598 s.Lock()
599 defer s.Unlock()
600 if s.isStopState() {
601 return false
602 }
603 delete(s.listeners, ln)
604 return true
605}
606
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800607func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700608 defer vlog.VI(1).Infof("rpc: Stopped listening on %s", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800609 var calls sync.WaitGroup
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800610
611 if !s.addListener(ln) {
612 // We're stopping.
613 return nil
614 }
615
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700616 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800617 calls.Wait()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800618 s.rmListener(ln)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700619 }()
620 for {
621 flow, err := ln.Accept()
622 if err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700623 vlog.VI(10).Infof("rpc: Accept on %v failed: %v", ep, err)
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800624 return err
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700625 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800626 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700627 go func(flow stream.Flow) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800628 defer calls.Done()
629 fs, err := newFlowServer(flow, s)
630 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700631 vlog.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
Todd Wang34ed4c62014-11-26 15:15:52 -0800632 return
633 }
634 if err := fs.serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800635 // TODO(caprita): Logging errors here is too spammy. For example, "not
636 // authorized" errors shouldn't be logged as server errors.
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800637 // TODO(cnicolaou): revisit this when verror2 transition is
638 // done.
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800639 if err != io.EOF {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700640 vlog.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800641 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700642 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700643 }(flow)
644 }
645}
646
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800647func (s *server) dhcpLoop(ch chan config.Setting) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700648 defer vlog.VI(1).Infof("rpc: Stopped listen for dhcp changes")
649 vlog.VI(2).Infof("rpc: dhcp loop")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800650 for setting := range ch {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700651 if setting == nil {
652 return
653 }
654 switch v := setting.Value().(type) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700655 case []rpc.Address:
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700656 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800657 if s.isStopState() {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700658 s.Unlock()
659 return
660 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800661 var err error
662 var changed []naming.Endpoint
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700663 switch setting.Name() {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700664 case rpc.NewAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800665 changed = s.addAddresses(v)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700666 case rpc.RmAddrsSetting:
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800667 changed, err = s.removeAddresses(v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700668 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700669 change := rpc.NetworkChange{
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800670 Time: time.Now(),
671 State: externalStates[s.state],
672 Setting: setting,
673 Changed: changed,
674 Error: err,
675 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700676 vlog.VI(2).Infof("rpc: dhcp: change %v", change)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800677 for ch, _ := range s.dhcpState.watchers {
678 select {
679 case ch <- change:
680 default:
681 }
682 }
683 s.Unlock()
684 default:
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700685 vlog.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700686 }
687 }
688}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800689
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700690func getHost(address rpc.Address) string {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800691 host, _, err := net.SplitHostPort(address.Address().String())
692 if err == nil {
693 return host
694 }
695 return address.Address().String()
696
697}
698
699// Remove all endpoints that have the same host address as the supplied
700// address parameter.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700701func (s *server) removeAddresses(addresses []rpc.Address) ([]naming.Endpoint, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800702 var removed []naming.Endpoint
703 for _, address := range addresses {
704 host := getHost(address)
705 for ls, _ := range s.listenState {
706 if ls != nil && ls.roaming && len(ls.ieps) > 0 {
707 remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
708 for _, iep := range ls.ieps {
709 lnHost, _, err := net.SplitHostPort(iep.Address)
710 if err != nil {
711 lnHost = iep.Address
712 }
713 if lnHost == host {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700714 vlog.VI(2).Infof("rpc: dhcp removing: %s", iep)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800715 removed = append(removed, iep)
716 s.publisher.RemoveServer(iep.String())
717 continue
718 }
719 remaining = append(remaining, iep)
720 }
721 ls.ieps = remaining
722 }
723 }
724 }
725 return removed, nil
726}
727
728// Add new endpoints for the new address. There is no way to know with
729// 100% confidence which new endpoints to publish without shutting down
730// all network connections and reinitializing everything from scratch.
731// Instead, we find all roaming listeners with at least one endpoint
732// and create a new endpoint with the same port as the existing ones
733// but with the new address supplied to us to by the dhcp code. As
734// an additional safeguard we reject the new address if it is not
735// externally accessible.
736// This places the onus on the dhcp/roaming code that sends us addresses
737// to ensure that those addresses are externally reachable.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700738func (s *server) addAddresses(addresses []rpc.Address) []naming.Endpoint {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800739 var added []naming.Endpoint
740 for _, address := range addresses {
741 if !netstate.IsAccessibleIP(address) {
742 return added
743 }
744 host := getHost(address)
745 for ls, _ := range s.listenState {
746 if ls != nil && ls.roaming {
747 niep := ls.protoIEP
748 niep.Address = net.JoinHostPort(host, ls.port)
Robin Thellendb457df92015-03-30 09:42:15 -0700749 niep.IsMountTable = s.servesMountTable
750 niep.IsLeaf = s.isLeaf
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800751 ls.ieps = append(ls.ieps, &niep)
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700752 vlog.VI(2).Infof("rpc: dhcp adding: %s", niep)
Robin Thellend89e95232015-03-24 13:48:48 -0700753 s.publisher.AddServer(niep.String())
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800754 added = append(added, &niep)
755 }
756 }
757 }
758 return added
759}
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700760
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800761type leafDispatcher struct {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700762 invoker rpc.Invoker
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800763 auth security.Authorizer
764}
765
766func (d leafDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
767 if suffix != "" {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700768 return nil, nil, verror.New(verror.ErrUnknownSuffix, nil, suffix)
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800769 }
770 return d.invoker, d.auth, nil
771}
772
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800773func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800774 defer vlog.LogCall()()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800775 if obj == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800776 return verror.New(verror.ErrBadArg, s.ctx, "nil object")
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800777 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800778 invoker, err := objectToInvoker(obj)
779 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800780 return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800781 }
Robin Thellendb457df92015-03-30 09:42:15 -0700782 s.setLeaf(true)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800783 return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800784}
785
Robin Thellendb457df92015-03-30 09:42:15 -0700786func (s *server) setLeaf(value bool) {
787 s.Lock()
788 defer s.Unlock()
789 s.isLeaf = value
790 for ls, _ := range s.listenState {
791 for i := range ls.ieps {
792 ls.ieps[i].IsLeaf = s.isLeaf
793 }
794 }
795}
796
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700797func (s *server) ServeDispatcher(name string, disp rpc.Dispatcher) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800798 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800799 if disp == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800800 return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700801 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800802 s.Lock()
803 defer s.Unlock()
804 if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
805 return err
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700806 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800807 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800808 s.disp = disp
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700809 if len(name) > 0 {
Robin Thellendb457df92015-03-30 09:42:15 -0700810 for ls, _ := range s.listenState {
811 for _, iep := range ls.ieps {
812 s.publisher.AddServer(iep.String())
813 }
814 }
Robin Thellend89e95232015-03-24 13:48:48 -0700815 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700816 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700817 return nil
818}
819
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800820func (s *server) AddName(name string) error {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800821 defer vlog.LogCall()()
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800822 if len(name) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800823 return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800824 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800825 s.Lock()
826 defer s.Unlock()
827 if err := s.allowed(publishing, "AddName"); err != nil {
828 return err
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800829 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800830 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Robin Thellend89e95232015-03-24 13:48:48 -0700831 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800832 return nil
833}
834
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800835func (s *server) RemoveName(name string) {
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800836 defer vlog.LogCall()()
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800837 s.Lock()
838 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800839 if err := s.allowed(publishing, "RemoveName"); err != nil {
840 return
841 }
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800842 vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800843 s.publisher.RemoveName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800844}
845
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700846func (s *server) Stop() error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -0700847 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700848 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800849 if s.isStopState() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700850 s.Unlock()
851 return nil
852 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800853 s.state = stopping
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700854 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700855 s.Unlock()
856
Robin Thellenddf428232014-10-06 12:50:44 -0700857 // Delete the stats object.
858 s.stats.stop()
859
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700860 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
861 // server lock, since publisher is safe for concurrent access.
862
863 // Stop the publisher, which triggers unmounting of published names.
864 s.publisher.Stop()
865 // Wait for the publisher to be done unmounting before we can proceed to
866 // close the listeners (to minimize the number of mounted names pointing
867 // to endpoint that are no longer serving).
868 //
869 // TODO(caprita): See if make sense to fail fast on rejecting
870 // connections once listeners are closed, and parallelize the publisher
871 // and listener shutdown.
872 s.publisher.WaitForStop()
873
874 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800875
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700876 // Close all listeners. No new flows will be accepted, while in-flight
877 // flows will continue until they terminate naturally.
878 nListeners := len(s.listeners)
879 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700880
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800881 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700882 go func(ln stream.Listener) {
883 errCh <- ln.Close()
884 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800885 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800886
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800887 drain := func(ch chan config.Setting) {
888 for {
889 select {
890 case v := <-ch:
891 if v == nil {
892 return
893 }
894 default:
895 close(ch)
896 return
897 }
898 }
899 }
900
901 if dhcp := s.dhcpState; dhcp != nil {
Cosmos Nicolaouaceb8d92015-02-05 20:44:02 -0800902 // TODO(cnicolaou,caprita): investigate not having to close and drain
903 // the channel here. It's a little awkward right now since we have to
904 // be careful to not close the channel in two places, i.e. here and
905 // and from the publisher's Shutdown method.
906 if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
907 drain(dhcp.ch)
908 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700909 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800910
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700911 s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800912
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700913 var firstErr error
914 for i := 0; i < nListeners; i++ {
915 if err := <-errCh; err != nil && firstErr == nil {
916 firstErr = err
917 }
918 }
919 // At this point, we are guaranteed that no new requests are going to be
920 // accepted.
921
922 // Wait for the publisher and active listener + flows to finish.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800923 done := make(chan struct{}, 1)
924 go func() { s.active.Wait(); done <- struct{}{} }()
925
926 select {
927 case <-done:
928 case <-time.After(5 * time.Minute):
929 vlog.Errorf("Listener Close Error: %v", firstErr)
Bogdan Caprita2d04f0e2015-03-13 15:39:13 -0700930 vlog.Errorf("Timedout waiting for goroutines to stop: listeners: %d (currently: %d)", nListeners, len(s.listeners))
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800931 for ln, _ := range s.listeners {
932 vlog.Errorf("Listener: %p", ln)
933 }
934 for ls, _ := range s.listenState {
935 vlog.Errorf("ListenState: %v", ls)
936 }
937 <-done
938 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800939
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700940 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800941 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700942 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800943 if firstErr != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800944 return verror.New(verror.ErrInternal, s.ctx, firstErr)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800945 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800946 s.state = stopped
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800947 s.cancel()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800948 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700949}
950
951// flowServer implements the RPC server-side protocol for a single RPC, over a
952// flow that's already connected to the client.
953type flowServer struct {
Todd Wang54feabe2015-04-15 23:38:26 -0700954 ctx *context.T // context associated with the RPC
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700955 server *server // rpc.Server that this flow server belongs to
956 disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
Todd Wang3425a902015-01-21 18:43:59 -0800957 dec *vom.Decoder // to decode requests and args from the client
958 enc *vom.Encoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -0800959 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700960
Asim Shankar220a0152014-10-30 21:21:09 -0700961 // Fields filled in during the server invocation.
Suharsh Sivakumar380bf342015-02-27 15:38:27 -0800962 clientBlessings security.Blessings
963 ackBlessings bool
964 grantedBlessings security.Blessings
965 method, suffix string
966 tags []*vdl.Value
967 discharges map[string]security.Discharge
968 starttime time.Time
969 endStreamArgs bool // are the stream args at EOF?
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700970}
971
Todd Wang54feabe2015-04-15 23:38:26 -0700972var (
973 _ rpc.StreamServerCall = (*flowServer)(nil)
974 _ security.Call = (*flowServer)(nil)
975)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700976
Todd Wang34ed4c62014-11-26 15:15:52 -0800977func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -0700978 server.Lock()
979 disp := server.disp
980 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -0700981
Todd Wang34ed4c62014-11-26 15:15:52 -0800982 fs := &flowServer{
Todd Wang54feabe2015-04-15 23:38:26 -0700983 ctx: server.ctx,
Todd Wang34ed4c62014-11-26 15:15:52 -0800984 server: server,
985 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -0800986 flow: flow,
987 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700988 }
Todd Wangf519f8f2015-01-21 10:07:41 -0800989 var err error
Jungho Ahn60408fa2015-03-27 15:28:22 -0700990 typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
991 if typedec == nil {
992 if fs.dec, err = vom.NewDecoder(flow); err != nil {
993 flow.Close()
994 return nil, err
995 }
996 if fs.enc, err = vom.NewEncoder(flow); err != nil {
997 flow.Close()
998 return nil, err
999 }
1000 } else {
1001 if fs.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
1002 flow.Close()
1003 return nil, err
1004 }
1005 typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
1006 if fs.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
1007 flow.Close()
1008 return nil, err
1009 }
Todd Wang34ed4c62014-11-26 15:15:52 -08001010 }
1011 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001012}
1013
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001014// authorizeVtrace works by simulating a call to __debug/vtrace.Trace. That
1015// rpc is essentially equivalent in power to the data we are attempting to
1016// attach here.
1017func (fs *flowServer) authorizeVtrace() error {
1018 // Set up a context as though we were calling __debug/vtrace.
1019 params := &security.CallParams{}
Todd Wang4264e4b2015-04-16 22:43:40 -07001020 params.Copy(fs)
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001021 params.Method = "Trace"
1022 params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)}
1023 params.Suffix = "__debug/vtrace"
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001024
1025 var auth security.Authorizer
1026 if fs.server.dispReserved != nil {
1027 _, auth, _ = fs.server.dispReserved.Lookup(params.Suffix)
1028 }
Todd Wang4264e4b2015-04-16 22:43:40 -07001029 return authorize(fs.ctx, security.NewCall(params), auth)
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001030}
1031
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001032func (fs *flowServer) serve() error {
1033 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -07001034
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001035 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001036
Todd Wang54feabe2015-04-15 23:38:26 -07001037 vtrace.GetSpan(fs.ctx).Finish()
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001038
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001039 var traceResponse vtrace.Response
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001040 // Check if the caller is permitted to view vtrace data.
1041 if fs.authorizeVtrace() == nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001042 traceResponse = vtrace.GetResponse(fs.ctx)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001043 }
1044
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001045 // Respond to the client with the response header and positional results.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001046 response := rpc.Response{
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001047 Error: err,
1048 EndStreamResults: true,
1049 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001050 TraceResponse: traceResponse,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001051 AckBlessings: fs.ackBlessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001052 }
1053 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -08001054 if err == io.EOF {
1055 return err
1056 }
Todd Wang54feabe2015-04-15 23:38:26 -07001057 return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001058 }
1059 if response.Error != nil {
1060 return response.Error
1061 }
1062 for ix, res := range results {
Todd Wangf519f8f2015-01-21 10:07:41 -08001063 if err := fs.enc.Encode(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -08001064 if err == io.EOF {
1065 return err
1066 }
Todd Wang54feabe2015-04-15 23:38:26 -07001067 return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001068 }
1069 }
1070 // TODO(ashankar): Should unread data from the flow be drained?
1071 //
1072 // Reason to do so:
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -07001073 // The common stream.Flow implementation (v.io/x/ref/profiles/internal/rpc/stream/vc/reader.go)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001074 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
1075 // slices will not be returned to the pool leading to possibly increased memory usage.
1076 //
1077 // Reason to not do so:
1078 // Draining here will conflict with any Reads on the flow in a separate goroutine
1079 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
1080 //
1081 // For now, go with the reason to not do so as having unread data in the stream
1082 // should be a rare case.
1083 return nil
1084}
1085
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001086func (fs *flowServer) readRPCRequest() (*rpc.Request, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001087 // Set a default timeout before reading from the flow. Without this timeout,
1088 // a client that sends no request or a partial request will retain the flow
1089 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -07001090 initTimer := newTimer(defaultCallTimeout)
1091 defer initTimer.Stop()
1092 fs.flow.SetDeadline(initTimer.C)
1093
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001094 // Decode the initial request.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001095 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001096 if err := fs.dec.Decode(&req); err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001097 return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001098 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001099 return &req, nil
1100}
1101
Todd Wang9548d852015-02-10 16:15:59 -08001102func (fs *flowServer) processRequest() ([]interface{}, error) {
Asim Shankar0cad0832014-11-04 01:27:38 -08001103 fs.starttime = time.Now()
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001104 req, err := fs.readRPCRequest()
Todd Wang9548d852015-02-10 16:15:59 -08001105 if err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001106 // We don't know what the rpc call was supposed to be, but we'll create
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001107 // a placeholder span so we can capture annotations.
Todd Wangad492042015-04-17 15:58:40 -07001108 fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
Todd Wang9548d852015-02-10 16:15:59 -08001109 return nil, err
Matt Rosencrantz86897932014-10-02 09:34:34 -07001110 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001111 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -08001112 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -07001113
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001114 // TODO(mattr): Currently this allows users to trigger trace collection
1115 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -08001116 // results later. This might be considered a DOS vector.
Todd Wang54feabe2015-04-15 23:38:26 -07001117 spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
Todd Wangad492042015-04-17 15:58:40 -07001118 fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001119
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001120 var cancel context.CancelFunc
Todd Wangf6a06882015-02-27 17:38:01 -08001121 if !req.Deadline.IsZero() {
Todd Wang54feabe2015-04-15 23:38:26 -07001122 fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001123 } else {
Todd Wang54feabe2015-04-15 23:38:26 -07001124 fs.ctx, cancel = context.WithCancel(fs.ctx)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001125 }
Todd Wang54feabe2015-04-15 23:38:26 -07001126 fs.flow.SetDeadline(fs.ctx.Done())
Todd Wang5739dda2014-11-16 22:44:02 -08001127 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001128
Todd Wang5739dda2014-11-16 22:44:02 -08001129 // Initialize security: blessings, discharges, etc.
Todd Wang9548d852015-02-10 16:15:59 -08001130 if err := fs.initSecurity(req); err != nil {
1131 return nil, err
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001132 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001133 // Lookup the invoker.
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001134 invoker, auth, err := fs.lookup(fs.suffix, fs.method)
Todd Wangebb3b012015-02-09 21:59:05 -08001135 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001136 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001137 }
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001138
1139 // Note that we strip the reserved prefix when calling the invoker so
1140 // that __Glob will call Glob. Note that we've already assigned a
1141 // special invoker so that we never call the wrong method by mistake.
1142 strippedMethod := naming.StripReserved(fs.method)
1143
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001144 // Prepare invoker and decode args.
1145 numArgs := int(req.NumPosArgs)
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001146 argptrs, tags, err := invoker.Prepare(strippedMethod, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -08001147 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001148 if err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001149 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001150 }
Todd Wang9548d852015-02-10 16:15:59 -08001151 if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
Todd Wang54feabe2015-04-15 23:38:26 -07001152 return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001153 }
1154 for ix, argptr := range argptrs {
1155 if err := fs.dec.Decode(argptr); err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001156 return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001157 }
1158 }
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001159
Todd Wang5739dda2014-11-16 22:44:02 -08001160 // Check application's authorization policy.
Todd Wang4264e4b2015-04-16 22:43:40 -07001161 if err := authorize(fs.ctx, fs, auth); err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001162 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001163 }
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001164
Todd Wang5739dda2014-11-16 22:44:02 -08001165 // Invoke the method.
Todd Wang54feabe2015-04-15 23:38:26 -07001166 results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
Robin Thellendb16d7162014-11-07 13:47:26 -08001167 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Todd Wang9548d852015-02-10 16:15:59 -08001168 return results, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001169}
1170
Todd Wang5739dda2014-11-16 22:44:02 -08001171func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1172 // Ensure that the context gets cancelled if the flow is closed
1173 // due to a network error, or client cancellation.
1174 select {
1175 case <-fs.flow.Closed():
1176 // Here we remove the contexts channel as a deadline to the flow.
1177 // We do this to ensure clients get a consistent error when they read/write
1178 // after the flow is closed. Since the flow is already closed, it doesn't
1179 // matter that the context is also cancelled.
1180 fs.flow.SetDeadline(nil)
1181 cancel()
Todd Wang54feabe2015-04-15 23:38:26 -07001182 case <-fs.ctx.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001183 }
Todd Wang5739dda2014-11-16 22:44:02 -08001184}
1185
1186// lookup returns the invoker and authorizer responsible for serving the given
1187// name and method. The suffix is stripped of any leading slashes. If it begins
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001188// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
Todd Wang5739dda2014-11-16 22:44:02 -08001189// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1190// value may be modified to match the actual suffix and method to use.
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001191func (fs *flowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
1192 if naming.IsReserved(method) {
Todd Wang5739dda2014-11-16 22:44:02 -08001193 return reservedInvoker(fs.disp, fs.server.dispReserved), &acceptAllAuthorizer{}, nil
1194 }
1195 disp := fs.disp
1196 if naming.IsReserved(suffix) {
1197 disp = fs.server.dispReserved
Robin Thellendd24f0842014-09-23 10:27:29 -07001198 }
1199 if disp != nil {
Robin Thellenda02fe8f2014-11-19 09:58:29 -08001200 obj, auth, err := disp.Lookup(suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001201 switch {
1202 case err != nil:
Todd Wang9548d852015-02-10 16:15:59 -08001203 return nil, nil, err
Todd Wang5739dda2014-11-16 22:44:02 -08001204 case obj != nil:
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001205 invoker, err := objectToInvoker(obj)
1206 if err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001207 return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001208 }
1209 return invoker, auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001210 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001211 }
Todd Wang54feabe2015-04-15 23:38:26 -07001212 return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001213}
1214
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001215func objectToInvoker(obj interface{}) (rpc.Invoker, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001216 if obj == nil {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001217 return nil, errors.New("nil object")
Todd Wang5739dda2014-11-16 22:44:02 -08001218 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001219 if invoker, ok := obj.(rpc.Invoker); ok {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001220 return invoker, nil
Todd Wang5739dda2014-11-16 22:44:02 -08001221 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001222 return rpc.ReflectInvoker(obj)
Todd Wang5739dda2014-11-16 22:44:02 -08001223}
1224
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001225func (fs *flowServer) initSecurity(req *rpc.Request) error {
Ankurb905dae2015-03-04 12:38:20 -08001226 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001227 // SecurityNone.
Todd Wang54feabe2015-04-15 23:38:26 -07001228 if fs.LocalPrincipal() == nil {
Ankurb905dae2015-03-04 12:38:20 -08001229 return nil
1230 }
1231
Todd Wang5739dda2014-11-16 22:44:02 -08001232 // If additional credentials are provided, make them available in the context
Todd Wang5739dda2014-11-16 22:44:02 -08001233 // Detect unusable blessings now, rather then discovering they are unusable on
1234 // first use.
1235 //
1236 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1237 // the server's identity as the blessing. Figure out what we want to do about
1238 // this - should servers be able to assume that a blessing is something that
1239 // does not have the authorizations that the server's own identity has?
Todd Wang54feabe2015-04-15 23:38:26 -07001240 if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1241 return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
Todd Wang5739dda2014-11-16 22:44:02 -08001242 }
Asim Shankarb07ec692015-02-27 23:40:44 -08001243 fs.grantedBlessings = req.GrantedBlessings
Ankurb905dae2015-03-04 12:38:20 -08001244
Asim Shankarb07ec692015-02-27 23:40:44 -08001245 var err error
1246 if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001247 // When the server can't access the blessings cache, the client is not following
1248 // protocol, so the server closes the VCs corresponding to the client endpoint.
1249 // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
1250 // of all VCs connected to the RemoteEndpoint.
1251 fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
Todd Wang54feabe2015-04-15 23:38:26 -07001252 return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001253 }
Ankurb905dae2015-03-04 12:38:20 -08001254 // Verify that the blessings sent by the client in the request have the same public
1255 // key as those sent by the client during VC establishment.
1256 if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
Todd Wang54feabe2015-04-15 23:38:26 -07001257 return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
Ankurb905dae2015-03-04 12:38:20 -08001258 }
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001259 fs.ackBlessings = true
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001260
Asim Shankar3ad0b8a2015-02-25 00:37:21 -08001261 for _, d := range req.Discharges {
Asim Shankar08642822015-03-02 21:21:09 -08001262 fs.discharges[d.ID()] = d
Todd Wang5739dda2014-11-16 22:44:02 -08001263 }
1264 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001265}
1266
1267type acceptAllAuthorizer struct{}
1268
Todd Wang4264e4b2015-04-16 22:43:40 -07001269func (acceptAllAuthorizer) Authorize(*context.T, security.Call) error {
Robin Thellendc26c32e2014-10-06 17:44:04 -07001270 return nil
1271}
1272
Todd Wang4264e4b2015-04-16 22:43:40 -07001273func authorize(ctx *context.T, call security.Call, auth security.Authorizer) error {
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001274 if call.LocalPrincipal() == nil {
Todd Wang5739dda2014-11-16 22:44:02 -08001275 // LocalPrincipal is nil means that the server wanted to avoid
1276 // authentication, and thus wanted to skip authorization as well.
1277 return nil
1278 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001279 if auth == nil {
Asim Shankare4a8c092015-04-01 18:43:39 -07001280 auth = security.DefaultAuthorizer()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001281 }
Todd Wang4264e4b2015-04-16 22:43:40 -07001282 if err := auth.Authorize(ctx, call); err != nil {
Asim Shankara5457f02014-10-24 23:23:07 -07001283 // TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error?
Matt Rosencrantz250558f2015-03-17 11:37:31 -07001284 return verror.New(verror.ErrNoAccess, ctx, newErrBadAuth(ctx, call.Suffix(), call.Method(), err))
Asim Shankara5457f02014-10-24 23:23:07 -07001285 }
1286 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001287}
1288
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001289// Send implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001290func (fs *flowServer) Send(item interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001291 defer vlog.LogCall()()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001292 // The empty response header indicates what follows is a streaming result.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001293 if err := fs.enc.Encode(rpc.Response{}); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001294 return err
1295 }
1296 return fs.enc.Encode(item)
1297}
1298
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001299// Recv implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001300func (fs *flowServer) Recv(itemptr interface{}) error {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001301 defer vlog.LogCall()()
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001302 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001303 if err := fs.dec.Decode(&req); err != nil {
1304 return err
1305 }
1306 if req.EndStreamArgs {
1307 fs.endStreamArgs = true
1308 return io.EOF
1309 }
1310 return fs.dec.Decode(itemptr)
1311}
1312
Todd Wang54feabe2015-04-15 23:38:26 -07001313// Implementations of rpc.ServerCall and security.Call methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001314
Todd Wang54feabe2015-04-15 23:38:26 -07001315func (fs *flowServer) Security() security.Call {
1316 //nologcall
1317 return fs
1318}
Ankuredd74ee2015-03-04 16:38:45 -08001319func (fs *flowServer) LocalDischarges() map[string]security.Discharge {
1320 //nologcall
1321 return fs.flow.LocalDischarges()
1322}
Asim Shankar2519cc12014-11-10 21:16:53 -08001323func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001324 //nologcall
1325 return fs.discharges
1326}
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001327func (fs *flowServer) Server() rpc.Server {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001328 //nologcall
1329 return fs.server
1330}
Asim Shankar0cad0832014-11-04 01:27:38 -08001331func (fs *flowServer) Timestamp() time.Time {
1332 //nologcall
1333 return fs.starttime
1334}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001335func (fs *flowServer) Method() string {
1336 //nologcall
1337 return fs.method
1338}
Todd Wangb31da592015-02-20 12:50:39 -08001339func (fs *flowServer) MethodTags() []*vdl.Value {
Asim Shankar0cad0832014-11-04 01:27:38 -08001340 //nologcall
1341 return fs.tags
1342}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001343func (fs *flowServer) Suffix() string {
1344 //nologcall
1345 return fs.suffix
1346}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001347func (fs *flowServer) LocalPrincipal() security.Principal {
1348 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001349 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001350}
1351func (fs *flowServer) LocalBlessings() security.Blessings {
1352 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001353 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001354}
1355func (fs *flowServer) RemoteBlessings() security.Blessings {
1356 //nologcall
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001357 if !fs.clientBlessings.IsZero() {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001358 return fs.clientBlessings
1359 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001360 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001361}
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001362func (fs *flowServer) GrantedBlessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001363 //nologcall
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001364 return fs.grantedBlessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001365}
1366func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1367 //nologcall
1368 return fs.flow.LocalEndpoint()
1369}
1370func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1371 //nologcall
1372 return fs.flow.RemoteEndpoint()
1373}