blob: 5849a5ca6e99f36d286d7cf78741515243b6fa29 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"fmt"
"net"
"strings"
"sync"
"syscall"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
iflow "v.io/x/ref/runtime/internal/flow"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/version"
)
const (
	// reconnectDelay is the initial backoff used when (re)connecting to a
	// proxy; connectToProxy doubles it on each consecutive failure.
	reconnectDelay = 50 * time.Millisecond
	// reapCacheInterval is how often the background goroutine in New purges
	// closed connections from the conn cache.
	reapCacheInterval = 5 * time.Minute
)
// manager implements flow.Manager: it dials and accepts flow.Conns, caches
// them for re-use, and hands accepted flows out via Accept.
type manager struct {
	rid    naming.RoutingID // routing id stamped on all of this manager's endpoints
	closed chan struct{}    // closed once shutdown (triggered by ctx in New) completes
	q      *upcqueue.T      // queue of accepted flows handed out by Accept
	cache  *ConnCache       // cache of dialed/accepted conns, keyed by address and routing id
	mu     *sync.Mutex      // guards listenEndpoints and listeners
	listenEndpoints []naming.Endpoint
	// listeners is set to nil on shutdown; Listen treats nil as "manager closed".
	listeners []flow.Listener
	wg        sync.WaitGroup // tracks accept-loop and proxy goroutines
}
// New creates a flow.Manager with routing id rid. The manager is tied to the
// lifetime of ctx: when ctx is canceled a background goroutine closes all
// listeners, the conn cache and the accept queue, waits for worker
// goroutines, and finally closes the channel returned by Closed.
func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
	m := &manager{
		rid:       rid,
		closed:    make(chan struct{}),
		q:         upcqueue.New(),
		cache:     NewConnCache(),
		mu:        &sync.Mutex{},
		listeners: []flow.Listener{},
	}
	go func() {
		ticker := time.NewTicker(reapCacheInterval)
		for {
			select {
			case <-ctx.Done():
				// Shutdown. Setting listeners to nil marks the manager as
				// closed for Listen (which checks for a nil slice) before we
				// close the listeners outside the lock.
				m.mu.Lock()
				listeners := m.listeners
				m.listeners = nil
				m.mu.Unlock()
				for _, ln := range listeners {
					ln.Close()
				}
				m.cache.Close(ctx)
				m.q.Close()
				// Wait for lnAcceptLoop/connectToProxy goroutines to drain
				// before declaring the manager fully closed.
				m.wg.Wait()
				ticker.Stop()
				close(m.closed)
				return
			case <-ticker.C:
				// Periodically kill closed connections.
				m.cache.KillConnections(ctx, 0)
			}
		}
	}()
	return m
}
// Listen causes the Manager to accept flows from the provided protocol and address.
// Listen may be called multiple times.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
	if err := m.validateContext(ctx); err != nil {
		return err
	}
	ln, err := listen(ctx, protocol, address)
	if err != nil {
		return flow.NewErrNetwork(ctx, err)
	}
	local := &inaming.Endpoint{
		Protocol: protocol,
		Address:  ln.Addr().String(),
		RID:      m.rid,
	}
	m.mu.Lock()
	// A nil listeners slice means the manager has been closed.
	if m.listeners == nil {
		// Fix: the original returned here with m.mu still held, deadlocking
		// every subsequent operation, and leaked ln.
		m.mu.Unlock()
		ln.Close()
		return flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
	}
	m.listeners = append(m.listeners, ln)
	m.listenEndpoints = append(m.listenEndpoints, local)
	m.mu.Unlock()
	m.wg.Add(1)
	go m.lnAcceptLoop(ctx, ln, local)
	return nil
}
// ProxyListen causes the Manager to accept flows from the specified endpoint.
// The endpoint must correspond to a vanadium proxy.
//
// update gets passed the complete set of endpoints for the proxy every time it
// is called.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) error {
	err := m.validateContext(ctx)
	if err != nil {
		return err
	}
	// The proxy connection is maintained by a background goroutine that is
	// accounted for in the manager's wait group so shutdown can drain it.
	m.wg.Add(1)
	go m.connectToProxy(ctx, ep, update)
	return nil
}
// connectToProxy maintains a connection to the proxy at ep for the lifetime
// of ctx. Each time a set of proxy endpoints is successfully read, update is
// invoked with it; when the proxy flow dies, update(nil) is called and the
// connection is re-established. Failures are retried with exponential
// backoff starting at reconnectDelay.
func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) {
	defer m.wg.Done()
	var eps []naming.Endpoint
	// delay starts at reconnectDelay so the very first iteration sleeps
	// (delay - reconnectDelay) == 0; every failed pass doubles it.
	for delay := reconnectDelay; ; delay *= 2 {
		time.Sleep(delay - reconnectDelay)
		select {
		case <-ctx.Done():
			return
		default:
		}
		f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
		if err != nil {
			ctx.Error(err)
			continue
		}
		// Ask the proxy to start proxying on our behalf.
		w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
		if err != nil {
			ctx.Error(err)
			continue
		}
		// NOTE(review): the error paths below abandon f without closing it —
		// presumably the underlying conn is reaped via the cache; confirm.
		if _, err = f.WriteMsg(w); err != nil {
			ctx.Error(err)
			continue
		}
		eps, err = m.readProxyResponse(ctx, f)
		if err != nil {
			ctx.Error(err)
			continue
		}
		update(eps)
		// Block until shutdown or until the proxy flow dies; in the latter
		// case clear the advertised endpoints and reconnect promptly by
		// resetting the backoff.
		select {
		case <-ctx.Done():
			return
		case <-f.Closed():
			update(nil)
			delay = reconnectDelay
		}
	}
}
// readProxyResponse reads a single message from f and returns the endpoints
// the proxy is listening on for us. It returns a network error if the read
// fails and a bad-arg error if the message cannot be parsed or is not a
// *message.ProxyResponse.
func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
	b, err := f.ReadMsg()
	if err != nil {
		return nil, flow.NewErrNetwork(ctx, err)
	}
	msg, err := message.Read(ctx, b)
	if err != nil {
		return nil, flow.NewErrBadArg(ctx, err)
	}
	res, ok := msg.(*message.ProxyResponse)
	if !ok {
		// Fix: report the dynamic type of the unexpected message. The
		// original formatted the nil res with %t (the boolean verb), which
		// produced a useless error string.
		return nil, flow.NewErrBadArg(ctx, NewErrInvalidProxyResponse(ctx, fmt.Sprintf("%T", msg)))
	}
	return res.Endpoints, nil
}
// proxyBlessingsForPeer supplies the BlessingsForPeer callback used when
// dialing a proxy.
type proxyBlessingsForPeer struct{}

