// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package manager

import (
	"fmt"
	"net"
	"strings"
	"sync"
	"syscall"
	"time"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/flow"
	"v.io/v23/flow/message"
	"v.io/v23/naming"
	"v.io/v23/security"
	"v.io/v23/verror"

	"v.io/x/lib/netconfig"
	"v.io/x/lib/netstate"
	"v.io/x/ref/lib/pubsub"
	slib "v.io/x/ref/lib/security"
	"v.io/x/ref/lib/stats"
	iflow "v.io/x/ref/runtime/internal/flow"
	"v.io/x/ref/runtime/internal/flow/conn"
	"v.io/x/ref/runtime/internal/lib/upcqueue"
	"v.io/x/ref/runtime/internal/rpc/version"
	"v.io/x/ref/runtime/protocols/bidi"
)

const (
	reapCacheInterval = 5 * time.Minute
	minCacheInterval  = 10 * time.Millisecond // the minimum time we are willing to poll the cache for idle or closed connections.
	handshakeTimeout  = time.Minute
)

type manager struct {
	rid                  naming.RoutingID
	closed               chan struct{}
	cache                *ConnCache
	ls                   *listenState
	ctx                  *context.T
	acceptChannelTimeout time.Duration
	idleExpiry           time.Duration // time after which idle connections will be closed.
	cacheTicker          *time.Ticker
}

type listenState struct {
	q                     *upcqueue.T
	listenLoops           sync.WaitGroup
	dhcpPublisher         *pubsub.Publisher
	serverAuthorizedPeers []security.BlessingPattern // empty list implies all peers are authorized to see the server's blessings.

	mu              sync.Mutex
	serverBlessings security.Blessings
	serverNames     []string
	listeners       map[flow.Listener]*endpointState
	proxyEndpoints  []naming.Endpoint
	proxyErrors     map[string]error
	// TODO(suharshs): Look into making the struct{Protocol, Address string} into
	// a named struct. This may involve changing v.io/v23/rpc.ListenAddrs.
	listenErrors map[struct{ Protocol, Address string }]error
	dirty        chan struct{}
	stopRoaming  func()
	proxyFlows   map[string]flow.Flow // keyed by ep.String()
}

type endpointState struct {
	leps         []naming.Endpoint // the list of currently active endpoints.
	tmplEndpoint naming.Endpoint   // endpoint used as a template for creating new endpoints from the network interfaces provided from roaming.
	roaming      bool
}

// New creates a new flow manager.
func New(
	ctx *context.T,
	rid naming.RoutingID,
	dhcpPublisher *pubsub.Publisher,
	channelTimeout time.Duration,
	idleExpiry time.Duration,
	authorizedPeers []security.BlessingPattern) flow.Manager {
	m := &manager{
		rid:                  rid,
		closed:               make(chan struct{}),
		cache:                NewConnCache(idleExpiry),
		ctx:                  ctx,
		acceptChannelTimeout: channelTimeout,
		idleExpiry:           idleExpiry,
	}
	var valid <-chan struct{}
	if rid != naming.NullRoutingID {
		m.ls = &listenState{
			q:                     upcqueue.New(),
			listeners:             make(map[flow.Listener]*endpointState),
			dirty:                 make(chan struct{}),
			dhcpPublisher:         dhcpPublisher,
			proxyFlows:            make(map[string]flow.Flow),
			proxyErrors:           make(map[string]error),
			listenErrors:          make(map[struct{ Protocol, Address string }]error),
			serverAuthorizedPeers: authorizedPeers,
		}
		p := v23.GetPrincipal(ctx)
		m.ls.serverBlessings, valid = p.BlessingStore().Default()
		m.ls.serverNames = security.BlessingNames(p, m.ls.serverBlessings)
	}
	// Pick a interval that is the minimum of the idleExpiry and the reapCacheInterval.
	cacheInterval := reapCacheInterval
	if idleExpiry > 0 && idleExpiry/2 < cacheInterval {
		cacheInterval = idleExpiry / 2
	}
	if cacheInterval < minCacheInterval {
		cacheInterval = minCacheInterval
	}
	m.cacheTicker = time.NewTicker(cacheInterval)

	statsPrefix := naming.Join("rpc", "flow", rid.String())
	m.cache.ExportStats(naming.Join(statsPrefix, "conn-cache"))
	go func() {
		for {
			select {
			case <-valid:
				m.ls.mu.Lock()
				p := v23.GetPrincipal(ctx)
				m.ls.serverBlessings, valid = p.BlessingStore().Default()
				m.ls.serverNames = security.BlessingNames(p, m.ls.serverBlessings)
				m.updateEndpointBlessingsLocked(m.ls.serverNames)
				m.ls.mu.Unlock()
			case <-ctx.Done():
				m.stopListening()
				m.cache.Close(ctx)
				stats.Delete(statsPrefix)
				close(m.closed)
				return
			case <-m.cacheTicker.C:
				// Periodically kill closed connections and remove expired connections,
				// based on the idleExpiry passed to the NewConnCache constructor.
				m.cache.KillConnections(ctx, 0)
			}
		}
	}()
	return m
}

