blob: 92f83483335e0b1befaeb34127ddb027e7dc8552 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"sort"
"strings"
"sync"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/ref/runtime/internal/flow/conn"
)
// ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
// and (routingID).
// Multiple goroutines can invoke methods on the ConnCache simultaneously.
// TODO(suharshs): We should periodically look for closed connections and remove them.
type ConnCache struct {
mu *sync.Mutex
addrCache map[string]*connEntry // keyed by (protocol, address, blessingNames)
ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
started map[string]bool // keyed by (protocol, address, blessingNames)
cond *sync.Cond
}
type connEntry struct {
conn *conn.Conn
rid naming.RoutingID
addrKey string
}
func NewConnCache() *ConnCache {
mu := &sync.Mutex{}
cond := sync.NewCond(mu)
return &ConnCache{
mu: mu,
addrCache: make(map[string]*connEntry),
ridCache: make(map[naming.RoutingID]*connEntry),
started: make(map[string]bool),
cond: cond,
}
}
// ReservedFind returns a Conn where the remote end of the connection is
// identified by the provided (protocol, address, blessingNames). nil is
// returned if there is no such Conn.
//
// ReservedFind will return an error iff the cache is closed.
// If no error is returned, the caller is required to call Unreserve, to avoid
// deadlock. The (protocol, address, blessingNames) provided to Unreserve must
// be the same as the arguments provided to ReservedFind.
// All new ReservedFind calls for the (protocol, address, blessings) will Block
// until the corresponding Unreserve call is made.
func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*conn.Conn, error) {
k := key(protocol, address, blessingNames)
defer c.mu.Unlock()
c.mu.Lock()
for c.started[k] {
c.cond.Wait()
}
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
c.started[k] = true
entry := c.addrCache[k]
if entry == nil {
return nil, nil
}
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
return nil, nil
}
return entry.conn, nil
}
// Unreserve marks the status of the (protocol, address, blessingNames) as no
// longer started, and broadcasts waiting threads.
func (c *ConnCache) Unreserve(protocol, address string, blessingNames []string) {
c.mu.Lock()
delete(c.started, key(protocol, address, blessingNames))
c.cond.Broadcast()
c.mu.Unlock()
}
// Insert adds conn to the cache.
// An error will be returned iff the cache has been closed.
func (c *ConnCache) Insert(conn *conn.Conn) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
ep := conn.RemoteEndpoint()
k := key(ep.Addr().Network(), ep.Addr().String(), ep.BlessingNames())
entry := &connEntry{
conn: conn,
rid: ep.RoutingID(),
addrKey: k,
}
c.addrCache[k] = entry
c.ridCache[entry.rid] = entry
return nil
}
// InsertWithRoutingID add conn to the cache keyed only by conn's RoutingID.
func (c *ConnCache) InsertWithRoutingID(conn *conn.Conn) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
entry := &connEntry{
conn: conn,
rid: conn.RemoteEndpoint().RoutingID(),
}
c.ridCache[entry.rid] = entry
return nil
}
// Close marks the ConnCache as closed and closes all Conns in the cache.
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
c.mu.Lock()
c.addrCache, c.started = nil, nil
for _, d := range c.ridCache {
d.conn.Close(ctx, NewErrCacheClosed(ctx))
}
}
// KillConnections will close and remove num LRU Conns in the cache.
// If connections are already closed they will be removed from the cache.
// This is useful when the manager is approaching system FD limits.
// If num is greater than the number of connections in the cache, all cached
// connections will be closed and removed.
// KillConnections returns an error iff the cache is closed.
func (c *ConnCache) KillConnections(ctx *context.T, num int) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(ctx)
}
err := NewErrConnKilledToFreeResources(ctx)
pq := make(connEntries, 0, len(c.ridCache))
for _, e := range c.ridCache {
if isClosed(e.conn) {
delete(c.addrCache, e.addrKey)
delete(c.ridCache, e.rid)
continue
}
pq = append(pq, e)
}
sort.Sort(pq)
for i := 0; i < num; i++ {
d := pq[i]
d.conn.Close(ctx, err)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
}
return nil
}
// TODO(suharshs): If sorting the connections becomes too slow, switch to
// container/heap instead of sorting all the connections.
type connEntries []*connEntry
func (c connEntries) Len() int {
return len(c)
}
func (c connEntries) Less(i, j int) bool {
return c[i].conn.LastUsedTime().Before(c[j].conn.LastUsedTime())
}
func (c connEntries) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
}
// FindWithRoutingID returns a Conn where the remote end of the connection
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
if rid == naming.NullRoutingID {
return nil, nil
}
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
entry := c.ridCache[rid]
if entry == nil {
return nil, nil
}
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
return nil, nil
}
return entry.conn, nil
}
func key(protocol, address string, blessingNames []string) string {
// TODO(suharshs): We may be able to do something more inclusive with our
// blessingNames.
return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
}
func isClosed(conn *conn.Conn) bool {
select {
case <-conn.Closed():
return true
default:
return false
}
}