// TODO(suharshs): Figure out what blessings to present here. And present discharges.
// run returns the principal's default blessings and no discharges,
// regardless of the local/remote endpoints or the remote blessings.
func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
	rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
// lnAcceptLoop accepts connections from ln until the listener is closed,
// wrapping each one in an accepted conn and inserting it into the cache.
// Temporary network errors are retried; when the process is out of file
// descriptors an exponentially growing number of cached connections is
// killed to free some up.
func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint) {
	defer m.wg.Done()
	const killConnectionsRetryDelay = 5 * time.Millisecond
	for {
		flowConn, err := ln.Accept(ctx)
		for tokill := 1; isTemporaryError(err); tokill *= 2 {
			if isTooManyOpenFiles(err) {
				if err := m.cache.KillConnections(ctx, tokill); err != nil {
					ctx.VI(2).Infof("failed to kill connections: %v", err)
					// Fix: sleep before retrying so a persistently failing
					// KillConnections does not spin this loop hot.
					time.Sleep(killConnectionsRetryDelay)
					continue
				}
			} else {
				tokill = 1
			}
			time.Sleep(killConnectionsRetryDelay)
			flowConn, err = ln.Accept(ctx)
		}
		if err != nil {
			ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
			return
		}
		// cached is closed once the conn is in the cache; the flowHandler
		// blocks flow delivery until then so accepted flows never observe a
		// conn that has not been cached yet.
		cached := make(chan struct{})
		c, err := conn.NewAccepted(
			ctx,
			flowConn,
			local,
			version.Supported,
			&flowHandler{q: m.q, cached: cached},
		)
		if err != nil {
			close(cached)
			flowConn.Close()
			// Fix: the original message read "failed to accept ... failed".
			ctx.Errorf("failed to accept flow.Conn on localEP %v: %v", local, err)
			continue
		}
		if err := m.cache.InsertWithRoutingID(c); err != nil {
			// Fix: the original also called close(cached) here and then fell
			// through to the unconditional close below, panicking with
			// "close of closed channel" whenever the insert failed.
			ctx.Errorf("failed to cache conn %v: %v", c, err)
		}
		close(cached)
	}
}
// flowHandler delivers flows accepted on a conn to the manager's accept
// queue, optionally waiting until the conn itself has been cached.
type flowHandler struct {
	q *upcqueue.T
	// cached, if non-nil, is closed by lnAcceptLoop once the conn that owns
	// this handler has been inserted into (or rejected by) the conn cache.
	cached chan struct{}
}