func (m *manager) stopListening() {
	if m.ls == nil {
		return
	}
	m.cacheTicker.Stop()
	m.ls.mu.Lock()
	listeners := m.ls.listeners
	m.ls.listeners = nil
	if m.ls.dirty != nil {
		close(m.ls.dirty)
		m.ls.dirty = nil
	}
	stopRoaming := m.ls.stopRoaming
	m.ls.stopRoaming = nil
	for _, f := range m.ls.proxyFlows {
		f.Close()
	}
	m.ls.mu.Unlock()
	if stopRoaming != nil {
		stopRoaming()
	}
	for ln := range listeners {
		ln.Close()
	}
	m.ls.listenLoops.Wait()
}

func (m *manager) StopListening(ctx *context.T) {
	if m.ls == nil {
		return
	}
	m.stopListening()
	// Now no more connections can start.  We should lame duck all the conns
	// and wait for all of them to ack.
	m.cache.EnterLameDuckMode(ctx)
	// Now nobody should send any more flows, so close the queue.
	m.ls.q.Close()
}

// Listen causes the Manager to accept flows from the provided protocol and address.
// Listen may be called muliple times.
func (m *manager) Listen(ctx *context.T, protocol, address string) (<-chan struct{}, error) {
	if m.ls == nil {
		return nil, NewErrListeningWithNullRid(ctx)
	}

	ln, lnErr := listen(ctx, protocol, address)
	defer m.ls.mu.Unlock()
	m.ls.mu.Lock()
	if m.ls.listeners == nil {
		if ln != nil {
			ln.Close()
		}
		return nil, flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
	}
	errKey := struct{ Protocol, Address string }{Protocol: protocol, Address: address}
	if lnErr != nil {
		m.ls.listenErrors[errKey] = lnErr
		if m.ls.dirty != nil {
			close(m.ls.dirty)
			m.ls.dirty = make(chan struct{})
		}
		return nil, iflow.MaybeWrapError(flow.ErrNetwork, ctx, lnErr)
	}

	local := naming.Endpoint{
		Protocol:  protocol,
		Address:   ln.Addr().String(),
		RoutingID: m.rid,
	}.WithBlessingNames(m.ls.serverNames)
	leps, roam, err := m.createEndpoints(ctx, local)
	if err != nil {
		m.ls.listenErrors[errKey] = err
		if m.ls.dirty != nil {
			close(m.ls.dirty)
			m.ls.dirty = make(chan struct{})
		}
		return nil, iflow.MaybeWrapError(flow.ErrBadArg, ctx, err)
	}
	m.ls.listeners[ln] = &endpointState{
		leps:         leps,
		tmplEndpoint: local,
		roaming:      roam,
	}
	if m.ls.stopRoaming == nil && m.ls.dhcpPublisher != nil && roam {
		ctx2, cancel := context.WithCancel(ctx)
		ch := make(chan struct{})
		m.ls.stopRoaming = func() {
			cancel()
			<-ch
		}
		go m.monitorNetworkChanges(ctx2, ch)
	}

	// The endpoints have changed on this successful listen so notify any watchers.
	m.ls.listenErrors[errKey] = nil
	if m.ls.dirty != nil {
		close(m.ls.dirty)
		m.ls.dirty = make(chan struct{})
	}

	m.ls.listenLoops.Add(1)
	acceptFailed := make(chan struct{})
	go m.lnAcceptLoop(ctx, ln, local, errKey, acceptFailed)
	return acceptFailed, nil
}

