// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package manager

import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"syscall"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/proxy"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/runtime/internal/rpc/stream/vif"
)

// ProxyAuthenticator is a stream.ListenerOpt used to authenticate with a
// proxy when listening through one.
type ProxyAuthenticator interface {
stream.ListenerOpt
// Login returns the Blessings (and the set of Discharges to make them
// valid) to send to the proxy. Typically, the proxy uses these to
// determine whether it wants to authorize use.
Login(proxy stream.Flow) (security.Blessings, []security.Discharge, error)
}
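
// A minimal sketch of a ProxyAuthenticator (hypothetical, for
// illustration only; a real implementation must also satisfy
// stream.ListenerOpt):
//
//	type proxyAuth struct{ b security.Blessings }
//
//	func (a proxyAuth) Login(stream.Flow) (security.Blessings, []security.Discharge, error) {
//		return a.b, nil, nil
//	}

// reg registers an error ID scoped to this package with its English
// format string. pkgPath is defined elsewhere in the package.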
func reg(id, msg string) verror.IDAction {
return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
}

var (
// These errors are intended to be used as arguments to higher-level
// errors and hence {1}{2} is omitted from their format strings to
// avoid repeating them n times in the final error message visible to
// the user.
errVomEncodeRequest = reg(".errVomEncodeRequest", "failed to encode request to proxy{:3}")
errVomDecodeResponse = reg(".errVomDecodeResponse", "failed to decode response from proxy{:3}")
errProxyError = reg(".errProxyError", "proxy error {:3}")
errProxyEndpointError = reg(".errProxyEndpointError", "proxy returned an invalid endpoint {:3}{:4}")
errAlreadyConnected = reg(".errAlreadyConnected", "already connected to proxy and accepting connections? VIF: {3}, StartAccepting{:_}")
errFailedToCreateLivenessFlow = reg(".errFailedToCreateLivenessFlow", "unable to create liveness check flow to proxy{:3}")
errAcceptFailed = reg(".errAcceptFailed", "accept failed{:3}")
errFailedToEstablishVC = reg(".errFailedToEstablishVC", "VC establishment with proxy failed{:_}")
errListenerAlreadyClosed = reg(".errListenerAlreadyClosed", "listener already closed")
errRefusedProxyLogin = reg(".errRefusedProxyLogin", "server did not want to listen via proxy{:_}")
)

// listener extends stream.Listener with a DebugString method.
type listener interface {
stream.Listener
DebugString() string
}

// netListener implements the listener interface by accepting flows (and VCs)
// over network connections accepted on an underlying net.Listener.
type netListener struct {
q *upcqueue.T
netLn net.Listener
manager *manager
ctx *context.T
connsMu sync.Mutex
conns map[net.Conn]bool
vifs *vif.Set
netLoop sync.WaitGroup
vifLoops sync.WaitGroup
}

var _ stream.Listener = (*netListener)(nil)

// proxyListener implements the listener interface by connecting to a remote
// proxy (typically used to "listen" across network domains).
type proxyListener struct {
q *upcqueue.T
proxyEP naming.Endpoint
manager *manager
vif *vif.VIF
ctx *context.T
vifLoop sync.WaitGroup
}

var _ stream.Listener = (*proxyListener)(nil)
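
// newNetListener returns a listener that accepts flows over connections
// accepted on netLn, spawning a goroutine (netAcceptLoop) to create VIFs
// over those connections.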
func newNetListener(ctx *context.T, m *manager, netLn net.Listener, blessings security.Blessings, opts []stream.ListenerOpt) listener {
ln := &netListener{
q: upcqueue.New(),
manager: m,
netLn: netLn,
vifs: vif.NewSet(),
conns: make(map[net.Conn]bool),
ctx: ctx,
}
// Set the default idle timeout for VCs. For "unixfd", however, we do
// not set the idle timeout, since such a connection cannot be
// re-established once closed.
if ln.netLn.Addr().Network() != "unixfd" {
opts = append([]stream.ListenerOpt{vc.IdleTimeout{Duration: defaultIdleTimeout}}, opts...)
}
ln.netLoop.Add(1)
go ln.netAcceptLoop(blessings, opts)
return ln
}
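
// isTemporaryError reports whether err is a transient net.OpError, for
// which the accept loop should retry rather than exit.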
func isTemporaryError(err error) bool {
if oErr, ok := err.(*net.OpError); ok && oErr.Temporary() {
return true
}
return false
}
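
// isTooManyOpenFiles reports whether err indicates that the process has
// exhausted its file descriptors (EMFILE).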
func isTooManyOpenFiles(err error) bool {
if oErr, ok := err.(*net.OpError); ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error()) {
return true
}
return false
}
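
// killConnections closes n randomly chosen accepted connections (all of
// them if fewer than n exist) to free up file descriptors. It snapshots
// the connection set and picks victims with a partial Fisher-Yates
// shuffle, so the removed prefix of the snapshot holds exactly the
// connections that were closed.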
func (ln *netListener) killConnections(n int) {
ln.connsMu.Lock()
if n > len(ln.conns) {
n = len(ln.conns)
}
remaining := make([]net.Conn, 0, len(ln.conns))
for c := range ln.conns {
remaining = append(remaining, c)
}
removed := remaining[:n]
ln.connsMu.Unlock()
ln.ctx.Infof("Killing %d Conns", n)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
idx := rand.Intn(len(remaining))
conn := remaining[idx]
go func(conn net.Conn) {
ln.ctx.Infof("Killing connection (%s, %s)", conn.LocalAddr(), conn.RemoteAddr())
conn.Close()
ln.manager.killedConns.Incr(1)
wg.Done()
}(conn)
remaining[idx], remaining[0] = remaining[0], remaining[idx]
remaining = remaining[1:]
}
ln.connsMu.Lock()
for _, conn := range removed {
delete(ln.conns, conn)
}
ln.connsMu.Unlock()
wg.Wait()
}
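
// netAcceptLoop accepts connections from the underlying net.Listener and
// creates a VIF over each one. Temporary errors are retried after a short
// sleep; if the error is EMFILE, an exponentially growing number of
// existing connections is killed between retries to free descriptors.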
func (ln *netListener) netAcceptLoop(blessings security.Blessings, opts []stream.ListenerOpt) {
defer ln.netLoop.Done()
opts = append([]stream.ListenerOpt{vc.StartTimeout{Duration: defaultStartTimeout}}, opts...)
for {
conn, err := ln.netLn.Accept()
if isTemporaryError(err) {
// Use Info instead of Error to reduce the chances that
// the log library will cause the process to abort on
// failing to create a new file.
ln.ctx.Infof("net.Listener.Accept() failed on %v with %v", ln.netLn, err)
for tokill := 1; isTemporaryError(err); tokill *= 2 {
if isTooManyOpenFiles(err) {
ln.killConnections(tokill)
} else {
tokill = 1
}
time.Sleep(10 * time.Millisecond)
conn, err = ln.netLn.Accept()
}
}
if err != nil {
// TODO(cnicolaou): closeListener in manager.go writes to ln (by calling
// ln.Close()) and we read it here in the Infof output, so there is
// an unguarded read here that will fail under --race. This will only show
// itself if the Infof below is changed to always be printed (which is
// how I noticed). The right solution is to lock these data structures, but
// that can wait until a bigger overhaul occurs. For now, we leave this at
// VI(1) knowing that it's basically harmless.
ln.ctx.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
return
}
ln.connsMu.Lock()
ln.conns[conn] = true
ln.connsMu.Unlock()
ln.ctx.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
ln.vifLoops.Add(1)
go func() {
vf, err := vif.InternalNewAcceptedVIF(ln.ctx, conn, ln.manager.rid, blessings, nil, ln.deleteVIF, opts...)
if err != nil {
ln.ctx.Infof("Shutting down conn from %s (local address: %s) as a VIF could not be created: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
conn.Close()
ln.vifLoops.Done()
return
}
ln.connsMu.Lock()
if ln.vifs == nil {
ln.connsMu.Unlock()
vf.Close()
ln.vifLoops.Done()
return
}
ln.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
ln.connsMu.Unlock()
ln.manager.vifs.Insert(vf, conn.RemoteAddr().Network(), conn.RemoteAddr().String())
vifLoop(ln.ctx, vf, ln.q, func() {
ln.connsMu.Lock()
delete(ln.conns, conn)
ln.connsMu.Unlock()
ln.vifLoops.Done()
})
}()
}
}
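
// deleteVIF is passed to vif.InternalNewAcceptedVIF so that a VIF is
// removed from both the listener's and the manager's caches when it
// closes.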
func (ln *netListener) deleteVIF(vf *vif.VIF) {
ln.ctx.VI(2).Infof("VIF %v is closed, removing from cache", vf)
ln.connsMu.Lock()
if ln.vifs != nil {
ln.vifs.Delete(vf)
}
ln.connsMu.Unlock()
ln.manager.vifs.Delete(vf)
}
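
// Accept blocks until a flow arrives on the queue fed by vifLoop,
// translating queue shutdown into errListenerAlreadyClosed.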
func (ln *netListener) Accept() (stream.Flow, error) {
item, err := ln.q.Get(nil)
switch {
case err == upcqueue.ErrQueueIsClosed:
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
case err != nil:
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
default:
return item.(vif.ConnectorAndFlow).Flow, nil
}
}
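
// Close shuts down the accept loop, stops accepting on all known VIFs
// (without tearing down flows already in progress), closes the flow
// queue, deregisters the listener from its manager, and waits for the
// per-VIF loops to exit.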
func (ln *netListener) Close() error {
closeNetListener(ln.ctx, ln.netLn)
ln.netLoop.Wait()
ln.connsMu.Lock()
var vfs []*vif.VIF
if ln.vifs != nil {
vfs, ln.vifs = ln.vifs.List(), nil
}
ln.connsMu.Unlock()
for _, vf := range vfs {
// NOTE(caprita): We do not actually Close down the vifs, as
// that would require knowing when all outstanding requests are
// finished. For now, do not worry about it, since we expect
// shutdown to immediately precede process exit.
vf.StopAccepting()
}
ln.q.Shutdown()
ln.manager.removeListener(ln)
ln.vifLoops.Wait()
return nil
}

func (ln *netListener) String() string {
return fmt.Sprintf("%T: (%v, %v)", ln, ln.netLn.Addr().Network(), ln.netLn.Addr())
}

func (ln *netListener) DebugString() string {
ret := []string{
fmt.Sprintf("stream.Listener: net.Listener on (%q, %q)", ln.netLn.Addr().Network(), ln.netLn.Addr()),
}
ln.connsMu.Lock()
var vifs []*vif.VIF
if ln.vifs != nil {
vifs = ln.vifs.List()
}
ln.connsMu.Unlock()
if len(vifs) > 0 {
ret = append(ret, fmt.Sprintf("===Accepted VIFs(%d)===", len(vifs)))
for ix, vif := range vifs {
ret = append(ret, fmt.Sprintf("%4d) %v", ix, vif))
}
}
return strings.Join(ret, "\n")
}
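
// newProxyListener returns a listener that "listens" through the proxy
// at proxyEP, along with the endpoint at which the proxy will accept
// connections on this server's behalf.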
func newProxyListener(ctx *context.T, m *manager, proxyEP naming.Endpoint, opts []stream.ListenerOpt) (listener, *inaming.Endpoint, error) {
ln := &proxyListener{
q: upcqueue.New(),
proxyEP: proxyEP,
manager: m,
ctx: ctx,
}
vf, ep, err := ln.connect(opts)
if err != nil {
return nil, nil, err
}
ln.vif = vf
ln.vifLoop.Add(1)
go vifLoop(ctx, ln.vif, ln.q, func() {
ln.vifLoop.Done()
})
return ln, ep, nil
}
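
// connect dials a VIF (and a VC over it) to the proxy and runs the proxy
// protocol over a dedicated flow that doubles as a liveness check: it
// vom-encodes a proxy.Request (including blessings and discharges when a
// ProxyAuthenticator is supplied), decodes the proxy.Response, and parses
// the endpoint at which the proxy is listening on our behalf. A goroutine
// watches the flow and shuts the listener down when it closes.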
func (ln *proxyListener) connect(opts []stream.ListenerOpt) (*vif.VIF, *inaming.Endpoint, error) {
ln.ctx.VI(1).Infof("Connecting to proxy at %v", ln.proxyEP)
// Connecting requires dialing a VC to the proxy; extract the options
// needed to do so from opts.
var dialOpts []stream.VCOpt
var auth ProxyAuthenticator
for _, opt := range opts {
switch v := opt.(type) {
case stream.VCOpt:
dialOpts = append(dialOpts, v)
case ProxyAuthenticator:
auth = v
}
}
// TODO(cnicolaou, ashankar): probably want to set a timeout here. (is
// this covered by opts?)
// TODO(ashankar): Authorize the proxy server as well (similar to how
// clients authorize servers in RPCs).
vf, err := ln.manager.FindOrDialVIF(ln.ctx, ln.proxyEP, dialOpts...)
if err != nil {
return nil, nil, err
}
// Prepend the default idle timeout for VCs.
opts = append([]stream.ListenerOpt{vc.IdleTimeout{Duration: defaultIdleTimeout}}, opts...)
if err := vf.StartAccepting(opts...); err != nil {
return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errAlreadyConnected, nil, vf, err))
}
// Proxy protocol: See v.io/x/ref/runtime/internal/rpc/stream/proxy/protocol.vdl
//
// We don't need an idle timeout for this VC, since one flow will be kept alive.
vc, err := vf.Dial(ln.ctx, ln.proxyEP, dialOpts...)
if err != nil {
vf.StopAccepting()
if verror.ErrorID(err) == verror.ErrAborted.ID {
ln.manager.vifs.Delete(vf)
return nil, nil, verror.New(stream.ErrAborted, nil, err)
}
return nil, nil, err
}
flow, err := vc.Connect()
if err != nil {
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errFailedToCreateLivenessFlow, nil, err))
}
var request proxy.Request
var response proxy.Response
if auth != nil {
if request.Blessings, request.Discharges, err = auth.Login(flow); err != nil {
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrSecurity, nil, verror.New(errRefusedProxyLogin, nil, err))
}
}
enc := vom.NewEncoder(flow)
if err := enc.Encode(request); err != nil {
flow.Close()
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeRequest, nil, err))
}
dec := vom.NewDecoder(flow)
if err := dec.Decode(&response); err != nil {
flow.Close()
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrNetwork, nil, verror.New(errVomDecodeResponse, nil, err))
}
if response.Error != nil {
flow.Close()
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrProxy, nil, response.Error)
}
ep, err := inaming.NewEndpoint(response.Endpoint)
if err != nil {
flow.Close()
vf.StopAccepting()
return nil, nil, verror.New(stream.ErrProxy, nil, verror.New(errProxyEndpointError, nil, response.Endpoint, err))
}
go func(vf *vif.VIF, flow stream.Flow, q *upcqueue.T) {
<-flow.Closed()
vf.StopAccepting()
q.Close()
}(vf, flow, ln.q)
return vf, ep, nil
}

func (ln *proxyListener) Accept() (stream.Flow, error) {
item, err := ln.q.Get(nil)
switch {
case err == upcqueue.ErrQueueIsClosed:
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
case err != nil:
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
default:
return item.(vif.ConnectorAndFlow).Flow, nil
}
}

func (ln *proxyListener) Close() error {
ln.vif.StopAccepting()
ln.q.Shutdown()
ln.manager.removeListener(ln)
ln.vifLoop.Wait()
ln.ctx.VI(3).Infof("Closed stream.Listener %s", ln)
return nil
}

func (ln *proxyListener) String() string {
return ln.DebugString()
}

func (ln *proxyListener) DebugString() string {
return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
}
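
// vifLoop accepts connectors and flows from vf, forwarding flows to q
// until vf.Accept fails. Items with a nil Flow merely announce a new VC
// and are logged, not queued.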
func vifLoop(ctx *context.T, vf *vif.VIF, q *upcqueue.T, cleanup func()) {
defer cleanup()
for {
cAndf, err := vf.Accept()
switch {
case err != nil:
ctx.VI(2).Infof("Shutting down listener on VIF %v: %v", vf, err)
return
case cAndf.Flow == nil:
ctx.VI(1).Infof("New VC %v on VIF %v", cAndf.Connector, vf)
default:
if err := q.Put(cAndf); err != nil {
ctx.VI(1).Infof("Closing new flow on VC %v (VIF %v) as Put failed in vifLoop: %v", cAndf.Connector, vf, err)
cAndf.Flow.Close()
}
}
}
}