blob: ecd7d0097f553887a88d5e823673dadc6ff33883 [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07005package rpc
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006
7import (
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08008 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "fmt"
10 "io"
Cosmos Nicolaoubae615a2014-08-27 23:32:31 -070011 "net"
Asim Shankarb54d7642014-06-05 13:08:04 -070012 "reflect"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070013 "strings"
14 "sync"
15 "time"
16
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -070017 "v.io/x/lib/netstate"
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -070018
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -070019 "v.io/v23"
Jiri Simsa6ac95222015-02-23 16:11:49 -080020 "v.io/v23/context"
Matt Rosencrantz88be1182015-04-27 13:45:43 -070021 "v.io/v23/i18n"
Todd Wang5082a552015-04-02 10:56:11 -070022 "v.io/v23/namespace"
Jiri Simsa6ac95222015-02-23 16:11:49 -080023 "v.io/v23/naming"
24 "v.io/v23/options"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070025 "v.io/v23/rpc"
Jiri Simsa6ac95222015-02-23 16:11:49 -080026 "v.io/v23/security"
Todd Wang387d8a42015-03-30 17:09:05 -070027 "v.io/v23/security/access"
Jiri Simsa6ac95222015-02-23 16:11:49 -080028 "v.io/v23/vdl"
29 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080030 "v.io/v23/vom"
31 "v.io/v23/vtrace"
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -070032
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -070033 "v.io/x/ref/lib/apilog"
Jiri Simsa574ec4b2015-08-11 09:31:37 -070034 "v.io/x/ref/lib/pubsub"
Jiri Simsaffceefa2015-02-28 11:03:34 -080035 "v.io/x/ref/lib/stats"
Suharsh Sivakumardcc11d72015-05-11 12:19:20 -070036 "v.io/x/ref/runtime/internal/lib/publisher"
37 inaming "v.io/x/ref/runtime/internal/naming"
38 "v.io/x/ref/runtime/internal/rpc/stream"
39 "v.io/x/ref/runtime/internal/rpc/stream/manager"
40 "v.io/x/ref/runtime/internal/rpc/stream/vc"
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070041)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -080042
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070043var (
44 // These errors are intended to be used as arguments to higher
45 // level errors and hence {1}{2} is omitted from their format
46 // strings to avoid repeating these n-times in the final error
47 // message visible to the user.
48 errResponseEncoding = reg(".errResponseEncoding", "failed to encode RPC response {3} <-> {4}{:5}")
49 errResultEncoding = reg(".errResultEncoding", "failed to encode result #{3} [{4}]{:5}")
50 errFailedToResolveToEndpoint = reg(".errFailedToResolveToEndpoint", "failed to resolve {3} to an endpoint")
51 errFailedToResolveProxy = reg(".errFailedToResolveProxy", "failed to resolve proxy {3}{:4}")
52 errFailedToListenForProxy = reg(".errFailedToListenForProxy", "failed to listen on {3}{:4}")
Suharsh Sivakumardcc11d72015-05-11 12:19:20 -070053 errInternalTypeConversion = reg(".errInternalTypeConversion", "failed to convert {3} to v.io/x/ref/runtime/internal/naming.Endpoint")
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -070054 errFailedToParseIP = reg(".errFailedToParseIP", "failed to parse {3} as an IP host")
Ali Ghassemibac34032015-04-30 18:27:57 -070055 errUnexpectedSuffix = reg(".errUnexpectedSuffix", "suffix {3} was not expected because either server has the option IsLeaf set to true or it served an object and not a dispatcher")
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -070056 errNoListeners = reg(".errNoListeners", "failed to ceate any listeners{:3}")
Jiri Simsa5293dcb2014-05-10 09:56:38 -070057)
58
// DeprecatedServer is rpc.Server extended with explicit Listen, Serve and
// ServeDispatcher methods. It is the type returned by InternalNewServer.
type DeprecatedServer interface {
	// Listen creates a listening network endpoint for the Server
	// as specified by its ListenSpec parameter. If any of the listen
	// addresses passed in the ListenSpec are 'unspecified' (e.g. don't
	// include a fixed address such as in ":0") and the ListenSpec includes
	// a Publisher, then 'roaming' support will be enabled. In this mode
	// the server will listen for changes in the network configuration
	// using a Stream created on the supplied Publisher and change the
	// set of Endpoints it publishes to the mount table accordingly.
	// The set of expected Settings received over the Stream is defined
	// by the New<setting>Functions above. The Publisher is ignored if
	// all of the addresses are specified.
	//
	// Listen may be called multiple times, but it must be called before
	// Serve or ServeDispatcher.
	//
	// Listen returns the set of endpoints that can be used to reach
	// this server. A single listen address in the ListenSpec can lead
	// to multiple such endpoints (e.g. :0 on a device with multiple interfaces
	// or that is being proxied). In the case where multiple listen addresses
	// are used it is not possible to tell which listen address supports which
	// Endpoint. If there is a need to associate endpoints with specific
	// listen addresses then Listen should be called separately for each one.
	//
	// Any non-nil value of error can be converted to a verror.E. If
	// error is nil and at least one address was supplied in the ListenSpec
	// then ListenEndpoints will include at least one Endpoint.
	Listen(spec rpc.ListenSpec) ([]naming.Endpoint, error)

	// Serve associates object with name by publishing the address of this
	// server with the mount table under the supplied name and using
	// authorizer to authorize access to it. RPCs invoked on the supplied
	// name will be delivered to methods implemented by the supplied object.
	//
	// Reflection is used to match requests to the object's method set. As
	// a special-case, if the object implements the Invoker interface, the
	// Invoker is used to invoke methods directly, without reflection.
	//
	// If name is an empty string, no attempt will be made to publish that
	// name to a mount table.
	//
	// It is an error to call Serve if ServeDispatcher has already been
	// called. It is also an error to call Serve multiple times.
	// It is considered an error to call Listen after Serve.
	Serve(name string, object interface{}, auth security.Authorizer) error

	// ServeDispatcher associates dispatcher with the portion of the mount
	// table's name space for which name is a prefix, by publishing the
	// address of this dispatcher with the mount table under the supplied
	// name.
	//
	// If name is an empty string, no attempt will be made to publish that
	// name to a mount table.
	//
	// RPCs invoked on the supplied name will be delivered to the supplied
	// Dispatcher's Lookup method which will in turn return the object
	// and security.Authorizer used to serve the actual RPC call.
	//
	// It is an error to call ServeDispatcher if Serve has already been
	// called. It is also an error to call ServeDispatcher multiple times.
	// It is considered an error to call Listen after ServeDispatcher.
	ServeDispatcher(name string, disp rpc.Dispatcher) error

	rpc.Server
}
126
// listenState holds the state for each requested listen address.
// One instance is created by Listen per non-empty address in the ListenSpec.
type listenState struct {
	protocol, address string          // the protocol and address that were requested
	ln                stream.Listener // listener returned by the stream manager, nil if listening failed
	lep               naming.Endpoint // listen endpoint returned by the stream manager
	lnerr, eperr      error           // errors from streamMgr.Listen and createEndpoints respectively
	roaming           bool            // as reported by createEndpoints (true when the listen address was unspecified)
	// We keep track of all of the endpoints, the port and a copy of
	// the original listen endpoint for use with roaming network changes.
	ieps     []*inaming.Endpoint // list of currently active eps
	port     string              // port to use for creating new eps
	protoIEP inaming.Endpoint    // endpoint to use as template for new eps (includes rid, versions etc)
}
140
// proxyState holds the state for each requested proxy. Instances are stored
// in server.proxies keyed by the proxy name.
type proxyState struct {
	endpoint naming.Endpoint // endpoint obtained by listening via the proxy
	err      error           // most recent error for this proxy, nil if none
}
146
// dhcpState holds the state used to follow network configuration changes
// when roaming. Listen creates it lazily, the first time a roaming listen
// address is seen and a settings publisher is available.
type dhcpState struct {
	name      string                                // name of the pubsub stream to fork (server.settingsName)
	publisher *pubsub.Publisher                     // publisher the stream is forked from (server.settingsPublisher)
	stream    *pubsub.Stream                        // stream returned by publisher.ForkStream
	ch        chan pubsub.Setting                   // channel to receive dhcp settings over
	err       error                                 // error status.
	watchers  map[chan<- rpc.NetworkChange]struct{} // channels registered via WatchNetwork
}
155
// server is the implementation of DeprecatedServer. The embedded mutex
// guards the mutable fields below (state, listenState, proxies, dhcpState,
// ...); methods in this file take it before touching them.
type server struct {
	sync.Mutex
	// context used by the server to make internal RPCs, error messages etc.
	ctx               *context.T
	cancel            context.CancelFunc   // function to cancel the above context.
	state             serverState          // track state of the server.
	streamMgr         stream.Manager       // stream manager to listen for new flows.
	publisher         publisher.Publisher  // publisher to publish mounttable mounts.
	dc                vc.DischargeClient   // fetches discharges of blessings
	listenerOpts      []stream.ListenerOpt // listener opts for Listen.
	settingsPublisher *pubsub.Publisher    // pubsub publisher for dhcp
	settingsName      string               // pubsub stream name for dhcp
	dhcpState         *dhcpState           // dhcpState, nil if not using dhcp
	principal         security.Principal   // nil when the server runs with options.SecurityNone
	blessings         security.Blessings   // blessings presented by the server

	// maps that contain state on listeners.
	listenState map[*listenState]struct{}
	listeners   map[stream.Listener]struct{}

	// state of proxies keyed by the name of the proxy
	proxies map[string]proxyState

	disp               rpc.Dispatcher // dispatcher to serve RPCs
	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
	active             sync.WaitGroup // active goroutines we've spawned.
	stoppedChan        chan struct{}  // closed when the server has been stopped.
	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
	// We cache the IP networks on the device since it is not that cheap to read
	// network interfaces through os syscall.
	// TODO(jhahn): Add monitoring the network interface changes.
	ipNets           []*net.IPNet
	ns               namespace.T // namespace used to resolve names; may be nil (see resolveToEndpoint)
	servesMountTable bool        // set from options.ServesMountTable
	isLeaf           bool        // set from options.IsLeaf

	// TODO(cnicolaou): add roaming stats to rpcStats
	stats *rpcStats // stats for this server.
}
195
// serverState is the internal lifecycle state of a server. It is mapped to
// the public rpc.ServerState by externalStates.
type serverState int

// The internal states, in the order a server normally moves through them.
// The zero value is initialized.
const (
	initialized serverState = iota
	listening
	serving
	publishing
	stopping
	stopped
)
206
// Simple state machine for the server implementation.

// next is the set of states that may directly follow a given state.
type next map[serverState]bool

// transitions maps each state to the set of states allowed to follow it.
type transitions map[serverState]next

var (
	// states is the transition table consulted by (*server).allowed.
	// listening and publishing may be re-entered; stopping and stopped have
	// no outgoing transitions in this table.
	// NOTE(review): the move from stopping to stopped is presumably made
	// directly rather than through allowed -- confirm against Stop.
	states = transitions{
		initialized: next{listening: true, stopping: true},
		listening:   next{listening: true, serving: true, stopping: true},
		serving:     next{publishing: true, stopping: true},
		publishing:  next{publishing: true, stopping: true},
		stopping:    next{},
		stopped:     next{},
	}

	// externalStates maps every internal state to the state reported to
	// callers through Status. All pre-stop states are reported as active.
	externalStates = map[serverState]rpc.ServerState{
		initialized: rpc.ServerActive,
		listening:   rpc.ServerActive,
		serving:     rpc.ServerActive,
		publishing:  rpc.ServerActive,
		stopping:    rpc.ServerStopping,
		stopped:     rpc.ServerStopped,
	}
)
230
231func (s *server) allowed(next serverState, method string) error {
232 if states[s.state][next] {
233 s.state = next
234 return nil
235 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800236 return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800237}
238
239func (s *server) isStopState() bool {
240 return s.state == stopping || s.state == stopped
241}
242
// Compile-time check that *server implements DeprecatedServer.
var _ DeprecatedServer = (*server)(nil)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -0700244
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -0700245func InternalNewServer(
246 ctx *context.T,
247 streamMgr stream.Manager,
248 ns namespace.T,
Cosmos Nicolaou00fe9a42015-04-24 14:18:01 -0700249 settingsPublisher *pubsub.Publisher,
250 settingsName string,
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -0700251 client rpc.Client,
Matt Rosencrantz5efd7822015-09-15 18:07:17 -0700252 opts ...rpc.ServerOpt) (DeprecatedServer, error) {
Matt Rosencrantz1094d062015-01-30 06:43:12 -0800253 ctx, cancel := context.WithRootCancel(ctx)
Todd Wangad492042015-04-17 15:58:40 -0700254 ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700255 statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700256 s := &server{
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700257 ctx: ctx,
258 cancel: cancel,
259 streamMgr: streamMgr,
260 publisher: publisher.New(ctx, ns, publishPeriod),
261 listenState: make(map[*listenState]struct{}),
262 listeners: make(map[stream.Listener]struct{}),
263 proxies: make(map[string]proxyState),
264 stoppedChan: make(chan struct{}),
265
Cosmos Nicolaou00fe9a42015-04-24 14:18:01 -0700266 ns: ns,
267 stats: newRPCStats(statsPrefix),
268 settingsPublisher: settingsPublisher,
269 settingsName: settingsName,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700270 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700271 var (
272 dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
273 securityLevel options.SecurityLevel
274 )
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700275 ipNets, err := ipNetworks()
276 if err != nil {
277 return nil, err
278 }
279 s.ipNets = ipNets
280
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700281 for _, opt := range opts {
Bogdan Caprita187269b2014-05-13 19:59:46 -0700282 switch opt := opt.(type) {
283 case stream.ListenerOpt:
284 // Collect all ServerOpts that are also ListenerOpts.
285 s.listenerOpts = append(s.listenerOpts, opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800286 switch opt := opt.(type) {
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800287 case vc.DischargeExpiryBuffer:
288 dischargeExpiryBuffer = time.Duration(opt)
Bogdan Capritae7376312014-11-10 13:13:17 -0800289 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700290 case options.ServerBlessings:
291 s.blessings = opt.Blessings
Asim Shankarcc044212014-10-15 23:25:26 -0700292 case options.ServesMountTable:
Cosmos Nicolaoue6e87f12014-06-03 14:29:10 -0700293 s.servesMountTable = bool(opt)
Ali Ghassemibac34032015-04-30 18:27:57 -0700294 case options.IsLeaf:
295 s.isLeaf = bool(opt)
Suharsh Sivakumard7a65192015-01-27 22:57:15 -0800296 case ReservedNameDispatcher:
Todd Wang5739dda2014-11-16 22:44:02 -0800297 s.dispReserved = opt.Dispatcher
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800298 case PreferredServerResolveProtocols:
299 s.preferredProtocols = []string(opt)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700300 case options.SecurityLevel:
301 securityLevel = opt
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700302
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700303 }
304 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700305
306 authenticateVC := true
307
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700308 if securityLevel == options.SecurityNone {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700309 authenticateVC = false
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700310 s.blessings = security.Blessings{}
311 s.dispReserved = nil
312 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700313 if authenticateVC {
314 s.principal = v23.GetPrincipal(ctx)
315 if s.blessings.IsZero() && s.principal != nil {
316 s.blessings = s.principal.BlessingStore().Default()
317 }
318 }
319
Suharsh Sivakumar08918582015-03-03 15:16:36 -0800320 // Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700321 // the discharges by the time the VC asks for them.
Asim Shankar99b18a72015-04-25 23:19:28 -0700322 s.dc = InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
323 s.listenerOpts = append(s.listenerOpts, s.dc)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700324 s.listenerOpts = append(s.listenerOpts, stream.AuthenticatedVC(authenticateVC))
Bogdan Capritae7376312014-11-10 13:13:17 -0800325 blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
Asim Shankar2bf7b1e2015-02-27 00:45:12 -0800326 // TODO(caprita): revist printing the blessings with %s, and
327 // instead expose them as a list.
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700328 stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700329 if s.principal != nil {
Bogdan Capritae7376312014-11-10 13:13:17 -0800330 stats.NewStringFunc(blessingsStatsName, func() string {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700331 return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
Bogdan Capritae7376312014-11-10 13:13:17 -0800332 })
333 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700334 return s, nil
335}
336
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700337func (s *server) Status() rpc.ServerStatus {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700338 defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700339 status := rpc.ServerStatus{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700340 s.Lock()
341 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800342 status.State = externalStates[s.state]
343 status.ServesMountTable = s.servesMountTable
344 status.Mounts = s.publisher.Status()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800345 status.Endpoints = []naming.Endpoint{}
346 for ls, _ := range s.listenState {
347 if ls.eperr != nil {
348 status.Errors = append(status.Errors, ls.eperr)
349 }
350 if ls.lnerr != nil {
351 status.Errors = append(status.Errors, ls.lnerr)
352 }
353 for _, iep := range ls.ieps {
354 status.Endpoints = append(status.Endpoints, iep)
355 }
356 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700357 status.Proxies = make([]rpc.ProxyStatus, 0, len(s.proxies))
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800358 for k, v := range s.proxies {
Jiri Simsad9a7b3c2015-08-12 16:38:27 -0700359 status.Proxies = append(status.Proxies, rpc.ProxyStatus{
360 Proxy: k,
361 Endpoint: v.endpoint,
362 Error: v.err,
363 })
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700364 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800365 return status
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700366}
367
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700368func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700369 defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800370 s.Lock()
371 defer s.Unlock()
372 if s.dhcpState != nil {
373 s.dhcpState.watchers[ch] = struct{}{}
374 }
375}
376
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700377func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700378 defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800379 s.Lock()
380 defer s.Unlock()
381 if s.dhcpState != nil {
382 delete(s.dhcpState.watchers, ch)
383 }
384}
385
Robin Thellend92b65a42014-12-17 14:30:16 -0800386// resolveToEndpoint resolves an object name or address to an endpoint.
387func (s *server) resolveToEndpoint(address string) (string, error) {
Asim Shankaraae31802015-01-22 11:59:42 -0800388 var resolved *naming.MountEntry
389 var err error
Asim Shankardee311d2014-08-01 17:41:31 -0700390 if s.ns != nil {
Asim Shankaraae31802015-01-22 11:59:42 -0800391 if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
Asim Shankardee311d2014-08-01 17:41:31 -0700392 return "", err
393 }
394 } else {
Asim Shankaraae31802015-01-22 11:59:42 -0800395 // Fake a namespace resolution
396 resolved = &naming.MountEntry{Servers: []naming.MountedServer{
397 {Server: address},
398 }}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700399 }
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800400 // An empty set of protocols means all protocols...
Jungho Ahn25545d32015-01-26 15:14:14 -0800401 if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
Nicolas LaCasse55a10f32014-11-26 13:25:53 -0800402 return "", err
403 }
Asim Shankaraae31802015-01-22 11:59:42 -0800404 for _, n := range resolved.Names() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700405 address, suffix := naming.SplitAddressName(n)
David Why Use Two When One Will Do Presottoadf0ca12014-11-13 10:49:01 -0800406 if suffix != "" {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700407 continue
408 }
Asim Shankaraae31802015-01-22 11:59:42 -0800409 if ep, err := inaming.NewEndpoint(address); err == nil {
410 return ep.String(), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700411 }
412 }
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700413 return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700414}
415
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800416// createEndpoints creates appropriate inaming.Endpoint instances for
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800417// all of the externally accessible network addresses that can be used
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800418// to reach this server.
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700419func (s *server) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800420 iep, ok := lep.(*inaming.Endpoint)
421 if !ok {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700422 return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800423 }
424 if !strings.HasPrefix(iep.Protocol, "tcp") &&
425 !strings.HasPrefix(iep.Protocol, "ws") {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800426 // If not tcp, ws, or wsh, just return the endpoint we were given.
427 return []*inaming.Endpoint{iep}, "", false, nil
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800428 }
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800429 host, port, err := net.SplitHostPort(iep.Address)
430 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800431 return nil, "", false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800432 }
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700433 addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800434 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800435 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800436 }
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700437
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800438 ieps := make([]*inaming.Endpoint, 0, len(addrs))
439 for _, addr := range addrs {
440 n, err := inaming.NewEndpoint(lep.String())
441 if err != nil {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800442 return nil, port, false, err
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800443 }
444 n.IsMountTable = s.servesMountTable
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700445 n.Address = net.JoinHostPort(addr.String(), port)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800446 ieps = append(ieps, n)
447 }
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800448 return ieps, port, unspecified, nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -0800449}
450
// Listen implements DeprecatedServer.Listen.
//
// It starts a proxy listen loop if listenSpec names a proxy that is not
// already known, listens on every non-empty address in listenSpec.Addrs,
// enables dhcp/roaming support when any of those addresses turned out to be
// unspecified and a settings publisher is configured, and starts one accept
// loop per successful listener. The returned slice holds the endpoints
// created for the addresses handled by this call.
//
// An ErrBadArg error is returned if listenSpec has neither a proxy nor
// addresses, or if no listener could be created and no proxy was requested.
// An ErrBadState error is returned if Listen is not allowed in the server's
// current state. The server lock is held for the whole call once the
// arguments have been validated.
func (s *server) Listen(listenSpec rpc.ListenSpec) ([]naming.Endpoint, error) {
	defer apilog.LogCallf(nil, "listenSpec=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
	useProxy := len(listenSpec.Proxy) > 0
	if !useProxy && len(listenSpec.Addrs) == 0 {
		return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
	}

	s.Lock()
	defer s.Unlock()

	// Note that a successful check also moves the server into the listening
	// state, even if every listen attempt below subsequently fails.
	if err := s.allowed(listening, "Listen"); err != nil {
		return nil, err
	}

	// Start the proxy as early as possible, ignore duplicate requests
	// for the same proxy.
	// NOTE(review): s.proxies appears to be populated only from the proxy
	// goroutine itself, so two Listen calls in quick succession for the same
	// proxy could each start a loop -- confirm.
	if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
		// Pre-emptively fetch discharges on the blessings (they will be cached
		// within s.dc for future calls).
		// This shouldn't be required, but is a hack to reduce flakiness in
		// JavaScript browser integration tests.
		// See https://v.io/i/392
		s.dc.PrepareDischarges(s.ctx, s.blessings.ThirdPartyCaveats(), security.DischargeImpetus{})
		// We have a goroutine for listening on proxy connections.
		s.active.Add(1)
		go func() {
			s.proxyListenLoop(listenSpec.Proxy)
			s.active.Done()
		}()
	}

	roaming := false
	lnState := make([]*listenState, 0, len(listenSpec.Addrs))
	for _, addr := range listenSpec.Addrs {
		if len(addr.Address) > 0 {
			// Listen if we have a local address to listen on.
			ls := &listenState{
				protocol: addr.Protocol,
				address:  addr.Address,
			}
			ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(s.ctx, addr.Protocol, addr.Address, s.blessings, s.listenerOpts...)
			// Failed attempts are recorded too, so that their errors can be
			// reported (see lastErr below and Status).
			lnState = append(lnState, ls)
			if ls.lnerr != nil {
				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
				continue
			}
			ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
			if ls.roaming && ls.eperr == nil {
				// Keep a copy of the listen endpoint as the template for
				// endpoints created after a network change.
				ls.protoIEP = *ls.lep.(*inaming.Endpoint)
				roaming = true
			}
		}
	}

	// Fail only if nothing at all is listening: no local listener was
	// created and no proxy was requested. lastErr is the most recent listen
	// error seen before the first successful listener.
	found := false
	var lastErr error
	for _, ls := range lnState {
		if ls.ln != nil {
			found = true
			break
		}
		if ls.lnerr != nil {
			lastErr = ls.lnerr
		}
	}
	if !found && !useProxy {
		return nil, verror.New(verror.ErrBadArg, s.ctx, verror.New(errNoListeners, s.ctx, lastErr))
	}

	if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
		// Create a dhcp listener if we haven't already done so.
		dhcp := &dhcpState{
			name:      s.settingsName,
			publisher: s.settingsPublisher,
			watchers:  make(map[chan<- rpc.NetworkChange]struct{}),
		}
		s.dhcpState = dhcp
		dhcp.ch = make(chan pubsub.Setting, 10)
		dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
		// If forking the stream failed the error stays in dhcp.err and no
		// loop is started; s.dhcpState remains set regardless.
		if dhcp.err == nil {
			// We have a goroutine to listen for dhcp changes.
			s.active.Add(1)
			go func() {
				s.dhcpLoop(dhcp.ch)
				s.active.Done()
			}()
		}
	}

	eps := make([]naming.Endpoint, 0, 10)
	for _, ls := range lnState {
		// Record every attempt, successful or not, in the server's state.
		s.listenState[ls] = struct{}{}
		if ls.ln != nil {
			// We have a goroutine per listener to accept new flows.
			// Each flow is served from its own goroutine.
			s.active.Add(1)
			go func(ln stream.Listener, ep naming.Endpoint) {
				s.listenLoop(ln, ep)
				s.active.Done()
			}(ls.ln, ls.lep)
		}

		for _, iep := range ls.ieps {
			eps = append(eps, iep)
		}
	}

	return eps, nil
}
560
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800561func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
Robin Thellend92b65a42014-12-17 14:30:16 -0800562 resolved, err := s.resolveToEndpoint(proxy)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800563 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700564 return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800565 }
Asim Shankar99b18a72015-04-25 23:19:28 -0700566 opts := append([]stream.ListenerOpt{proxyAuth{s}}, s.listenerOpts...)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700567 ln, ep, err := s.streamMgr.Listen(s.ctx, inaming.Network, resolved, s.blessings, opts...)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800568 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700569 return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800570 }
571 iep, ok := ep.(*inaming.Endpoint)
572 if !ok {
573 ln.Close()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700574 return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800575 }
576 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800577 s.proxies[proxy] = proxyState{iep, nil}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800578 s.Unlock()
Robin Thellende22920e2015-02-05 17:15:50 -0800579 iep.IsMountTable = s.servesMountTable
Robin Thellendb457df92015-03-30 09:42:15 -0700580 iep.IsLeaf = s.isLeaf
Robin Thellend89e95232015-03-24 13:48:48 -0700581 s.publisher.AddServer(iep.String())
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800582 return iep, ln, nil
583}
584
585func (s *server) proxyListenLoop(proxy string) {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700586 const (
587 min = 5 * time.Millisecond
588 max = 5 * time.Minute
589 )
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800590
591 iep, ln, err := s.reconnectAndPublishProxy(proxy)
592 if err != nil {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700593 s.ctx.Errorf("Failed to connect to proxy: %s", err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800594 }
595 // the initial connection maybe have failed, but we enter the retry
596 // loop anyway so that we will continue to try and connect to the
597 // proxy.
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800598 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800599 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800600 s.Unlock()
601 return
602 }
603 s.Unlock()
604
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700605 for {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800606 if ln != nil && iep != nil {
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800607 err := s.listenLoop(ln, iep)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800608 // The listener is done, so:
609 // (1) Unpublish its name
Cosmos Nicolaou8bd8e102015-01-13 21:52:53 -0800610 s.publisher.RemoveServer(iep.String())
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800611 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800612 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800613 s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800614 } else {
Asim Shankar7171a252015-03-07 14:41:40 -0800615 // err will be nil if we're stopping.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800616 s.proxies[proxy] = proxyState{iep, nil}
617 s.Unlock()
618 return
619 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800620 s.Unlock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800621 }
622
623 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800624 if s.isStopState() {
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800625 s.Unlock()
626 return
627 }
628 s.Unlock()
629
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700630 // (2) Reconnect to the proxy unless the server has been stopped
631 backoff := min
632 ln = nil
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800633 for {
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700634 select {
635 case <-time.After(backoff):
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700636 if backoff = backoff * 2; backoff > max {
637 backoff = max
638 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700639 case <-s.stoppedChan:
640 return
641 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800642 // (3) reconnect, publish new address
643 if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700644 s.ctx.Errorf("Failed to reconnect to proxy %q: %s", proxy, err)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800645 } else {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700646 s.ctx.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800647 break
648 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700649 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700650 }
651}
652
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800653// addListener adds the supplied listener taking care to
654// check to see if we're already stopping. It returns true
655// if the listener was added.
656func (s *server) addListener(ln stream.Listener) bool {
657 s.Lock()
658 defer s.Unlock()
659 if s.isStopState() {
660 return false
661 }
662 s.listeners[ln] = struct{}{}
663 return true
664}
665
666// rmListener removes the supplied listener taking care to
667// check if we're already stopping. It returns true if the
668// listener was removed.
669func (s *server) rmListener(ln stream.Listener) bool {
670 s.Lock()
671 defer s.Unlock()
672 if s.isStopState() {
673 return false
674 }
675 delete(s.listeners, ln)
676 return true
677}
678
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800679func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700680 defer s.ctx.VI(1).Infof("rpc: Stopped listening on %s", ep)
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800681 var calls sync.WaitGroup
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800682
683 if !s.addListener(ln) {
684 // We're stopping.
685 return nil
686 }
687
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700688 defer func() {
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800689 calls.Wait()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800690 s.rmListener(ln)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700691 }()
692 for {
693 flow, err := ln.Accept()
694 if err != nil {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700695 s.ctx.VI(10).Infof("rpc: Accept on %v failed: %v", ep, err)
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800696 return err
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700697 }
Cosmos Nicolaoueef1fab2014-11-11 18:23:41 -0800698 calls.Add(1)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700699 go func(flow stream.Flow) {
Todd Wang34ed4c62014-11-26 15:15:52 -0800700 defer calls.Done()
701 fs, err := newFlowServer(flow, s)
702 if err != nil {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700703 s.ctx.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
Todd Wang34ed4c62014-11-26 15:15:52 -0800704 return
705 }
706 if err := fs.serve(); err != nil {
Todd Wang5739dda2014-11-16 22:44:02 -0800707 // TODO(caprita): Logging errors here is too spammy. For example, "not
708 // authorized" errors shouldn't be logged as server errors.
Cosmos Nicolaou93dd88b2015-02-19 15:10:53 -0800709 // TODO(cnicolaou): revisit this when verror2 transition is
710 // done.
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800711 if err != io.EOF {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700712 s.ctx.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800713 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700714 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700715 }(flow)
716 }
717}
718
Cosmos Nicolaou11c0ca12015-04-23 16:23:43 -0700719func (s *server) dhcpLoop(ch chan pubsub.Setting) {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700720 defer s.ctx.VI(1).Infof("rpc: Stopped listen for dhcp changes")
721 s.ctx.VI(2).Infof("rpc: dhcp loop")
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800722 for setting := range ch {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700723 if setting == nil {
724 return
725 }
726 switch v := setting.Value().(type) {
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700727 case []net.Addr:
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700728 s.Lock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800729 if s.isStopState() {
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700730 s.Unlock()
731 return
732 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700733 change := rpc.NetworkChange{
Cosmos Nicolaou00fe9a42015-04-24 14:18:01 -0700734 Time: time.Now(),
735 State: externalStates[s.state],
736 }
737 switch setting.Name() {
738 case NewAddrsSetting:
739 change.Changed = s.addAddresses(v)
740 change.AddedAddrs = v
741 case RmAddrsSetting:
742 change.Changed, change.Error = s.removeAddresses(v)
743 change.RemovedAddrs = v
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800744 }
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700745 s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800746 for ch, _ := range s.dhcpState.watchers {
747 select {
748 case ch <- change:
749 default:
750 }
751 }
752 s.Unlock()
753 default:
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700754 s.ctx.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700755 }
756 }
757}
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800758
// getHost returns the host portion of address. If the address's string form
// cannot be split into host and port, that string is returned unchanged.
func getHost(address net.Addr) string {
	full := address.String()
	if host, _, err := net.SplitHostPort(full); err == nil {
		return host
	}
	return full
}
767
768// Remove all endpoints that have the same host address as the supplied
769// address parameter.
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700770func (s *server) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800771 var removed []naming.Endpoint
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700772 for _, address := range addrs {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800773 host := getHost(address)
774 for ls, _ := range s.listenState {
775 if ls != nil && ls.roaming && len(ls.ieps) > 0 {
776 remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
777 for _, iep := range ls.ieps {
778 lnHost, _, err := net.SplitHostPort(iep.Address)
779 if err != nil {
780 lnHost = iep.Address
781 }
782 if lnHost == host {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700783 s.ctx.VI(2).Infof("rpc: dhcp removing: %s", iep)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800784 removed = append(removed, iep)
785 s.publisher.RemoveServer(iep.String())
786 continue
787 }
788 remaining = append(remaining, iep)
789 }
790 ls.ieps = remaining
791 }
792 }
793 }
794 return removed, nil
795}
796
797// Add new endpoints for the new address. There is no way to know with
798// 100% confidence which new endpoints to publish without shutting down
799// all network connections and reinitializing everything from scratch.
800// Instead, we find all roaming listeners with at least one endpoint
801// and create a new endpoint with the same port as the existing ones
802// but with the new address supplied to us to by the dhcp code. As
803// an additional safeguard we reject the new address if it is not
804// externally accessible.
805// This places the onus on the dhcp/roaming code that sends us addresses
806// to ensure that those addresses are externally reachable.
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700807func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800808 var added []naming.Endpoint
Cosmos Nicolaouaa87e292015-04-21 22:15:50 -0700809 for _, address := range netstate.ConvertToAddresses(addrs) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800810 if !netstate.IsAccessibleIP(address) {
811 return added
812 }
813 host := getHost(address)
814 for ls, _ := range s.listenState {
815 if ls != nil && ls.roaming {
816 niep := ls.protoIEP
817 niep.Address = net.JoinHostPort(host, ls.port)
Robin Thellendb457df92015-03-30 09:42:15 -0700818 niep.IsMountTable = s.servesMountTable
819 niep.IsLeaf = s.isLeaf
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800820 ls.ieps = append(ls.ieps, &niep)
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700821 s.ctx.VI(2).Infof("rpc: dhcp adding: %s", niep)
Robin Thellend89e95232015-03-24 13:48:48 -0700822 s.publisher.AddServer(niep.String())
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800823 added = append(added, &niep)
824 }
825 }
826 }
827 return added
828}
Cosmos Nicolaouef323db2014-09-07 22:13:28 -0700829
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800830type leafDispatcher struct {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700831 invoker rpc.Invoker
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800832 auth security.Authorizer
833}
834
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -0700835func (d leafDispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700836 defer apilog.LogCallf(nil, "suffix=%.10s...", suffix)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800837 if suffix != "" {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700838 return nil, nil, verror.New(verror.ErrUnknownSuffix, nil, suffix)
Bogdan Caprita7590a6d2015-01-08 13:43:40 -0800839 }
840 return d.invoker, d.auth, nil
841}
842
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800843func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700844 defer apilog.LogCallf(nil, "name=%.10s...,obj=,authorizer=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800845 if obj == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800846 return verror.New(verror.ErrBadArg, s.ctx, "nil object")
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800847 }
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800848 invoker, err := objectToInvoker(obj)
849 if err != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800850 return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800851 }
Robin Thellendb457df92015-03-30 09:42:15 -0700852 s.setLeaf(true)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -0800853 return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
Cosmos Nicolaou61c96c72014-11-03 11:57:56 -0800854}
855
Robin Thellendb457df92015-03-30 09:42:15 -0700856func (s *server) setLeaf(value bool) {
857 s.Lock()
858 defer s.Unlock()
859 s.isLeaf = value
860 for ls, _ := range s.listenState {
861 for i := range ls.ieps {
862 ls.ieps[i].IsLeaf = s.isLeaf
863 }
864 }
865}
866
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700867func (s *server) ServeDispatcher(name string, disp rpc.Dispatcher) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700868 defer apilog.LogCallf(nil, "name=%.10s...,disp=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800869 if disp == nil {
Jiri Simsa074bf362015-02-17 09:29:45 -0800870 return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700871 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800872 s.Lock()
873 defer s.Unlock()
874 if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
875 return err
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700876 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800877 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800878 s.disp = disp
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700879 if len(name) > 0 {
Robin Thellendb457df92015-03-30 09:42:15 -0700880 for ls, _ := range s.listenState {
881 for _, iep := range ls.ieps {
882 s.publisher.AddServer(iep.String())
883 }
884 }
Robin Thellend89e95232015-03-24 13:48:48 -0700885 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -0700886 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700887 return nil
888}
889
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800890func (s *server) AddName(name string) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700891 defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800892 if len(name) == 0 {
Jiri Simsa074bf362015-02-17 09:29:45 -0800893 return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
Ali Ghassemi3c6db7b2014-11-10 17:20:26 -0800894 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800895 s.Lock()
896 defer s.Unlock()
897 if err := s.allowed(publishing, "AddName"); err != nil {
898 return err
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800899 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800900 vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
Robin Thellend89e95232015-03-24 13:48:48 -0700901 s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800902 return nil
903}
904
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800905func (s *server) RemoveName(name string) {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700906 defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800907 s.Lock()
908 defer s.Unlock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800909 if err := s.allowed(publishing, "RemoveName"); err != nil {
910 return
911 }
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800912 vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800913 s.publisher.RemoveName(name)
Cosmos Nicolaou92dba582014-11-05 17:24:10 -0800914}
915
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700916func (s *server) Stop() error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -0700917 defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Bogdan Caprita0ca02162015-04-30 11:21:44 -0700918 serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -0700919 s.ctx.VI(1).Infof("Stop: %s", serverDebug)
920 defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700921 s.Lock()
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800922 if s.isStopState() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700923 s.Unlock()
924 return nil
925 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800926 s.state = stopping
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700927 close(s.stoppedChan)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700928 s.Unlock()
929
Robin Thellenddf428232014-10-06 12:50:44 -0700930 // Delete the stats object.
931 s.stats.stop()
932
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700933 // Note, It's safe to Stop/WaitForStop on the publisher outside of the
934 // server lock, since publisher is safe for concurrent access.
935
936 // Stop the publisher, which triggers unmounting of published names.
937 s.publisher.Stop()
938 // Wait for the publisher to be done unmounting before we can proceed to
939 // close the listeners (to minimize the number of mounted names pointing
940 // to endpoint that are no longer serving).
941 //
942 // TODO(caprita): See if make sense to fail fast on rejecting
943 // connections once listeners are closed, and parallelize the publisher
944 // and listener shutdown.
945 s.publisher.WaitForStop()
946
947 s.Lock()
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800948
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700949 // Close all listeners. No new flows will be accepted, while in-flight
950 // flows will continue until they terminate naturally.
951 nListeners := len(s.listeners)
952 errCh := make(chan error, nListeners)
Cosmos Nicolaoubc743142014-10-06 21:27:18 -0700953
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800954 for ln, _ := range s.listeners {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700955 go func(ln stream.Listener) {
956 errCh <- ln.Close()
957 }(ln)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800958 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -0800959
Cosmos Nicolaou11c0ca12015-04-23 16:23:43 -0700960 drain := func(ch chan pubsub.Setting) {
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800961 for {
962 select {
963 case v := <-ch:
964 if v == nil {
965 return
966 }
967 default:
968 close(ch)
969 return
970 }
971 }
972 }
973
974 if dhcp := s.dhcpState; dhcp != nil {
Cosmos Nicolaouaceb8d92015-02-05 20:44:02 -0800975 // TODO(cnicolaou,caprita): investigate not having to close and drain
976 // the channel here. It's a little awkward right now since we have to
977 // be careful to not close the channel in two places, i.e. here and
978 // and from the publisher's Shutdown method.
979 if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
980 drain(dhcp.ch)
981 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700982 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800983
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700984 s.Unlock()
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800985
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700986 var firstErr error
987 for i := 0; i < nListeners; i++ {
988 if err := <-errCh; err != nil && firstErr == nil {
989 firstErr = err
990 }
991 }
992 // At this point, we are guaranteed that no new requests are going to be
993 // accepted.
994
995 // Wait for the publisher and active listener + flows to finish.
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -0800996 done := make(chan struct{}, 1)
997 go func() { s.active.Wait(); done <- struct{}{} }()
998
999 select {
1000 case <-done:
Bogdan Caprita0ca02162015-04-30 11:21:44 -07001001 case <-time.After(5 * time.Second):
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -07001002 s.ctx.Errorf("%s: Listener Close Error: %v", serverDebug, firstErr)
1003 s.ctx.Errorf("%s: Timedout waiting for goroutines to stop: listeners: %d (currently: %d)", serverDebug, nListeners, len(s.listeners))
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -08001004 for ln, _ := range s.listeners {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -07001005 s.ctx.Errorf("%s: Listener: %v", serverDebug, ln)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -08001006 }
1007 for ls, _ := range s.listenState {
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -07001008 s.ctx.Errorf("%s: ListenState: %v", serverDebug, ls)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -08001009 }
1010 <-done
Cosmos Nicolaoue9c622d2015-07-10 11:09:42 -07001011 s.ctx.Infof("%s: Done waiting.", serverDebug)
Cosmos Nicolaou1b3594d2015-02-01 10:05:03 -08001012 }
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -08001013
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001014 s.Lock()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001015 defer s.Unlock()
Cosmos Nicolaoufdc838b2014-06-30 21:44:27 -07001016 s.disp = nil
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001017 if firstErr != nil {
Jiri Simsa074bf362015-02-17 09:29:45 -08001018 return verror.New(verror.ErrInternal, s.ctx, firstErr)
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001019 }
Cosmos Nicolaou9fbe7d22015-01-25 22:13:13 -08001020 s.state = stopped
Matt Rosencrantz1094d062015-01-30 06:43:12 -08001021 s.cancel()
Cosmos Nicolaou28dabfc2014-12-15 22:51:07 -08001022 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001023}
1024
1025// flowServer implements the RPC server-side protocol for a single RPC, over a
1026// flow that's already connected to the client.
1027type flowServer struct {
Todd Wang54feabe2015-04-15 23:38:26 -07001028 ctx *context.T // context associated with the RPC
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001029 server *server // rpc.Server that this flow server belongs to
1030 disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
Todd Wang3425a902015-01-21 18:43:59 -08001031 dec *vom.Decoder // to decode requests and args from the client
1032 enc *vom.Encoder // to encode responses and results to the client
Todd Wang5739dda2014-11-16 22:44:02 -08001033 flow stream.Flow // underlying flow
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001034
Asim Shankar220a0152014-10-30 21:21:09 -07001035 // Fields filled in during the server invocation.
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001036 clientBlessings security.Blessings
1037 ackBlessings bool
1038 grantedBlessings security.Blessings
1039 method, suffix string
1040 tags []*vdl.Value
1041 discharges map[string]security.Discharge
1042 starttime time.Time
1043 endStreamArgs bool // are the stream args at EOF?
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001044}
1045
Todd Wang54feabe2015-04-15 23:38:26 -07001046var (
1047 _ rpc.StreamServerCall = (*flowServer)(nil)
1048 _ security.Call = (*flowServer)(nil)
1049)
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -07001050
Todd Wang34ed4c62014-11-26 15:15:52 -08001051func newFlowServer(flow stream.Flow, server *server) (*flowServer, error) {
Cosmos Nicolaoudcba93d2014-07-30 11:09:26 -07001052 server.Lock()
1053 disp := server.disp
1054 server.Unlock()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001055
Todd Wang34ed4c62014-11-26 15:15:52 -08001056 fs := &flowServer{
Todd Wang54feabe2015-04-15 23:38:26 -07001057 ctx: server.ctx,
Todd Wang34ed4c62014-11-26 15:15:52 -08001058 server: server,
1059 disp: disp,
Todd Wang5739dda2014-11-16 22:44:02 -08001060 flow: flow,
1061 discharges: make(map[string]security.Discharge),
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001062 }
Jungho Ahncfc89622015-04-24 11:27:23 -07001063 typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
1064 if typeenc == nil {
Jungho Ahn5d1fe972015-04-27 17:51:32 -07001065 fs.enc = vom.NewEncoder(flow)
1066 fs.dec = vom.NewDecoder(flow)
Jungho Ahncfc89622015-04-24 11:27:23 -07001067 } else {
Jungho Ahn5d1fe972015-04-27 17:51:32 -07001068 fs.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder))
Jungho Ahncfc89622015-04-24 11:27:23 -07001069 typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
Jungho Ahn5d1fe972015-04-27 17:51:32 -07001070 fs.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder))
Todd Wang34ed4c62014-11-26 15:15:52 -08001071 }
1072 return fs, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001073}
1074
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001075// authorizeVtrace works by simulating a call to __debug/vtrace.Trace. That
1076// rpc is essentially equivalent in power to the data we are attempting to
1077// attach here.
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -07001078func (fs *flowServer) authorizeVtrace(ctx *context.T) error {
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001079 // Set up a context as though we were calling __debug/vtrace.
1080 params := &security.CallParams{}
Todd Wang4264e4b2015-04-16 22:43:40 -07001081 params.Copy(fs)
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001082 params.Method = "Trace"
1083 params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)}
1084 params.Suffix = "__debug/vtrace"
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001085
1086 var auth security.Authorizer
1087 if fs.server.dispReserved != nil {
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -07001088 _, auth, _ = fs.server.dispReserved.Lookup(ctx, params.Suffix)
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001089 }
Todd Wang4264e4b2015-04-16 22:43:40 -07001090 return authorize(fs.ctx, security.NewCall(params), auth)
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001091}
1092
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001093func (fs *flowServer) serve() error {
1094 defer fs.flow.Close()
Matt Rosencrantz86897932014-10-02 09:34:34 -07001095
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001096 results, err := fs.processRequest()
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001097
Todd Wang54feabe2015-04-15 23:38:26 -07001098 vtrace.GetSpan(fs.ctx).Finish()
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001099
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001100 var traceResponse vtrace.Response
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001101 // Check if the caller is permitted to view vtrace data.
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -07001102 if fs.authorizeVtrace(fs.ctx) == nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001103 traceResponse = vtrace.GetResponse(fs.ctx)
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001104 }
1105
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001106 // Respond to the client with the response header and positional results.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001107 response := rpc.Response{
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001108 Error: err,
1109 EndStreamResults: true,
1110 NumPosResults: uint64(len(results)),
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001111 TraceResponse: traceResponse,
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001112 AckBlessings: fs.ackBlessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001113 }
1114 if err := fs.enc.Encode(response); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -08001115 if err == io.EOF {
1116 return err
1117 }
Todd Wang54feabe2015-04-15 23:38:26 -07001118 return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001119 }
1120 if response.Error != nil {
1121 return response.Error
1122 }
1123 for ix, res := range results {
Todd Wangf519f8f2015-01-21 10:07:41 -08001124 if err := fs.enc.Encode(res); err != nil {
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -08001125 if err == io.EOF {
1126 return err
1127 }
Todd Wang54feabe2015-04-15 23:38:26 -07001128 return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001129 }
1130 }
1131 // TODO(ashankar): Should unread data from the flow be drained?
1132 //
1133 // Reason to do so:
Suharsh Sivakumardcc11d72015-05-11 12:19:20 -07001134 // The common stream.Flow implementation (v.io/x/ref/runtime/internal/rpc/stream/vc/reader.go)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001135 // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
1136 // slices will not be returned to the pool leading to possibly increased memory usage.
1137 //
1138 // Reason to not do so:
1139 // Draining here will conflict with any Reads on the flow in a separate goroutine
1140 // (for example, see TestStreamReadTerminatedByServer in full_test.go).
1141 //
1142 // For now, go with the reason to not do so as having unread data in the stream
1143 // should be a rare case.
1144 return nil
1145}
1146
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001147func (fs *flowServer) readRPCRequest() (*rpc.Request, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001148 // Set a default timeout before reading from the flow. Without this timeout,
1149 // a client that sends no request or a partial request will retain the flow
1150 // indefinitely (and lock up server resources).
Matt Rosencrantz86897932014-10-02 09:34:34 -07001151 initTimer := newTimer(defaultCallTimeout)
1152 defer initTimer.Stop()
1153 fs.flow.SetDeadline(initTimer.C)
1154
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001155 // Decode the initial request.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001156 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001157 if err := fs.dec.Decode(&req); err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001158 return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001159 }
Matt Rosencrantz86897932014-10-02 09:34:34 -07001160 return &req, nil
1161}
1162
Todd Wang9548d852015-02-10 16:15:59 -08001163func (fs *flowServer) processRequest() ([]interface{}, error) {
Asim Shankar0cad0832014-11-04 01:27:38 -08001164 fs.starttime = time.Now()
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001165 req, err := fs.readRPCRequest()
Todd Wang9548d852015-02-10 16:15:59 -08001166 if err != nil {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001167 // We don't know what the rpc call was supposed to be, but we'll create
Matt Rosencrantz1fa32772014-10-28 11:31:46 -07001168 // a placeholder span so we can capture annotations.
Todd Wangad492042015-04-17 15:58:40 -07001169 fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
Todd Wang9548d852015-02-10 16:15:59 -08001170 return nil, err
Matt Rosencrantz86897932014-10-02 09:34:34 -07001171 }
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001172 // We must call fs.drainDecoderArgs for any error that occurs
1173 // after this point, and before we actually decode the arguments.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001174 fs.method = req.Method
Todd Wang5739dda2014-11-16 22:44:02 -08001175 fs.suffix = strings.TrimLeft(req.Suffix, "/")
Matt Rosencrantz86897932014-10-02 09:34:34 -07001176
Matt Rosencrantz88be1182015-04-27 13:45:43 -07001177 if req.Language != "" {
1178 fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
1179 }
1180
Matt Rosencrantz9fe60822014-09-12 10:09:53 -07001181 // TODO(mattr): Currently this allows users to trigger trace collection
1182 // on the server even if they will not be allowed to collect the
Matt Rosencrantz3197d6c2014-11-06 09:53:22 -08001183 // results later. This might be considered a DOS vector.
Todd Wang54feabe2015-04-15 23:38:26 -07001184 spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
Todd Wangad492042015-04-17 15:58:40 -07001185 fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001186
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001187 var cancel context.CancelFunc
Todd Wangf6a06882015-02-27 17:38:01 -08001188 if !req.Deadline.IsZero() {
Todd Wang54feabe2015-04-15 23:38:26 -07001189 fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001190 } else {
Todd Wang54feabe2015-04-15 23:38:26 -07001191 fs.ctx, cancel = context.WithCancel(fs.ctx)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001192 }
Todd Wang54feabe2015-04-15 23:38:26 -07001193 fs.flow.SetDeadline(fs.ctx.Done())
Todd Wang5739dda2014-11-16 22:44:02 -08001194 go fs.cancelContextOnClose(cancel)
Matt Rosencrantz137b8d22014-08-18 09:56:15 -07001195
Todd Wang5739dda2014-11-16 22:44:02 -08001196 // Initialize security: blessings, discharges, etc.
Todd Wang9548d852015-02-10 16:15:59 -08001197 if err := fs.initSecurity(req); err != nil {
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001198 fs.drainDecoderArgs(int(req.NumPosArgs))
Todd Wang9548d852015-02-10 16:15:59 -08001199 return nil, err
Andres Erbsenb7f95f32014-07-07 12:07:56 -07001200 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001201 // Lookup the invoker.
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001202 invoker, auth, err := fs.lookup(fs.suffix, fs.method)
Todd Wangebb3b012015-02-09 21:59:05 -08001203 if err != nil {
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001204 fs.drainDecoderArgs(int(req.NumPosArgs))
Todd Wang9548d852015-02-10 16:15:59 -08001205 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001206 }
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001207
1208 // Note that we strip the reserved prefix when calling the invoker so
1209 // that __Glob will call Glob. Note that we've already assigned a
1210 // special invoker so that we never call the wrong method by mistake.
1211 strippedMethod := naming.StripReserved(fs.method)
1212
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001213 // Prepare invoker and decode args.
1214 numArgs := int(req.NumPosArgs)
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -07001215 argptrs, tags, err := invoker.Prepare(fs.ctx, strippedMethod, numArgs)
Asim Shankar0cad0832014-11-04 01:27:38 -08001216 fs.tags = tags
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001217 if err != nil {
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001218 fs.drainDecoderArgs(numArgs)
Todd Wang9548d852015-02-10 16:15:59 -08001219 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001220 }
Todd Wang9548d852015-02-10 16:15:59 -08001221 if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001222 fs.drainDecoderArgs(numArgs)
1223 return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001224 }
1225 for ix, argptr := range argptrs {
1226 if err := fs.dec.Decode(argptr); err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001227 return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001228 }
1229 }
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001230
Todd Wang5739dda2014-11-16 22:44:02 -08001231 // Check application's authorization policy.
Todd Wang4264e4b2015-04-16 22:43:40 -07001232 if err := authorize(fs.ctx, fs, auth); err != nil {
Todd Wang9548d852015-02-10 16:15:59 -08001233 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001234 }
Matt Rosencrantzbf0d9d92015-04-08 12:43:14 -07001235
Todd Wang5739dda2014-11-16 22:44:02 -08001236 // Invoke the method.
Todd Wang54feabe2015-04-15 23:38:26 -07001237 results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
Robin Thellendb16d7162014-11-07 13:47:26 -08001238 fs.server.stats.record(fs.method, time.Since(fs.starttime))
Todd Wang9548d852015-02-10 16:15:59 -08001239 return results, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001240}
1241
Suharsh Sivakumarc095fc22015-06-29 16:00:18 -07001242// drainDecoderArgs drains the next n arguments encoded onto the flows decoder.
1243// This is needed to ensure that the client is able to encode all of its args
1244// before the server closes its flow. This guarantees that the client will
1245// consistently get the server's error response.
1246// TODO(suharshs): Figure out a better way to solve this race condition without
1247// unnecessarily reading all arguments.
1248func (fs *flowServer) drainDecoderArgs(n int) error {
1249 for i := 0; i < n; i++ {
1250 if err := fs.dec.Ignore(); err != nil {
1251 return err
1252 }
1253 }
1254 return nil
1255}
1256
Todd Wang5739dda2014-11-16 22:44:02 -08001257func (fs *flowServer) cancelContextOnClose(cancel context.CancelFunc) {
1258 // Ensure that the context gets cancelled if the flow is closed
1259 // due to a network error, or client cancellation.
1260 select {
1261 case <-fs.flow.Closed():
1262 // Here we remove the contexts channel as a deadline to the flow.
1263 // We do this to ensure clients get a consistent error when they read/write
1264 // after the flow is closed. Since the flow is already closed, it doesn't
1265 // matter that the context is also cancelled.
1266 fs.flow.SetDeadline(nil)
1267 cancel()
Todd Wang54feabe2015-04-15 23:38:26 -07001268 case <-fs.ctx.Done():
Robin Thellendc26c32e2014-10-06 17:44:04 -07001269 }
Todd Wang5739dda2014-11-16 22:44:02 -08001270}
1271
1272// lookup returns the invoker and authorizer responsible for serving the given
1273// name and method. The suffix is stripped of any leading slashes. If it begins
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001274// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
Todd Wang5739dda2014-11-16 22:44:02 -08001275// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
1276// value may be modified to match the actual suffix and method to use.
Matt Rosencrantz311378b2015-03-25 15:26:12 -07001277func (fs *flowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
1278 if naming.IsReserved(method) {
Asim Shankar149b4972015-04-23 13:29:58 -07001279 return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
Todd Wang5739dda2014-11-16 22:44:02 -08001280 }
1281 disp := fs.disp
1282 if naming.IsReserved(suffix) {
1283 disp = fs.server.dispReserved
Ali Ghassemibac34032015-04-30 18:27:57 -07001284 } else if fs.server.isLeaf && suffix != "" {
1285 innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
1286 return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
Robin Thellendd24f0842014-09-23 10:27:29 -07001287 }
1288 if disp != nil {
Cosmos Nicolaou5a3125a2015-07-10 11:19:20 -07001289 obj, auth, err := disp.Lookup(fs.ctx, suffix)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001290 switch {
1291 case err != nil:
Todd Wang9548d852015-02-10 16:15:59 -08001292 return nil, nil, err
Todd Wang5739dda2014-11-16 22:44:02 -08001293 case obj != nil:
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001294 invoker, err := objectToInvoker(obj)
1295 if err != nil {
Todd Wang54feabe2015-04-15 23:38:26 -07001296 return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001297 }
1298 return invoker, auth, nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001299 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001300 }
Todd Wang54feabe2015-04-15 23:38:26 -07001301 return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
Todd Wang5739dda2014-11-16 22:44:02 -08001302}
1303
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001304func objectToInvoker(obj interface{}) (rpc.Invoker, error) {
Todd Wang5739dda2014-11-16 22:44:02 -08001305 if obj == nil {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001306 return nil, errors.New("nil object")
Todd Wang5739dda2014-11-16 22:44:02 -08001307 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001308 if invoker, ok := obj.(rpc.Invoker); ok {
Bogdan Caprita9592d9f2015-01-08 22:15:16 -08001309 return invoker, nil
Todd Wang5739dda2014-11-16 22:44:02 -08001310 }
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001311 return rpc.ReflectInvoker(obj)
Todd Wang5739dda2014-11-16 22:44:02 -08001312}
1313
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001314func (fs *flowServer) initSecurity(req *rpc.Request) error {
Ankurb905dae2015-03-04 12:38:20 -08001315 // LocalPrincipal is nil which means we are operating under
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001316 // SecurityNone.
Todd Wang54feabe2015-04-15 23:38:26 -07001317 if fs.LocalPrincipal() == nil {
Ankurb905dae2015-03-04 12:38:20 -08001318 return nil
1319 }
1320
Todd Wang5739dda2014-11-16 22:44:02 -08001321 // If additional credentials are provided, make them available in the context
Todd Wang5739dda2014-11-16 22:44:02 -08001322 // Detect unusable blessings now, rather then discovering they are unusable on
1323 // first use.
1324 //
1325 // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
1326 // the server's identity as the blessing. Figure out what we want to do about
1327 // this - should servers be able to assume that a blessing is something that
1328 // does not have the authorizations that the server's own identity has?
Todd Wang54feabe2015-04-15 23:38:26 -07001329 if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
1330 return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
Todd Wang5739dda2014-11-16 22:44:02 -08001331 }
Asim Shankarb07ec692015-02-27 23:40:44 -08001332 fs.grantedBlessings = req.GrantedBlessings
Ankurb905dae2015-03-04 12:38:20 -08001333
Asim Shankarb07ec692015-02-27 23:40:44 -08001334 var err error
1335 if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001336 // When the server can't access the blessings cache, the client is not following
1337 // protocol, so the server closes the VCs corresponding to the client endpoint.
1338 // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
1339 // of all VCs connected to the RemoteEndpoint.
1340 fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
Todd Wang54feabe2015-04-15 23:38:26 -07001341 return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001342 }
Ankurb905dae2015-03-04 12:38:20 -08001343 // Verify that the blessings sent by the client in the request have the same public
1344 // key as those sent by the client during VC establishment.
1345 if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
Todd Wang54feabe2015-04-15 23:38:26 -07001346 return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
Ankurb905dae2015-03-04 12:38:20 -08001347 }
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001348 fs.ackBlessings = true
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001349
Asim Shankar3ad0b8a2015-02-25 00:37:21 -08001350 for _, d := range req.Discharges {
Asim Shankar08642822015-03-02 21:21:09 -08001351 fs.discharges[d.ID()] = d
Todd Wang5739dda2014-11-16 22:44:02 -08001352 }
1353 return nil
Robin Thellendc26c32e2014-10-06 17:44:04 -07001354}
1355
Todd Wang4264e4b2015-04-16 22:43:40 -07001356func authorize(ctx *context.T, call security.Call, auth security.Authorizer) error {
Matt Rosencrantz9dce9b22015-03-02 10:48:37 -08001357 if call.LocalPrincipal() == nil {
Todd Wang5739dda2014-11-16 22:44:02 -08001358 // LocalPrincipal is nil means that the server wanted to avoid
1359 // authentication, and thus wanted to skip authorization as well.
1360 return nil
1361 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001362 if auth == nil {
Asim Shankare4a8c092015-04-01 18:43:39 -07001363 auth = security.DefaultAuthorizer()
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001364 }
Todd Wang4264e4b2015-04-16 22:43:40 -07001365 if err := auth.Authorize(ctx, call); err != nil {
Matt Rosencrantz250558f2015-03-17 11:37:31 -07001366 return verror.New(verror.ErrNoAccess, ctx, newErrBadAuth(ctx, call.Suffix(), call.Method(), err))
Asim Shankara5457f02014-10-24 23:23:07 -07001367 }
1368 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001369}
1370
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001371// Send implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001372func (fs *flowServer) Send(item interface{}) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -07001373 defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001374 // The empty response header indicates what follows is a streaming result.
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001375 if err := fs.enc.Encode(rpc.Response{}); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001376 return err
1377 }
1378 return fs.enc.Encode(item)
1379}
1380
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001381// Recv implements the rpc.Stream method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001382func (fs *flowServer) Recv(itemptr interface{}) error {
Cosmos Nicolaouf3c19092015-05-27 17:53:37 -07001383 defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
Matt Rosencrantz94502cf2015-03-18 09:43:44 -07001384 var req rpc.Request
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001385 if err := fs.dec.Decode(&req); err != nil {
1386 return err
1387 }
1388 if req.EndStreamArgs {
1389 fs.endStreamArgs = true
1390 return io.EOF
1391 }
1392 return fs.dec.Decode(itemptr)
1393}
1394
Todd Wang54feabe2015-04-15 23:38:26 -07001395// Implementations of rpc.ServerCall and security.Call methods.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001396
Todd Wang54feabe2015-04-15 23:38:26 -07001397func (fs *flowServer) Security() security.Call {
1398 //nologcall
1399 return fs
1400}
Ankuredd74ee2015-03-04 16:38:45 -08001401func (fs *flowServer) LocalDischarges() map[string]security.Discharge {
1402 //nologcall
1403 return fs.flow.LocalDischarges()
1404}
Asim Shankar2519cc12014-11-10 21:16:53 -08001405func (fs *flowServer) RemoteDischarges() map[string]security.Discharge {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001406 //nologcall
1407 return fs.discharges
1408}
Matt Rosencrantz98d6d7c2015-09-04 12:34:08 -07001409func (fs *flowServer) Server() rpc.Server {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001410 //nologcall
1411 return fs.server
1412}
Asim Shankar0cad0832014-11-04 01:27:38 -08001413func (fs *flowServer) Timestamp() time.Time {
1414 //nologcall
1415 return fs.starttime
1416}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001417func (fs *flowServer) Method() string {
1418 //nologcall
1419 return fs.method
1420}
Todd Wangb31da592015-02-20 12:50:39 -08001421func (fs *flowServer) MethodTags() []*vdl.Value {
Asim Shankar0cad0832014-11-04 01:27:38 -08001422 //nologcall
1423 return fs.tags
1424}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001425func (fs *flowServer) Suffix() string {
1426 //nologcall
1427 return fs.suffix
1428}
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001429func (fs *flowServer) LocalPrincipal() security.Principal {
1430 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001431 return fs.flow.LocalPrincipal()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001432}
1433func (fs *flowServer) LocalBlessings() security.Blessings {
1434 //nologcall
Asim Shankar8f05c222014-10-06 22:08:19 -07001435 return fs.flow.LocalBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001436}
1437func (fs *flowServer) RemoteBlessings() security.Blessings {
1438 //nologcall
Asim Shankar2bf7b1e2015-02-27 00:45:12 -08001439 if !fs.clientBlessings.IsZero() {
Suharsh Sivakumar720b7042014-12-22 17:33:23 -08001440 return fs.clientBlessings
1441 }
Asim Shankar8f05c222014-10-06 22:08:19 -07001442 return fs.flow.RemoteBlessings()
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001443}
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001444func (fs *flowServer) GrantedBlessings() security.Blessings {
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001445 //nologcall
Suharsh Sivakumar380bf342015-02-27 15:38:27 -08001446 return fs.grantedBlessings
Mehrdad Afsharicd9852b2014-09-26 11:07:35 -07001447}
1448func (fs *flowServer) LocalEndpoint() naming.Endpoint {
1449 //nologcall
1450 return fs.flow.LocalEndpoint()
1451}
1452func (fs *flowServer) RemoteEndpoint() naming.Endpoint {
1453 //nologcall
1454 return fs.flow.RemoteEndpoint()
1455}
Asim Shankar99b18a72015-04-25 23:19:28 -07001456
1457type proxyAuth struct {
1458 s *server
1459}
1460
1461func (a proxyAuth) RPCStreamListenerOpt() {}
1462
1463func (a proxyAuth) Login(proxy stream.Flow) (security.Blessings, []security.Discharge, error) {
Asim Shankar6b1c8b02015-04-26 22:09:45 -07001464 var (
1465 principal = a.s.principal
1466 dc = a.s.dc
1467 ctx = a.s.ctx
1468 )
1469 if principal == nil {
1470 return security.Blessings{}, nil, nil
1471 }
1472 proxyNames, _ := security.RemoteBlessingNames(ctx, security.NewCall(&security.CallParams{
1473 LocalPrincipal: principal,
1474 RemoteBlessings: proxy.RemoteBlessings(),
1475 RemoteDischarges: proxy.RemoteDischarges(),
1476 RemoteEndpoint: proxy.RemoteEndpoint(),
1477 LocalEndpoint: proxy.LocalEndpoint(),
1478 }))
1479 blessings := principal.BlessingStore().ForPeer(proxyNames...)
1480 tpc := blessings.ThirdPartyCaveats()
1481 if len(tpc) == 0 {
1482 return blessings, nil, nil
1483 }
1484 // Set DischargeImpetus.Server = proxyNames.
1485 // See https://v.io/i/392
1486 discharges := dc.PrepareDischarges(ctx, tpc, security.DischargeImpetus{})
1487 return blessings, discharges, nil
Asim Shankar99b18a72015-04-25 23:19:28 -07001488}
1489
1490var _ manager.ProxyAuthenticator = proxyAuth{}