func (m *manager) monitorNetworkChanges(ctx *context.T, done chan<- struct{}) {
	defer close(done)
	change, err := netconfig.NotifyChange()
	if err != nil {
		ctx.Errorf("endpoints will not be updated if the network configuration changes, failed to monitor network changes: %v", err)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-change:
			netstate.InvalidateCache()
			m.updateRoamingEndpoints(ctx)
			if change, err = netconfig.NotifyChange(); err != nil {
				ctx.Errorf("endpoints will not be updated if the network configuration changes, failed to monitor network changes: %v", err)
				return
			}
		}
	}
}

func (m *manager) updateRoamingEndpoints(ctx *context.T) {
	ctx.Infof("Network configuration may have changed, adjusting the addresses to listen on (routing id: %v)", m.rid)
	changed := false
	m.ls.mu.Lock()
	defer m.ls.mu.Unlock()
	for _, epState := range m.ls.listeners {
		if !epState.roaming {
			continue
		}
		newleps, _, err := m.createEndpoints(ctx, epState.tmplEndpoint)
		if err != nil {
			ctx.Infof("Unable to update roaming endpoints for template %v: %v", epState.tmplEndpoint, err)
			continue
		}
		if len(newleps) != len(epState.leps) {
			changed = true
		}
		if !changed {
			newset := make(map[string]bool)
			for _, lep := range newleps {
				newset[lep.String()] = true
			}
			for _, lep := range epState.leps {
				if !newset[lep.String()] {
					changed = true
					break
				}
			}
		}
		if changed {
			ctx.Infof("Changing from %v to %v", epState.leps, newleps)
		}
		epState.leps = newleps
	}
	if changed {
		close(m.ls.dirty)
		m.ls.dirty = make(chan struct{})
	}
}

func (m *manager) createEndpoints(ctx *context.T, lep naming.Endpoint) ([]naming.Endpoint, bool, error) {
	iep := lep
	if !strings.HasPrefix(iep.Protocol, "tcp") &&
		!strings.HasPrefix(iep.Protocol, "ws") {
		// If not tcp, ws, or wsh, just return the endpoint we were given.
		return []naming.Endpoint{iep}, false, nil
	}
	host, port, err := net.SplitHostPort(iep.Address)
	if err != nil {
		return nil, false, err
	}
	chooser := v23.GetListenSpec(ctx).AddressChooser
	addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
	if err != nil {
		return nil, false, err
	}
	ieps := make([]naming.Endpoint, 0, len(addrs))
	for _, addr := range addrs {
		n, err := naming.ParseEndpoint(lep.String())
		if err != nil {
			return nil, false, err
		}
		n.Address = net.JoinHostPort(addr.String(), port)
		ieps = append(ieps, n)
	}
	return ieps, unspecified, nil
}

func (m *manager) updateEndpointBlessingsLocked(names []string) {
	for _, eps := range m.ls.listeners {
		eps.tmplEndpoint = eps.tmplEndpoint.WithBlessingNames(names)
		for i := range eps.leps {
			eps.leps[i] = eps.leps[i].WithBlessingNames(names)
		}
	}
	if m.ls.dirty != nil {
		close(m.ls.dirty)
		m.ls.dirty = make(chan struct{})
	}
}