// HandleFlow blocks until the owning conn is cached (when cached is set) and
// then enqueues f for delivery via manager.Accept.
func (h *flowHandler) HandleFlow(f flow.Flow) error {
	if h.cached != nil {
		<-h.cached
	}
	return h.q.Put(f)
}
// proxyFlowHandler handles flows that arrive over a proxied connection by
// wrapping each one in a new accepted conn and caching it by routing id.
type proxyFlowHandler struct {
	ctx *context.T
	m   *manager
}

// HandleFlow asynchronously sets up an accepted conn over the proxied flow f.
// Setup errors are logged rather than returned because the work happens in a
// background goroutine.
func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error {
	go func() {
		c, err := conn.NewAccepted(
			h.ctx,
			f,
			f.Conn().LocalEndpoint(),
			version.Supported,
			&flowHandler{q: h.m.q})
		if err != nil {
			h.ctx.Errorf("failed to create accepted conn: %v", err)
			return
		}
		if err := h.m.cache.InsertWithRoutingID(c); err != nil {
			// Fix: the original logged a copy-pasted "failed to create
			// accepted conn" here, masking that caching was what failed.
			h.ctx.Errorf("failed to cache conn %v: %v", c, err)
			return
		}
	}()
	return nil
}
// ListeningEndpoints returns the endpoints that the Manager has explicitly
// called Listen on. The Manager will accept new flows on these endpoints.
// Proxied endpoints are not returned.
// If the Manager is not listening on any endpoints, an endpoint with the
// Manager's RoutingID will be returned for use in bidirectional RPC.
// Returned endpoints all have the Manager's unique RoutingID.
func (m *manager) ListeningEndpoints() []naming.Endpoint {
	// Snapshot the endpoint list under the lock so callers cannot observe
	// concurrent mutation.
	m.mu.Lock()
	eps := append([]naming.Endpoint(nil), m.listenEndpoints...)
	m.mu.Unlock()
	if len(eps) == 0 {
		// Not listening anywhere: synthesize an endpoint carrying only our
		// routing id for bidirectional RPC.
		eps = []naming.Endpoint{&inaming.Endpoint{RID: m.rid}}
	}
	return eps
}
// Accept blocks until a new Flow has been initiated by a remote process.
// Flows are accepted from addresses that the Manager is listening on,
// including outgoing dialed connections.
//
// For example:
//   err := m.Listen(ctx, "tcp", ":0")
//   for {
//     flow, err := m.Accept(ctx)
//     // process flow
//   }
//
// can be used to accept Flows initiated by remote processes.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
	if err := m.validateContext(ctx); err != nil {
		return nil, err
	}
	item, err := m.q.Get(ctx.Done())
	if err == upcqueue.ErrQueueIsClosed {
		// The queue is closed only when the manager shuts down.
		return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
	}
	if err != nil {
		return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
	}
	return item.(flow.Flow), nil
}
// Dial creates a Flow to the provided remote endpoint, using 'fn' to
// determine the blessings that will be sent to the remote end.
//
// To maximize re-use of connections, the Manager will also Listen on Dialed
// connections for the lifetime of the connection.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
	err := m.validateContext(ctx)
	if err != nil {
		return nil, err
	}
	// Only managers with a real routing id accept flows over dialed
	// connections; a null routing id gets no flow handler.
	var handler conn.FlowHandler
	if m.rid != naming.NullRoutingID {
		handler = &flowHandler{q: m.q}
	}
	return m.internalDial(ctx, remote, fn, handler)
}
// internalDial returns a flow to remote, re-using a cached conn when one
// exists (looked up first by routing id, then by resolved network address)
// and dialing a new conn otherwise. If remote turns out to be reachable only
// through a proxy (the dialed conn's routing id differs from remote's), a
// second conn is layered over a flow on the proxy conn and the returned flow
// is dialed on that inner conn.
func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
	// Disallow making connections to ourselves.
	// TODO(suharshs): Figure out the right thing to do here. We could create a "localflow"
	// that bypasses auth and is added to the accept queue immediately.
	if remote.RoutingID() == m.rid {
		return nil, flow.NewErrBadArg(ctx, NewErrManagerDialingSelf(ctx))
	}
	// Look up the connection based on RoutingID first.
	c, err := m.cache.FindWithRoutingID(remote.RoutingID())
	if err != nil {
		return nil, flow.NewErrBadState(ctx, err)
	}
	var (
		protocol         flow.Protocol
		network, address string
	)
	if c == nil {
		addr := remote.Addr()
		protocol, _ = flow.RegisteredProtocol(addr.Network())
		// (network, address) in the endpoint might not always match up
		// with the key used for caching conns. For example:
		// - conn, err := net.Dial("tcp", "www.google.com:80")
		//   fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
		// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
		//   might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
		// Thus we look for Conns with the resolved address.
		network, address, err = resolve(ctx, protocol, addr.Network(), addr.String())
		if err != nil {
			return nil, flow.NewErrResolveFailed(ctx, err)
		}
		// ReservedFind holds a reservation on the (network, address,
		// blessings) key so concurrent dials to the same peer do not race;
		// Unreserve releases it when we are done.
		c, err = m.cache.ReservedFind(network, address, remote.BlessingNames())
		if err != nil {
			return nil, flow.NewErrBadState(ctx, err)
		}
		defer m.cache.Unreserve(network, address, remote.BlessingNames())
	}
	if c == nil {
		// No cached conn: dial a fresh network connection.
		flowConn, err := dial(ctx, protocol, network, address)
		if err != nil {
			return nil, flow.NewErrDialFailed(ctx, err)
		}
		// TODO(mattr): We should only pass a flowHandler to NewDialed if there
		// is a server attached to this flow manager. Perhaps we can signal
		// "serving flow manager" by passing a 0 RID to non-serving flow managers?
		c, err = conn.NewDialed(
			ctx,
			flowConn,
			localEndpoint(flowConn, m.rid),
			remote,
			version.Supported,
			fh,
		)
		if err != nil {
			flowConn.Close()
			// Wrong-protocol errors are passed through untouched so callers
			// can distinguish them from ordinary dial failures.
			if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
				return nil, err
			}
			return nil, flow.NewErrDialFailed(ctx, err)
		}
		if err := m.cache.Insert(c); err != nil {
			return nil, flow.NewErrBadState(ctx, err)
		}
	}
	f, err := c.Dial(ctx, fn)
	if err != nil {
		return nil, flow.NewErrDialFailed(ctx, err)
	}
	// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
	// return a flow on that corresponding conn.
	if proxyConn := c; remote.RoutingID() != proxyConn.RemoteEndpoint().RoutingID() {
		c, err = conn.NewDialed(
			ctx,
			f,
			proxyConn.LocalEndpoint(),
			remote,
			version.Supported,
			fh,
		)
		if err != nil {
			proxyConn.Close(ctx, err)
			if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
				return nil, err
			}
			return nil, flow.NewErrDialFailed(ctx, err)
		}
		// The inner conn is cached by routing id only, since its "address"
		// is the proxy's.
		if err := m.cache.InsertWithRoutingID(c); err != nil {
			return nil, flow.NewErrBadState(ctx, err)
		}
		f, err = c.Dial(ctx, fn)
		if err != nil {
			proxyConn.Close(ctx, err)
			return nil, flow.NewErrDialFailed(ctx, err)
		}
	}
	return f, nil
}
// RoutingID returns the naming.RoutingID of the flow.Manager.
func (m *manager) RoutingID() naming.RoutingID {
	return m.rid
}
// Closed returns a channel that remains open for the lifetime of the Manager
// object. Once the channel is closed any operations on the Manager will
// necessarily fail. The channel is closed by the shutdown goroutine started
// in New after all listeners, the cache and the queue have been torn down.
func (m *manager) Closed() <-chan struct{} {
	return m.closed
}
// validateContext verifies that the flow.Manager attached to ctx is m; the
// exported methods all require this and return a bad-arg error otherwise.
func (m *manager) validateContext(ctx *context.T) error {
	if attached := v23.ExperimentalGetFlowManager(ctx); attached != m {
		return flow.NewErrBadArg(ctx, iflow.NewErrWrongObjectInContext(ctx, "manager"))
	}
	return nil
}
// dial opens a flow.Conn to address over p, honoring any deadline on ctx by
// translating it into the protocol's dial timeout. A nil p yields an
// unknown-protocol error.
func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
	if p == nil {
		return nil, NewErrUnknownProtocol(ctx, protocol)
	}
	var timeout time.Duration
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}
	return p.Dial(ctx, protocol, address, timeout)
}
// resolve maps (protocol, address) to the canonical (network, address) pair
// via p.Resolve; a nil p yields an unknown-protocol error.
func resolve(ctx *context.T, p flow.Protocol, protocol, address string) (string, string, error) {
	if p == nil {
		return "", "", NewErrUnknownProtocol(ctx, protocol)
	}
	network, addr, err := p.Resolve(ctx, protocol, address)
	if err != nil {
		return "", "", err
	}
	return network, addr, nil
}
// listen creates a flow.Listener on (protocol, address) using the protocol's
// registered implementation; an unregistered protocol yields an error.
func listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
	p, _ := flow.RegisteredProtocol(protocol)
	if p == nil {
		return nil, NewErrUnknownProtocol(ctx, protocol)
	}
	return p.Listen(ctx, protocol, address)
}
// localEndpoint builds the endpoint describing our side of conn, stamped
// with routing id rid.
func localEndpoint(conn flow.Conn, rid naming.RoutingID) naming.Endpoint {
	addr := conn.LocalAddr()
	return &inaming.Endpoint{
		Protocol: addr.Network(),
		Address:  addr.String(),
		RID:      rid,
	}
}
func isTemporaryError(err error) bool {
oErr, ok := err.(*net.OpError)
return ok && oErr.Temporary()
}
func isTooManyOpenFiles(err error) bool {
oErr, ok := err.(*net.OpError)
return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
}