blob: e6d9360e8901c74a915cba1676b190ef51ba72d9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"net"
"strings"
"sync"
"syscall"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/version"
)
type manager struct {
rid naming.RoutingID
closed <-chan struct{}
q *upcqueue.T
mu *sync.Mutex
listenEndpoints []naming.Endpoint
}
func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
m := &manager{
rid: rid,
closed: ctx.Done(),
mu: &sync.Mutex{},
q: upcqueue.New(),
}
return m
}
// Listen causes the Manager to accept flows from the provided protocol and address.
// Listen may be called muliple times.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
netLn, err := listen(ctx, protocol, address)
if err != nil {
return flow.NewErrNetwork(ctx, err)
}
local := &inaming.Endpoint{
Protocol: protocol,
Address: netLn.Addr().String(),
RID: m.rid,
}
m.mu.Lock()
m.listenEndpoints = append(m.listenEndpoints, local)
m.mu.Unlock()
go m.netLnAcceptLoop(ctx, netLn, local)
return nil
}
func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
const killConnectionsRetryDelay = 5 * time.Millisecond
for {
netConn, err := netLn.Accept()
for tokill := 1; isTemporaryError(err); tokill *= 2 {
if isTooManyOpenFiles(err) {
// TODO(suharshs): Find a way to kill connections here. We will need
// caching to be able to delete the connections.
} else {
tokill = 1
}
time.Sleep(killConnectionsRetryDelay)
netConn, err = netLn.Accept()
}
if err != nil {
ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
}
// TODO(suharshs): This conn needs to be cached instead of ignored.
_, err = conn.NewAccepted(
ctx,
&framer{ReadWriter: netConn},
local,
v23.GetPrincipal(ctx).BlessingStore().Default(),
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
)
if err != nil {
netConn.Close()
ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
}
}
}
type flowHandler struct {
q *upcqueue.T
closed <-chan struct{}
}
func (h *flowHandler) HandleFlow(f flow.Flow) error {
select {
case <-h.closed:
// This will make the Put call below return a upcqueue.ErrQueueIsClosed.
h.q.Close()
default:
}
return h.q.Put(f)
}
// ListeningEndpoints returns the endpoints that the Manager has explicitly
// listened on. The Manager will accept new flows on these endpoints.
// Returned endpoints all have a RoutingID unique to the Acceptor.
func (m *manager) ListeningEndpoints() []naming.Endpoint {
m.mu.Lock()
ret := make([]naming.Endpoint, len(m.listenEndpoints))
copy(ret, m.listenEndpoints)
m.mu.Unlock()
return ret
}
// Accept blocks until a new Flow has been initiated by a remote process.
// Flows are accepted from addresses that the Manager is listening on,
// including outgoing dialed connections.
//
// For example:
// err := m.Listen(ctx, "tcp", ":0")
// for {
// flow, err := m.Accept(ctx)
// // process flow
// }
//
// can be used to accept Flows initiated by remote processes.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
// TODO(suharshs): Ensure that m is attached to ctx.
item, err := m.q.Get(m.closed)
switch {
case err == upcqueue.ErrQueueIsClosed:
return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
case err != nil:
return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
default:
return item.(flow.Flow), nil
}
}
// Dial creates a Flow to the provided remote endpoint, using 'fn' to
// determine the blessings that will be sent to the remote end.
//
// To maximize re-use of connections, the Manager will also Listen on Dialed
// connections for the lifetime of the connection.
//
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
// TODO(suharshs): Add caching of connections.
addr := remote.Addr()
d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
netConn, err := dial(ctx, d, addr.Network(), addr.String())
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
c, err := conn.NewDialed(
ctx,
&framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
localEndpoint(netConn, m.rid),
remote,
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
fn,
)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
return c.Dial(ctx)
}
// Closed returns a channel that remains open for the lifetime of the Manager
// object. Once the channel is closed any operations on the Manager will
// necessarily fail.
func (m *manager) Closed() <-chan struct{} {
return m.closed
}
func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
if d != nil {
var timeout time.Duration
if dl, ok := ctx.Deadline(); ok {
timeout = dl.Sub(time.Now())
}
return d(protocol, address, timeout)
}
return nil, NewErrUnknownProtocol(ctx, protocol)
}
func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
if r != nil {
net, addr, err := r(protocol, address)
if err != nil {
return "", "", err
}
return net, addr, nil
}
return "", "", NewErrUnknownProtocol(ctx, protocol)
}
func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
ln, err := l(protocol, address)
if err != nil {
return nil, err
}
return ln, nil
}
return nil, NewErrUnknownProtocol(ctx, protocol)
}
func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
localAddr := conn.LocalAddr()
ep := &inaming.Endpoint{
Protocol: localAddr.Network(),
Address: localAddr.String(),
RID: rid,
}
return ep
}
func isTemporaryError(err error) bool {
oErr, ok := err.(*net.OpError)
return ok && oErr.Temporary()
}
func isTooManyOpenFiles(err error) bool {
oErr, ok := err.(*net.OpError)
return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
}