// ProxyListen causes the Manager to accept flows from the specified endpoint.
// The endpoint must correspond to a vanadium proxy.
// If error != nil, establishing a connection to the Proxy failed.
// Otherwise, if error == nil, the returned chan will block until the
// connection to the proxy endpoint fails. The caller may then choose to retry
// the connection.
// name is a identifier of the proxy. It can be used to access errors
// in ListenStatus.ProxyErrors.
func (m *manager) ProxyListen(ctx *context.T, name string, ep naming.Endpoint) (<-chan struct{}, error) {
	if m.ls == nil {
		return nil, NewErrListeningWithNullRid(ctx)
	}
	f, err := m.internalDial(ctx, ep, proxyAuthorizer{}, m.acceptChannelTimeout, true, false)
	if err != nil {
		return nil, err
	}
	k := ep.String()
	m.ls.mu.Lock()
	m.ls.proxyFlows[k] = f
	serverNames := m.ls.serverNames
	m.ls.mu.Unlock()
	w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
	if err != nil {
		m.updateProxyEndpoints(nil, name, err, k)
		return nil, err
	}
	if _, err = f.WriteMsg(w); err != nil {
		m.updateProxyEndpoints(nil, name, err, k)
		return nil, err
	}
	// We connect to the proxy once before we loop.
	if err := m.readAndUpdateProxyEndpoints(ctx, name, f, k, serverNames); err != nil {
		return nil, err
	}
	// We do exponential backoff unless the proxy closes the flow cleanly, in which
	// case we redial immediately.
	done := make(chan struct{})
	go func() {
		for {
			// we keep reading updates until we encounter an error, usually because the
			// flow has been closed.
			if err := m.readAndUpdateProxyEndpoints(ctx, name, f, k, serverNames); err != nil {
				ctx.VI(2).Info(err)
				close(done)
				return
			}
		}
	}()
	return done, nil
}

func (m *manager) readAndUpdateProxyEndpoints(ctx *context.T, name string, f flow.Flow, flowKey string, serverNames []string) error {
	eps, err := m.readProxyResponse(ctx, f)
	if err == nil {
		for i := range eps {
			eps[i] = eps[i].WithBlessingNames(serverNames)
		}
	}
	m.updateProxyEndpoints(eps, name, err, flowKey)
	return err
}

func (m *manager) updateProxyEndpoints(eps []naming.Endpoint, name string, err error, flowKey string) {
	defer m.ls.mu.Unlock()
	m.ls.mu.Lock()
	if err != nil {
		delete(m.ls.proxyFlows, flowKey)
	}
	origErrS, errS := "", ""
	if err != nil {
		errS = err.Error()
	}
	if origErr := m.ls.proxyErrors[name]; origErr != nil {
		errS = origErr.Error()
	}
	if endpointsEqual(m.ls.proxyEndpoints, eps) && errS == origErrS {
		return
	}
	m.ls.proxyEndpoints = eps
	m.ls.proxyErrors[name] = err
	// The proxy endpoints have changed so we need to notify any watchers to
	// requery Status.
	if m.ls.dirty != nil {
		close(m.ls.dirty)
		m.ls.dirty = make(chan struct{})
	}
}

func endpointsEqual(a, b []naming.Endpoint) bool {
	if len(a) != len(b) {
		return false
	}
	m := make(map[string]struct{})
	for _, ep := range a {
		m[ep.String()] = struct{}{}
	}
	for _, ep := range b {
		key := ep.String()
		if _, ok := m[key]; !ok {
			return false
		}
		delete(m, key)
	}
	return len(m) == 0
}

func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
	b, err := f.ReadMsg()
	if err != nil {
		f.Close()
		return nil, err
	}
	msg, err := message.Read(ctx, b)
	if err != nil {
		f.Close()
		return nil, iflow.MaybeWrapError(flow.ErrBadArg, ctx, err)
	}
	switch m := msg.(type) {
	case *message.ProxyResponse:
		return m.Endpoints, nil
	case *message.ProxyErrorResponse:
		f.Close()
		return nil, NewErrProxyResponse(ctx, m.Error)
	default:
		f.Close()
		return nil, flow.NewErrBadArg(ctx, NewErrInvalidProxyResponse(ctx, fmt.Sprintf("%T", m)))
	}
}

type proxyAuthorizer struct{}

func (proxyAuthorizer) AuthorizePeer(
	ctx *context.T,
	localEndpoint, remoteEndpoint naming.Endpoint,
	remoteBlessings security.Blessings,
	remoteDischarges map[string]security.Discharge,
) ([]string, []security.RejectedBlessing, error) {
	return nil, nil, nil
}

