blob: b435d9c7c8bdee4b531dc1f14aee468609d05a13 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"strings"
"sync"
"v.io/v23/context"
"v.io/v23/naming"
)
// ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
// and (routingID).
// Multiple goroutines can invoke methods on the ConnCache simultaneously.
// TODO(suharshs): We should periodically look for closed connections and remove them.
type ConnCache struct {
mu *sync.Mutex
addrCache map[string]*connEntry // keyed by (protocol, address, blessingNames)
ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
started map[string]bool // keyed by (protocol, address, blessingNames)
cond *sync.Cond
head *connEntry // the head and tail pointer of the linked list for implementing LRU.
}
type connEntry struct {
conn *Conn
rid naming.RoutingID
addrKey string
next, prev *connEntry
}
func NewConnCache() *ConnCache {
mu := &sync.Mutex{}
cond := sync.NewCond(mu)
head := &connEntry{}
head.next, head.prev = head, head
return &ConnCache{
mu: mu,
addrCache: make(map[string]*connEntry),
ridCache: make(map[naming.RoutingID]*connEntry),
started: make(map[string]bool),
cond: cond,
head: head,
}
}
// ReservedFind returns a Conn where the remote end of the connection is
// identified by the provided (protocol, address, blessingNames). nil is
// returned if there is no such Conn.
//
// ReservedFind will return an error iff the cache is closed.
// If no error is returned, the caller is required to call Unreserve, to avoid
// deadlock. The (protocol, address, blessingNames) provided to Unreserve must
// be the same as the arguments provided to ReservedFind.
// All new ReservedFind calls for the (protocol, address, blessings) will Block
// until the corresponding Unreserve call is made.
func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*Conn, error) {
k := key(protocol, address, blessingNames)
defer c.mu.Unlock()
c.mu.Lock()
for c.started[k] {
c.cond.Wait()
}
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
c.started[k] = true
entry := c.addrCache[k]
if entry == nil {
return nil, nil
}
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
entry.removeFromList()
return nil, nil
}
entry.moveAfter(c.head)
return entry.conn, nil
}
// Unreserve marks the status of the (protocol, address, blessingNames) as no
// longer started, and broadcasts waiting threads.
func (c *ConnCache) Unreserve(protocol, address string, blessingNames []string) {
c.mu.Lock()
delete(c.started, key(protocol, address, blessingNames))
c.cond.Broadcast()
c.mu.Unlock()
}
// Insert adds conn to the cache.
// An error will be returned iff the cache has been closed.
func (c *ConnCache) Insert(conn *Conn) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
ep := conn.RemoteEndpoint()
k := key(ep.Addr().Network(), ep.Addr().String(), ep.BlessingNames())
entry := &connEntry{
conn: conn,
rid: ep.RoutingID(),
addrKey: k,
}
c.addrCache[k] = entry
c.ridCache[entry.rid] = entry
entry.moveAfter(c.head)
return nil
}
// Close marks the ConnCache as closed and closes all Conns in the cache.
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
c.mu.Lock()
c.addrCache, c.ridCache, c.started = nil, nil, nil
d := c.head.next
for d != c.head {
d.conn.Close(ctx, nil)
d = d.next
}
c.head = nil
}
// KillConnections will close and remove num LRU Conns in the cache.
// This is useful when the manager is approaching system FD limits.
// If num is greater than the number of connections in the cache, all cached
// connections will be closed and removed.
// KillConnections returns an error iff the cache is closed.
func (c *ConnCache) KillConnections(ctx *context.T, num int) error {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return NewErrCacheClosed(nil)
}
d := c.head.prev
for i := 0; i < num; i++ {
if d == c.head {
break
}
d.conn.Close(ctx, nil)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
prev := d.prev
d.removeFromList()
d = prev
}
return nil
}
// FindWithRoutingID returns a Conn where the remote end of the connection
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*Conn, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
entry := c.ridCache[rid]
if entry == nil {
return nil, nil
}
if isClosed(entry.conn) {
delete(c.addrCache, entry.addrKey)
delete(c.ridCache, entry.rid)
entry.removeFromList()
return nil, nil
}
entry.moveAfter(c.head)
return entry.conn, nil
}
// Size returns the number of Conns stored in the ConnCache.
func (c *ConnCache) Size() int {
defer c.mu.Unlock()
c.mu.Lock()
return len(c.addrCache)
}
func key(protocol, address string, blessingNames []string) string {
// TODO(suharshs): We may be able to do something more inclusive with our
// blessingNames.
return strings.Join(append([]string{protocol, address}, blessingNames...), ",")
}
func (c *connEntry) removeFromList() {
if c.prev != nil {
c.prev.next = c.next
}
if c.next != nil {
c.next.prev = c.prev
}
c.next, c.prev = nil, nil
}
func (c *connEntry) moveAfter(prev *connEntry) {
c.removeFromList()
c.prev = prev
c.next = prev.next
prev.next.prev = c
prev.next = c
}
func isClosed(conn *Conn) bool {
select {
case <-conn.closed:
return true
default:
return false
}
}