// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"bytes"
"fmt"
"sort"
"sync"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/security"
iflow "v.io/x/ref/runtime/internal/flow"
"v.io/x/ref/runtime/internal/flow/conn"
)
// ConnCache is a cache of Conns keyed by (protocol, address) and (routingID).
// Multiple goroutines can invoke methods on the ConnCache simultaneously.
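//
// A minimal usage sketch (ctx and the dialed conn c are assumed to come
// from the surrounding flow manager; error handling is abbreviated):
//
//	cache := NewConnCache()
//	defer cache.Close(ctx)
//	if err := cache.Insert(c, false); err != nil {
//		// The cache has been closed.
//	}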
type ConnCache struct {
mu *sync.Mutex
cond *sync.Cond
addrCache map[string]*connEntry // keyed by (protocol, address)
ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
	started       map[string]bool     // (protocol, address) keys reserved by an in-progress Find
	unmappedConns map[*connEntry]bool // set of connEntries replaced by newer entries but not yet closed
}
type connEntry struct {
conn *conn.Conn
rid naming.RoutingID
addrKey string
proxy bool
}
// NewConnCache creates an empty ConnCache.
func NewConnCache() *ConnCache {
mu := &sync.Mutex{}
cond := sync.NewCond(mu)
return &ConnCache{
mu: mu,
cond: cond,
addrCache: make(map[string]*connEntry),
ridCache: make(map[naming.RoutingID]*connEntry),
started: make(map[string]bool),
unmappedConns: make(map[*connEntry]bool),
}
}
// String returns a textual dump of the cache's contents for debugging.
func (c *ConnCache) String() string {
	defer c.mu.Unlock()
	c.mu.Lock()
	buf := &bytes.Buffer{}
fmt.Fprintln(buf, "AddressCache:")
for k, v := range c.addrCache {
fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
}
fmt.Fprintln(buf, "RIDCache:")
for k, v := range c.ridCache {
fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
}
return buf.String()
}
// Insert adds conn to the cache, keyed by both (protocol, address) and (routingID).
// An error will be returned iff the cache has been closed.
func (c *ConnCache) Insert(conn *conn.Conn, proxy bool) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
ep := conn.RemoteEndpoint()
addr := ep.Addr()
k := key(addr.Network(), addr.String())
entry := &connEntry{
conn: conn,
rid: ep.RoutingID(),
addrKey: k,
proxy: proxy,
}
if old := c.ridCache[entry.rid]; old != nil {
c.unmappedConns[old] = true
}
c.addrCache[k] = entry
c.ridCache[entry.rid] = entry
return nil
}
// InsertWithRoutingID adds conn to the cache keyed only by conn's RoutingID.
func (c *ConnCache) InsertWithRoutingID(conn *conn.Conn, proxy bool) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
ep := conn.RemoteEndpoint()
entry := &connEntry{
conn: conn,
rid: ep.RoutingID(),
proxy: proxy,
}
if old := c.ridCache[entry.rid]; old != nil {
c.unmappedConns[old] = true
}
c.ridCache[entry.rid] = entry
return nil
}
// Find returns a Conn based on the input remoteEndpoint.
// nil is returned if there is no such Conn.
//
// Find will return an error iff the cache is closed.
// If no error is returned, the caller is required to call Unreserve, to avoid
// deadlock. The (protocol, address) provided to Unreserve must be the same as
// the arguments provided to Find.
// All new Find calls for the same (protocol, address) will block
// until the corresponding Unreserve call is made.
// p is used to resolve (network, address) so the cache can also be checked
// for entries under the resolved addresses.
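//
// A sketch of that contract (remote, auth, and p as in a normal dial path;
// the cache-miss handling is hypothetical):
//
//	c, _, _, err := cache.Find(ctx, remote, network, address, auth, p)
//	if err != nil {
//		return err // The cache is closed; nothing was reserved.
//	}
//	defer cache.Unreserve(network, address)
//	if c == nil {
//		// Cache miss: dial a new conn and Insert it; the reservation
//		// holds off concurrent Finds for (network, address) until the
//		// deferred Unreserve runs.
//	}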
func (c *ConnCache) Find(ctx *context.T, remote naming.Endpoint, network, address string, auth flow.PeerAuthorizer, p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, nil, nil, NewErrCacheClosed(nil)
}
if rid := remote.RoutingID(); rid != naming.NullRoutingID {
if entry, names, rejected := c.removeUndialable(ctx, remote, c.ridCache[rid], auth); entry != nil {
return entry, names, rejected, nil
}
}
k := key(network, address)
for c.started[k] {
c.cond.Wait()
if c.addrCache == nil {
return nil, nil, nil, NewErrCacheClosed(nil)
}
}
c.started[k] = true
entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
if entry != nil {
return entry, names, rejected, nil
}
return c.findResolvedLocked(ctx, remote, network, address, auth, p)
}
func (c *ConnCache) findResolvedLocked(ctx *context.T, remote naming.Endpoint, network, address string, auth flow.PeerAuthorizer, p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
	resolved, addresses, err := resolve(ctx, p, network, address)
	if err != nil {
		// Unreserve with the original (network, address): that is the key
		// Find reserved, and resolve returns an empty network on error.
		c.unreserveLocked(network, address)
		return nil, nil, nil, iflow.MaybeWrapError(flow.ErrResolveFailed, ctx, err)
	}
	for _, address := range addresses {
		k := key(resolved, address)
entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
if entry != nil {
return entry, names, rejected, nil
}
}
// No entries for any of the addresses were in the cache.
return nil, nil, nil, nil
}
// resolve returns the resolved network and addresses for (protocol, address)
// using p, or an error if p is nil or resolution yields no addresses.
func resolve(ctx *context.T, p flow.Protocol, protocol, address string) (string, []string, error) {
if p != nil {
net, addrs, err := p.Resolve(ctx, protocol, address)
if err != nil {
return "", nil, err
}
if len(addrs) > 0 {
return net, addrs, nil
}
}
return "", nil, NewErrUnknownProtocol(ctx, protocol)
}
// Unreserve marks the (protocol, address) as no longer started and wakes
// all goroutines waiting in Find.
func (c *ConnCache) Unreserve(protocol, address string) {
c.mu.Lock()
c.unreserveLocked(protocol, address)
c.mu.Unlock()
}
func (c *ConnCache) unreserveLocked(protocol, address string) {
delete(c.started, key(protocol, address))
c.cond.Broadcast()
}
// Close marks the ConnCache as closed and closes all Conns in the cache.
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
c.mu.Lock()
	c.addrCache, c.started = nil, nil
	// Wake any goroutines blocked in Find's wait loop so they observe the
	// closed cache instead of waiting indefinitely for an Unreserve.
	c.cond.Broadcast()
err := NewErrCacheClosed(ctx)
for _, d := range c.ridCache {
d.conn.Close(ctx, err)
}
for d := range c.unmappedConns {
d.conn.Close(ctx, err)
}
}
// removeUndialable removes connections that are closed or lame-ducked from
// the cache and filters out non-proxied connections whose peers fail
// authorization. It returns the conn only if it is still dialable.
func (c *ConnCache) removeUndialable(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
if e == nil {
return nil, nil, nil
}
if status := e.conn.Status(); status >= conn.Closing || e.conn.RemoteLameDuck() {
delete(c.addrCache, e.addrKey)
delete(c.ridCache, e.rid)
if status < conn.Closing {
c.unmappedConns[e] = true
}
return nil, nil, nil
}
if !e.proxy && auth != nil {
names, rejected, err := auth.AuthorizePeer(ctx,
e.conn.LocalEndpoint(),
remote,
e.conn.RemoteBlessings(),
e.conn.RemoteDischarges())
if err != nil {
return nil, names, rejected
}
return e.conn, names, rejected
}
return e.conn, nil, nil
}
// KillConnections closes and removes the num least-recently-used Conns in
// the cache. Connections that are already closed are removed from the cache
// without counting against num.
// This is useful when the manager is approaching system FD limits.
// If num is greater than the number of connections in the cache, all cached
// connections will be closed and removed.
// KillConnections returns an error iff the cache is closed.
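//
// A minimal caller-side sketch (openFDCount, fdLimit, and headroom are
// hypothetical accounting helpers, not part of this package):
//
//	if openFDCount() > fdLimit()-headroom {
//		if err := cache.KillConnections(ctx, 10); err != nil {
//			// The cache has been closed.
//		}
//	}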
func (c *ConnCache) KillConnections(ctx *context.T, num int) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(ctx)
}
err := NewErrConnKilledToFreeResources(ctx)
pq := make(connEntries, 0, len(c.ridCache))
for _, e := range c.ridCache {
if entry, _, _ := c.removeUndialable(ctx, nil, e, nil); entry == nil {
continue
}
if e.conn.IsEncapsulated() {
// Killing a proxied connection doesn't save us any FD resources, just memory.
continue
}
pq = append(pq, e)
}
for d := range c.unmappedConns {
if status := d.conn.Status(); status == conn.Closed {
delete(c.unmappedConns, d)
continue
}
if d.conn.IsEncapsulated() {
continue
}
pq = append(pq, d)
}
	sort.Sort(pq)
	// Clamp num so that a request to kill more conns than exist closes them
	// all rather than indexing past the end of pq.
	if num > len(pq) {
		num = len(pq)
	}
	for i := 0; i < num; i++ {
d := pq[i]
d.conn.Close(ctx, err)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
delete(c.unmappedConns, d)
}
return nil
}
// EnterLameDuckMode lame-ducks every connection in the cache and blocks
// until each remote end has acknowledged the notification.
func (c *ConnCache) EnterLameDuckMode(ctx *context.T) {
c.mu.Lock()
n := len(c.ridCache) + len(c.unmappedConns)
conns := make([]*conn.Conn, 0, n)
for _, e := range c.ridCache {
conns = append(conns, e.conn)
}
for d := range c.unmappedConns {
conns = append(conns, d.conn)
}
c.mu.Unlock()
waitfor := make([]chan struct{}, 0, n)
	for _, cn := range conns {
		waitfor = append(waitfor, cn.EnterLameDuck(ctx))
	}
for _, w := range waitfor {
<-w
}
}
// TODO(suharshs): If sorting the connections becomes too slow, switch to
// container/heap instead of sorting all the connections.
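// Such a switch would amount to implementing heap.Interface on top of the
// sort.Interface methods below; roughly (a sketch, not wired up anywhere):
//
//	func (c *connEntries) Push(x interface{}) { *c = append(*c, x.(*connEntry)) }
//	func (c *connEntries) Pop() interface{} {
//		old := *c
//		e := old[len(old)-1]
//		*c = old[:len(old)-1]
//		return e
//	}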
type connEntries []*connEntry
func (c connEntries) Len() int {
return len(c)
}
func (c connEntries) Less(i, j int) bool {
return c[i].conn.LastUsed().Before(c[j].conn.LastUsed())
}
func (c connEntries) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
}
// key returns the cache key for a (protocol, address) pair,
// e.g. key("tcp", "127.0.0.1:22") == "tcp,127.0.0.1:22".
func key(protocol, address string) string {
return protocol + "," + address
}