func (a proxyAuthorizer) BlessingsForPeer(ctx *context.T, proxyBlessings []string) (
	security.Blessings, map[string]security.Discharge, error) {
	blessings := v23.GetPrincipal(ctx).BlessingStore().ForPeer(proxyBlessings...)
	discharges, _ := slib.PrepareDischarges(ctx, blessings, proxyBlessings, "", nil)
	return blessings, discharges, nil
}

func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint,
	errKey struct{ Protocol, Address string }, acceptFailed chan struct{}) {
	defer m.ls.listenLoops.Done()
	defer func() {
		close(acceptFailed)
		delete(m.ls.listeners, ln)
		if m.ls.dirty != nil {
			close(m.ls.dirty)
			m.ls.dirty = make(chan struct{})
		}
		ln.Close()
	}()
	const killConnectionsRetryDelay = 5 * time.Millisecond
	for {
		flowConn, err := ln.Accept(ctx)
		for tokill := 1; isTemporaryError(err); tokill *= 2 {
			if isTooManyOpenFiles(err) {
				if err := m.cache.KillConnections(ctx, tokill); err != nil {
					ctx.VI(2).Infof("failed to kill connections: %v", err)
					continue
				}
			} else {
				tokill = 1
			}
			time.Sleep(killConnectionsRetryDelay)
			flowConn, err = ln.Accept(ctx)
		}
		if err != nil {
			m.ls.mu.Lock()
			closed := m.ls.listeners == nil
			m.ls.mu.Unlock()
			if !closed {
				ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
				return
			}
			m.ls.mu.Lock()
			m.ls.listenErrors[errKey] = err
			if m.ls.dirty != nil {
				close(m.ls.dirty)
				m.ls.dirty = make(chan struct{})
			}
			m.ls.mu.Unlock()
			return
		}

		m.ls.mu.Lock()
		if m.ls.listeners == nil {
			m.ls.mu.Unlock()
			return
		}
		m.ls.listenLoops.Add(1)
		m.ls.mu.Unlock()
		fh := &flowHandler{m, make(chan struct{})}
		go func() {
			defer m.ls.listenLoops.Done()
			c, err := conn.NewAccepted(
				m.ctx,
				m.ls.serverAuthorizedPeers,
				flowConn,
				local,
				version.Supported,
				handshakeTimeout,
				m.acceptChannelTimeout,
				fh)
			if err != nil {
				ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
				flowConn.Close()
			} else if err = m.cache.InsertWithRoutingID(c, false); err != nil {
				ctx.Errorf("failed to cache conn %v: %v", c, err)
				c.Close(ctx, err)
			}
			close(fh.cached)
		}()
	}
}

type hybridHandler struct {
	handler conn.FlowHandler
	ready   chan struct{}
}

func (h *hybridHandler) HandleFlow(f flow.Flow) error {
	<-h.ready
	return h.handler.HandleFlow(f)
}

func (m *manager) handlerReady(fh conn.FlowHandler, proxy bool) {
	if fh != nil {
		if h, ok := fh.(*hybridHandler); ok {
			if proxy {
				h.handler = &proxyFlowHandler{m: m}
			} else {
				h.handler = &flowHandler{m: m}
			}
			close(h.ready)
		}
	}
}

func newHybridHandler(m *manager) *hybridHandler {
	return &hybridHandler{
		ready: make(chan struct{}),
	}
}

type flowHandler struct {
	m      *manager
	cached chan struct{}
}

func (h *flowHandler) HandleFlow(f flow.Flow) error {
	if h.cached != nil {
		<-h.cached
	}
	return h.m.ls.q.Put(f)
}

type proxyFlowHandler struct {
	m *manager
}

