blob: 8d6e2f86907341fbaabfd827aaf54ea36456f052 [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -07005// Package manager provides an implementation of the Manager interface defined in v.io/x/ref/profiles/internal/rpc/stream.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07006package manager
7
8import (
Jiri Simsa5293dcb2014-05-10 09:56:38 -07009 "errors"
10 "fmt"
11 "net"
12 "strings"
13 "sync"
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -080014 "time"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015
Jiri Simsa6ac95222015-02-23 16:11:49 -080016 "v.io/v23/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070017 "v.io/v23/rpc"
Asim Shankar7171a252015-03-07 14:41:40 -080018 "v.io/v23/security"
Jiri Simsa6ac95222015-02-23 16:11:49 -080019 "v.io/v23/verror"
Jiri Simsa337af232015-02-27 14:36:46 -080020 "v.io/x/lib/vlog"
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080021
Jiri Simsaffceefa2015-02-28 11:03:34 -080022 "v.io/x/ref/lib/stats"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080023 inaming "v.io/x/ref/profiles/internal/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070024 "v.io/x/ref/profiles/internal/rpc/stream"
25 "v.io/x/ref/profiles/internal/rpc/stream/crypto"
Jungho Ahncd175b82015-03-27 14:29:40 -070026 "v.io/x/ref/profiles/internal/rpc/stream/vc"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070027 "v.io/x/ref/profiles/internal/rpc/stream/vif"
28 "v.io/x/ref/profiles/internal/rpc/version"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070029)
30
Asim Shankar7171a252015-03-07 14:41:40 -080031var (
32 errShutDown = errors.New("manager has been shut down")
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -070033 errProvidedServerBlessingsWithoutPrincipal = errors.New("blessings provided but no known principal")
Asim Shankar7171a252015-03-07 14:41:40 -080034 errNoBlessingNames = errors.New("stream.ListenerOpts includes a principal but no blessing names could be extracted")
35)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070036
Jungho Ahncd175b82015-03-27 14:29:40 -070037const (
38 defaultIdleTimeout = 30 * time.Minute
39)
40
Jiri Simsa5293dcb2014-05-10 09:56:38 -070041// InternalNew creates a new stream.Manager for managing streams where the local
42// process is identified by the provided RoutingID.
43//
44// As the name suggests, this method is intended for use only within packages
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -070045// placed inside v.io/x/ref/profiles/internal. Code outside the
46// v.io/x/ref/profiles/internal/* packages should never call this method.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070047func InternalNew(rid naming.RoutingID) stream.Manager {
Robin Thellend8cc0aee2014-10-16 10:58:24 -070048 m := &manager{
Jiri Simsa5293dcb2014-05-10 09:56:38 -070049 rid: rid,
50 vifs: vif.NewSet(),
Asim Shankar147c3b62014-08-16 09:28:38 -070051 sessionCache: crypto.NewTLSClientSessionCache(),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070052 listeners: make(map[listener]bool),
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070053 statsName: naming.Join("rpc", "stream", "routing-id", rid.String(), "debug"),
Jiri Simsa5293dcb2014-05-10 09:56:38 -070054 }
Robin Thellend8cc0aee2014-10-16 10:58:24 -070055 stats.NewStringFunc(m.statsName, m.DebugString)
56 return m
Jiri Simsa5293dcb2014-05-10 09:56:38 -070057}
58
59type manager struct {
60 rid naming.RoutingID
61 vifs *vif.Set
62 sessionCache crypto.TLSClientSessionCache
63
64 muListeners sync.Mutex
65 listeners map[listener]bool // GUARDED_BY(muListeners)
66 shutdown bool // GUARDED_BY(muListeners)
Robin Thellend8cc0aee2014-10-16 10:58:24 -070067
68 statsName string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070069}
70
Benjamin Prosnitzfdfbf7b2014-10-08 09:47:21 -070071var _ stream.Manager = (*manager)(nil)
72
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -080073type DialTimeout struct{ time.Duration }
74
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070075func (DialTimeout) RPCStreamVCOpt() {}
76func (DialTimeout) RPCClientOpt() {}
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -080077
78func dial(network, address string, timeout time.Duration) (net.Conn, error) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070079 if d, _, _ := rpc.RegisteredProtocol(network); d != nil {
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080080 return d(network, address, timeout)
Asim Shankar13752172014-07-09 11:29:07 -070081 }
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080082 return nil, fmt.Errorf("unknown network %s", network)
Asim Shankar13752172014-07-09 11:29:07 -070083}
84
Jiri Simsa5293dcb2014-05-10 09:56:38 -070085// FindOrDialVIF returns the network connection (VIF) to the provided address
86// from the cache in the manager. If not already present in the cache, a new
87// connection will be created using net.Dial.
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -070088func (m *manager) FindOrDialVIF(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (*vif.VIF, error) {
Jason Hickey96d30e82014-11-13 07:40:00 -080089 // Extract options.
90 var timeout time.Duration
91 for _, o := range opts {
92 switch v := o.(type) {
Jungho Ahncd175b82015-03-27 14:29:40 -070093 case DialTimeout:
Jason Hickey96d30e82014-11-13 07:40:00 -080094 timeout = v.Duration
95 }
96 }
Jason Hickey96d30e82014-11-13 07:40:00 -080097 addr := remote.Addr()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070098 network, address := addr.Network(), addr.String()
99 if vf := m.vifs.Find(network, address); vf != nil {
100 return vf, nil
101 }
102 vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
Cosmos Nicolaou9388ae42014-11-10 10:57:15 -0800103 conn, err := dial(network, address, timeout)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700104 if err != nil {
105 return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
106 }
107 // (network, address) in the endpoint might not always match up
108 // with the key used in the vifs. For example:
109 // - conn, err := net.Dial("tcp", "www.google.com:80")
110 // fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
111 // - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
112 // might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
113 // Thus, look for VIFs with the resolved address as well.
114 if vf := m.vifs.Find(conn.RemoteAddr().Network(), conn.RemoteAddr().String()); vf != nil {
115 vlog.VI(1).Infof("(%q, %q) resolved to (%q, %q) which exists in the VIF cache. Closing newly Dialed connection", network, address, conn.RemoteAddr().Network(), conn.RemoteAddr())
116 conn.Close()
117 return vf, nil
118 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800119 vRange := version.SupportedRange
120 if ep, ok := remote.(*inaming.Endpoint); ok {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700121 epRange := &version.Range{Min: ep.MinRPCVersion, Max: ep.MaxRPCVersion}
Jason Hickey96d30e82014-11-13 07:40:00 -0800122 if r, err := vRange.Intersect(epRange); err == nil {
123 vRange = r
124 }
125 }
Jungho Ahncd175b82015-03-27 14:29:40 -0700126 vf, err := vif.InternalNewDialedVIF(conn, m.rid, principal, vRange, m.deleteVIF, opts...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700127 if err != nil {
128 conn.Close()
129 return nil, fmt.Errorf("failed to create VIF: %v", err)
130 }
131 // TODO(ashankar): If two goroutines are simultaneously invoking
132 // manager.Dial, it is possible that two VIFs are inserted into m.vifs
133 // for the same remote network address. This is normally not a problem,
134 // but can be troublesome if the remote endpoint corresponds to a
135 // proxy, since the proxy requires a single network connection per
136 // routing id. Figure out a way to handle this cleanly. One option is
137 // to have only a single VIF per remote network address - have to think
138 // that through.
139 m.vifs.Insert(vf)
140 return vf, nil
141}
142
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700143func (m *manager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.VC, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700144 // If vif.Dial fails because the cached network connection was broken, remove from
145 // the cache and try once more.
146 for retry := true; true; retry = false {
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700147 vf, err := m.FindOrDialVIF(remote, principal, opts...)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700148 if err != nil {
149 return nil, err
150 }
Jungho Ahncd175b82015-03-27 14:29:40 -0700151 opts = append([]stream.VCOpt{m.sessionCache, vc.IdleTimeout{defaultIdleTimeout}}, opts...)
152 vc, err := vf.Dial(remote, principal, opts...)
Jiri Simsa074bf362015-02-17 09:29:45 -0800153 if !retry || verror.ErrorID(err) != verror.ErrAborted.ID {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700154 return vc, err
155 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800156 vf.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700157 }
Jiri Simsa074bf362015-02-17 09:29:45 -0800158 return nil, verror.NewErrInternal(nil) // Not reached
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700159}
160
Asim Shankar13752172014-07-09 11:29:07 -0700161func listen(protocol, address string) (net.Listener, error) {
Matt Rosencrantz94502cf2015-03-18 09:43:44 -0700162 if _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -0800163 return l(protocol, address)
Asim Shankar13752172014-07-09 11:29:07 -0700164 }
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -0800165 return nil, fmt.Errorf("unknown network %s", protocol)
Asim Shankar13752172014-07-09 11:29:07 -0700166}
167
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700168func (m *manager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
169 bNames, err := extractBlessingNames(principal, blessings)
Asim Shankar7171a252015-03-07 14:41:40 -0800170 if err != nil {
171 return nil, nil, err
172 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700173 ln, ep, err := m.internalListen(protocol, address, principal, blessings, opts...)
Asim Shankar7171a252015-03-07 14:41:40 -0800174 if err != nil {
175 return nil, nil, err
176 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700177 ep.Blessings = bNames
Asim Shankar7171a252015-03-07 14:41:40 -0800178 return ln, ep, nil
179}
180
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700181func (m *manager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, *inaming.Endpoint, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700182 m.muListeners.Lock()
183 if m.shutdown {
184 m.muListeners.Unlock()
185 return nil, nil, errShutDown
186 }
187 m.muListeners.Unlock()
188
189 if protocol == inaming.Network {
190 // Act as if listening on the address of a remote proxy.
191 ep, err := inaming.NewEndpoint(address)
192 if err != nil {
193 return nil, nil, fmt.Errorf("failed to parse endpoint %q: %v", address, err)
194 }
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700195 return m.remoteListen(ep, principal, opts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700196 }
Asim Shankar13752172014-07-09 11:29:07 -0700197 netln, err := listen(protocol, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700198 if err != nil {
199 return nil, nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", protocol, address, err)
200 }
201
202 m.muListeners.Lock()
203 if m.shutdown {
204 m.muListeners.Unlock()
205 closeNetListener(netln)
206 return nil, nil, errShutDown
207 }
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800208
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700209 ln := newNetListener(m, netln, principal, blessings, opts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700210 m.listeners[ln] = true
211 m.muListeners.Unlock()
Asim Shankar7171a252015-03-07 14:41:40 -0800212 return ln, version.Endpoint(protocol, netln.Addr().String(), m.rid), nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700213}
214
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700215func (m *manager) remoteListen(proxy naming.Endpoint, principal security.Principal, listenerOpts []stream.ListenerOpt) (stream.Listener, *inaming.Endpoint, error) {
216 ln, ep, err := newProxyListener(m, proxy, principal, listenerOpts)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700217 if err != nil {
218 return nil, nil, err
219 }
220 m.muListeners.Lock()
221 defer m.muListeners.Unlock()
222 if m.shutdown {
223 ln.Close()
224 return nil, nil, errShutDown
225 }
226 m.listeners[ln] = true
227 return ln, ep, nil
228}
229
Jungho Ahncd175b82015-03-27 14:29:40 -0700230func (m *manager) deleteVIF(vf *vif.VIF) {
231 vlog.VI(2).Infof("%p: VIF %v is closed, removing from cache", m, vf)
232 m.vifs.Delete(vf)
233}
234
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700235func (m *manager) ShutdownEndpoint(remote naming.Endpoint) {
236 vifs := m.vifs.List()
237 total := 0
238 for _, vf := range vifs {
239 total += vf.ShutdownVCs(remote)
240 }
241 vlog.VI(1).Infof("ShutdownEndpoint(%q) closed %d VCs", remote, total)
242}
243
244func closeNetListener(ln net.Listener) {
245 addr := ln.Addr()
246 err := ln.Close()
247 vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
248}
249
250func (m *manager) removeListener(ln listener) {
251 m.muListeners.Lock()
252 delete(m.listeners, ln)
253 m.muListeners.Unlock()
254}
255
256func (m *manager) Shutdown() {
Robin Thellend8cc0aee2014-10-16 10:58:24 -0700257 stats.Delete(m.statsName)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700258 m.muListeners.Lock()
259 if m.shutdown {
260 m.muListeners.Unlock()
261 return
262 }
263 m.shutdown = true
264 var wg sync.WaitGroup
265 wg.Add(len(m.listeners))
266 for ln, _ := range m.listeners {
267 go func(ln stream.Listener) {
268 ln.Close()
269 wg.Done()
270 }(ln)
271 }
272 m.listeners = make(map[listener]bool)
273 m.muListeners.Unlock()
274 wg.Wait()
275
276 vifs := m.vifs.List()
277 for _, vf := range vifs {
278 vf.Close()
279 }
280}
281
Robin Thellend8eb77522014-08-28 14:12:01 -0700282func (m *manager) RoutingID() naming.RoutingID {
283 return m.rid
284}
285
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700286func (m *manager) DebugString() string {
287 vifs := m.vifs.List()
288
289 m.muListeners.Lock()
290 defer m.muListeners.Unlock()
291
292 l := make([]string, 0)
293 l = append(l, fmt.Sprintf("Manager: RoutingID:%v #VIFs:%d #Listeners:%d Shutdown:%t", m.rid, len(vifs), len(m.listeners), m.shutdown))
294 if len(vifs) > 0 {
295 l = append(l, "============================VIFs================================================")
296 for ix, vif := range vifs {
297 l = append(l, fmt.Sprintf("%4d) %v", ix, vif.DebugString()))
298 l = append(l, "--------------------------------------------------------------------------------")
299 }
300 }
301 if len(m.listeners) > 0 {
302 l = append(l, "=======================================Listeners==================================================")
303 l = append(l, " (stream listeners, their local network listeners (missing for proxied listeners), and VIFS")
304 for ln, _ := range m.listeners {
305 l = append(l, ln.DebugString())
306 }
307 }
308 return strings.Join(l, "\n")
309}
Asim Shankar7171a252015-03-07 14:41:40 -0800310
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700311func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
Asim Shankar7171a252015-03-07 14:41:40 -0800312 if !b.IsZero() && p == nil {
313 return nil, errProvidedServerBlessingsWithoutPrincipal
314 }
315 if p == nil {
316 return nil, nil
317 }
Asim Shankar7171a252015-03-07 14:41:40 -0800318 var ret []string
319 for b, _ := range p.BlessingsInfo(b) {
320 ret = append(ret, b)
321 }
322 if len(ret) == 0 {
323 return nil, errNoBlessingNames
324 }
325 return ret, nil
326}