func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error {
	go func() {
		fh := &flowHandler{h.m, make(chan struct{})}
		h.m.ls.mu.Lock()
		if h.m.ls.listeners == nil {
			// If we've entered lame duck mode we want to reject new flows
			// from the proxy.  This should come out as a connection failure
			// for the client, which will result in a retry.
			h.m.ls.mu.Unlock()
			f.Close()
			return
		}
		h.m.ls.mu.Unlock()
		c, err := conn.NewAccepted(
			h.m.ctx,
			h.m.ls.serverAuthorizedPeers,
			f,
			f.LocalEndpoint(),
			version.Supported,
			handshakeTimeout,
			h.m.acceptChannelTimeout,
			fh)
		if err != nil {
			h.m.ctx.Errorf("failed to create accepted conn: %v", err)
		} else if err = h.m.cache.InsertWithRoutingID(c, false); err != nil {
			h.m.ctx.Errorf("failed to create accepted conn: %v", err)
		}
		close(fh.cached)
	}()
	return nil
}

// Status returns the current flow.ListenStatus of the manager.
func (m *manager) Status() flow.ListenStatus {
	var status flow.ListenStatus
	if m.ls == nil {
		return status
	}
	m.ls.mu.Lock()
	status.Endpoints = make([]naming.Endpoint, len(m.ls.proxyEndpoints))
	copy(status.Endpoints, m.ls.proxyEndpoints)
	for _, epState := range m.ls.listeners {
		for _, ep := range epState.leps {
			status.Endpoints = append(status.Endpoints, ep)
		}
	}
	status.ProxyErrors = make(map[string]error, len(m.ls.proxyErrors))
	for k, v := range m.ls.proxyErrors {
		status.ProxyErrors[k] = v
	}
	status.ListenErrors = make(map[struct{ Protocol, Address string }]error, len(m.ls.listenErrors))
	for k, v := range m.ls.listenErrors {
		status.ListenErrors[k] = v
	}
	status.Dirty = m.ls.dirty
	m.ls.mu.Unlock()
	if len(status.Endpoints) == 0 {
		status.Endpoints = append(status.Endpoints, naming.Endpoint{Protocol: bidi.Name, RoutingID: m.rid})
	}
	return status
}

// Accept blocks until a new Flow has been initiated by a remote process.
// Flows are accepted from addresses that the Manager is listening on,
// including outgoing dialed connections.
//
// For example:
//   err := m.Listen(ctx, "tcp", ":0")
//   for {
//     flow, err := m.Accept(ctx)
//     // process flow
//   }
//
// can be used to accept Flows initiated by remote processes.
func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
	if m.ls == nil {
		return nil, NewErrListeningWithNullRid(ctx)
	}
	item, err := m.ls.q.Get(ctx.Done())
	switch {
	case err == upcqueue.ErrQueueIsClosed:
		return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
	case err != nil:
		return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
	default:
		return item.(flow.Flow), nil
	}
}

// Dial creates a Flow to the provided remote endpoint, using 'auth' to
// determine the blessings that will be sent to the remote end.
//
// If the manager has a non-null RoutingID, the Manager will re-use connections
// by Listening on Dialed connections for the lifetime of the Dialed connection.
//
// channelTimeout specifies the duration we are willing to wait before determining
// that connections managed by this Manager are unhealthy and should be
// closed.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) {
	return m.internalDial(ctx, remote, auth, channelTimeout, false, false)
}

// DialSideChannel behaves the same as Dial, except that the returned flow is
// not factored in when deciding the underlying connection's idleness, etc.
func (m *manager) DialSideChannel(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) {
	return m.internalDial(ctx, remote, auth, channelTimeout, false, true)
}

// DialCached creates a Flow to the provided remote endpoint using only cached
// connections from previous Listen or Dial calls.
// If no cached connection exists, an error will be returned.
//
// 'auth' is used to determine the blessings that will be sent to the remote end.
//
// channelTimeout specifies the duration we are willing to wait before determining
// that connections managed by this Manager are unhealthy and should be
// closed.
func (m *manager) DialCached(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) {
	var (
		err                        error
		cached                     cachedConn
		names                      []string
		rejected                   []security.RejectedBlessing
		addr                       = remote.Addr()
		unresNetwork, unresAddress = addr.Network(), addr.String()
		protocol, _                = flow.RegisteredProtocol(unresNetwork)
	)
	if rid := remote.RoutingID; rid != naming.NullRoutingID {
		// In the case the endpoint has a RoutingID we only want to check the cache
		// for the RoutingID because we want to guarantee that the connection we
		// return is to the end server and not a connection to an intermediate proxy.
		cached, names, rejected, err = m.cache.FindWithRoutingID(ctx, remote, auth)
	} else {
		cached, names, rejected, err = m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol)
		if err == nil {
			m.cache.Unreserve(unresNetwork, unresAddress)
		}
	}
	if err != nil {
		return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
	}
	if cached == nil {
		return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, NewErrConnNotInCache(ctx, remote.String()))
	}
	c := cached.(*conn.Conn)
	return dialFlow(ctx, c, remote, names, rejected, channelTimeout, auth, false)
}

func (m *manager) internalDial(
	ctx *context.T,
	remote naming.Endpoint,
	auth flow.PeerAuthorizer,
	channelTimeout time.Duration,
	proxy, sideChannel bool) (flow.Flow, error) {
	// Fast path, look for the conn based on unresolved network, address, and routingId first.
	var (
		c                          *conn.Conn
		addr                       = remote.Addr()
		unresNetwork, unresAddress = addr.Network(), addr.String()
		protocol, _                = flow.RegisteredProtocol(unresNetwork)
	)
	if m.ls != nil && len(m.ls.serverAuthorizedPeers) > 0 {
		auth = &peerAuthorizer{auth, m.ls.serverAuthorizedPeers}
	}
	cached, names, rejected, err := m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol)
	if err != nil {
		return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
	}
	if cached != nil {
		c = cached.(*conn.Conn)
		m.cache.Unreserve(unresNetwork, unresAddress)
	} else {
		// We didn't find the connection we want in the cache.  Dial it.
		defer m.cache.Unreserve(unresNetwork, unresAddress)
		flowConn, err := dial(ctx, protocol, unresNetwork, unresAddress)
		if err != nil {
			switch err := err.(type) {
			case *net.OpError:
				if err, ok := err.Err.(net.Error); ok && err.Timeout() {
					return nil, iflow.MaybeWrapError(verror.ErrTimeout, ctx, err)
				}
			}
			return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
		}
		var fh conn.FlowHandler
		if m.ls != nil {
			m.ls.mu.Lock()
			if stoppedListening := m.ls.listeners == nil; !stoppedListening {
				fh = newHybridHandler(m)
			}
			m.ls.mu.Unlock()
		}
		c, names, rejected, err = conn.NewDialed(
			ctx,
			flowConn,
			localEndpoint(flowConn, m.rid),
			remote,
			version.Supported,
			auth,
			handshakeTimeout,
			0,
			fh,
		)
		if err != nil {
			flowConn.Close()
			return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
		}
		isProxy := proxy || !c.MatchesRID(remote)
		if err := m.cache.Insert(c, isProxy); err != nil {
			c.Close(ctx, err)
			return nil, flow.NewErrBadState(ctx, err)
		}
		// Now that c is in the cache we can explicitly unreserve.
		m.cache.Unreserve(unresNetwork, unresAddress)
		m.handlerReady(fh, proxy)
	}

	// If the connection we found or dialed turns out to not be the person we expected, assume it is a Proxy.
	// We now need to make a flow to a proxy and upgrade it to a conn to the final server.
	if !c.MatchesRID(remote) {
		proxyConn := c
		f, err := proxyConn.Dial(ctx, security.Blessings{}, nil, remote, channelTimeout, false)
		if err != nil {
			return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
		}
		var fh conn.FlowHandler
		if m.ls != nil {
			m.ls.mu.Lock()
			if stoppedListening := m.ls.listeners == nil; !stoppedListening {
				fh = &flowHandler{m: m}
			}
			m.ls.mu.Unlock()
		}
		c, names, rejected, err = conn.NewDialed(
			ctx,
			f,
			proxyConn.LocalEndpoint(),
			remote,
			version.Supported,
			auth,
			handshakeTimeout,
			0,
			fh,
		)
		if err != nil {
			return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
		}
		if err := m.cache.InsertWithRoutingID(c, false); err != nil {
			c.Close(ctx, err)
			return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
		}
	}

	return dialFlow(ctx, c, remote, names, rejected, channelTimeout, auth, sideChannel)
}

func dialFlow(ctx *context.T, c *conn.Conn, remote naming.Endpoint, names []string, rejected []security.RejectedBlessing,
	channelTimeout time.Duration, auth flow.PeerAuthorizer, sideChannel bool) (flow.Flow, error) {
	// Find the proper blessings and dial the final flow.
	blessings, discharges, err := auth.BlessingsForPeer(ctx, names)
	if err != nil {
		return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, NewErrNoBlessingsForPeer(ctx, names, rejected, err))
	}
	f, err := c.Dial(ctx, blessings, discharges, remote, channelTimeout, sideChannel)
	if err != nil {
		return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
	}
	return f, nil
}

// RoutingID returns the naming.Routing of the flow.Manager.
func (m *manager) RoutingID() naming.RoutingID {
	return m.rid
}

// Closed returns a channel that remains open for the lifetime of the Manager
// object. Once the channel is closed any operations on the Manager will
// necessarily fail.
func (m *manager) Closed() <-chan struct{} {
	return m.closed
}

func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
	if p != nil {
		var timeout time.Duration
		if dl, ok := ctx.Deadline(); ok {
			timeout = dl.Sub(time.Now())
		}
		type connAndErr struct {
			c flow.Conn
			e error
		}
		ch := make(chan connAndErr, 1)
		go func() {
			conn, err := p.Dial(ctx, protocol, address, timeout)
			ch <- connAndErr{conn, err}
		}()
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case cae := <-ch:
			return cae.c, cae.e
		}
	}
	return nil, NewErrUnknownProtocol(ctx, protocol)
}

func listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
	if p, _ := flow.RegisteredProtocol(protocol); p != nil {
		ln, err := p.Listen(ctx, protocol, address)
		if err != nil {
			return nil, err
		}
		return ln, nil
	}
	return nil, NewErrUnknownProtocol(ctx, protocol)
}

func localEndpoint(conn flow.Conn, rid naming.RoutingID) naming.Endpoint {
	localAddr := conn.LocalAddr()
	ep := naming.Endpoint{
		Protocol:  localAddr.Network(),
		Address:   localAddr.String(),
		RoutingID: rid,
	}
	return ep
}

func isTemporaryError(err error) bool {
	oErr, ok := err.(*net.OpError)
	return ok && oErr.Temporary()
}

func isTooManyOpenFiles(err error) bool {
	oErr, ok := err.(*net.OpError)
	return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
}

// peerAuthorizer implements flow.PeerAuthorizer. It is meant to be used
// when a server operating in private mode (i.e., with a non-empty set
// of authorized peers) acts as a client. It wraps around the PeerAuthorizer
// specified by the call opts and addiitonally ensures that any peers that
// the client communicates with belong to the set of authorized peers.
type peerAuthorizer struct {
	auth            flow.PeerAuthorizer
	authorizedPeers []security.BlessingPattern
}

func (x *peerAuthorizer) AuthorizePeer(
	ctx *context.T,
	localEP, remoteEP naming.Endpoint,
	remoteBlessings security.Blessings,
	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
	localPrincipal := v23.GetPrincipal(ctx)
	// The "Method" and "Suffix" fields of the call are not populated
	// as they are considered irrelevant for authorizing server blessings.
	call := security.NewCall(&security.CallParams{
		Timestamp:        time.Now(),
		LocalPrincipal:   localPrincipal,
		LocalEndpoint:    localEP,
		RemoteBlessings:  remoteBlessings,
		RemoteDischarges: remoteDischarges,
		RemoteEndpoint:   remoteEP,
	})

	peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
	for _, p := range x.authorizedPeers {
		if p.MatchedBy(peerNames...) {
			return x.auth.AuthorizePeer(ctx, localEP, remoteEP, remoteBlessings, remoteDischarges)
		}
	}
	return nil, nil, fmt.Errorf("peer names: %v (rejected: %v) do not match one of the authorized patterns: %v", peerNames, rejectedPeerNames, x.authorizedPeers)
}

func (x *peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
	security.Blessings, map[string]security.Discharge, error) {
	return x.auth.BlessingsForPeer(ctx, peerNames)
}
