RPC: Dial all possible connections, even when a connection to the
desired server already exists.
Currently, when we have one working connection to a server, we never
dial another. This is unfortunate when the connection we do have is
poor (say, a Bluetooth Classic link) but a better network (say, Wi-Fi)
becomes available.
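
With this change the dialer always asks the cache for a Reservation
before settling on an existing conn, so a dial can proceed in parallel
with the use of a cached connection. A rough sketch of the intended
dance (not the literal manager code; the names match the new ConnCache
API, and dialEndpoint is a hypothetical helper standing in for
dialReserved):

    // Kick off a dial iff nobody else is dialing this endpoint.
    if res := m.cache.Reserve(ctx, remote); res != nil {
        go func() {
            // Dial on the reservation's context, which outlives the
            // RPC that triggered the dial.
            c, proxyConn, err := dialEndpoint(res.Context(), remote)
            // Record the conns (or the error) and wake blocked Finds.
            res.Unreserve(c, proxyConn, err)
        }()
    }
    // Meanwhile, use any cached conn; Find blocks only while the
    // endpoint is reserved and no usable conn exists yet.
    cached, names, rejected, err := m.cache.Find(ctx, remote, auth)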
Change-Id: I66d0af0541ca6288fcb472d97f73795041dfa904
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
index ed4f715..042c813 100644
--- a/runtime/internal/flow/flowtest/flowtest.go
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -79,12 +79,10 @@
RemoteEndpoint: remoteEndpoint,
})
peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
- ctx.Infof("validating against %v", peerNames)
for _, pattern := range p {
if security.BlessingPattern(pattern).MatchedBy(peerNames...) {
return peerNames, rejectedPeerNames, nil
}
- ctx.Infof("no match %v", pattern)
}
return peerNames, rejectedPeerNames, fmt.Errorf("not authorized")
}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 7645c14..3bd8654 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -8,40 +8,126 @@
"bytes"
"fmt"
"sort"
- "strings"
- "sync"
"time"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/x/lib/nsync"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/flow/conn"
)
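+// maxErrorAge bounds how long a failed dial is remembered; within this
+// window Find may report the cached error, after which KillConnections
+// purges the record.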
+const maxErrorAge = time.Minute * 5
+
// ConnCache is a cache from (protocol, address) and (routingID) to a set of Conns.
// Multiple goroutines may invoke methods on the ConnCache simultaneously.
type ConnCache struct {
- mu *sync.Mutex
- cond *sync.Cond
- addrCache map[string][]*connEntry // keyed by "protocol,address"
- ridCache map[naming.RoutingID][]*connEntry // keyed by remote.RoutingID
- started map[string]bool // keyed by "protocol,address"
+ mu nsync.Mu
+ cond nsync.CV
+
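+ // conns maps each cached conn to its entry; cache indexes entries by
+ // RID, (protocol, address), and path keys; errors remembers recent
+ // dial failures; reserved tracks outstanding dial reservations.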
+ conns map[CachedConn]*connEntry
+ cache map[interface{}][]*connEntry
+ errors map[interface{}]dialError
+ reserved map[interface{}]*Reservation
idleExpiry time.Duration
}
type connEntry struct {
- conn cachedConn
- rid naming.RoutingID
- addrKey string
- proxy bool
+ conn CachedConn
+ rid naming.RoutingID
+ proxy bool
+ // cancel is a context.CancelFunc that corresponds to the context
+ // used to dial the connection. Since connections live longer than the
+ // RPC calls which precipitated their being dialed, we have to use
+ // context.WithRootCancel to make a context to dial. This means we need
+ // to cancel that context at some point when the connection is no longer
+ // needed; in our case that's when we eject the connection from the cache.
+ cancel context.CancelFunc
+ keys []interface{}
}
-// cachedConn is the interface implemented by *conn.Conn that is used by ConnCache.
+type dialError struct {
+ err error
+ when time.Time
+}
+
+// Reservation represents the right to dial a connection. We only
+// hand out one reservation for a given connection at a time.
+type Reservation struct {
+ cache *ConnCache
+ ctx *context.T
+ cancel context.CancelFunc
+ keys []interface{}
+ remote naming.Endpoint
+ waitForProxy bool
+}
+
+// Context returns the context that should be used to dial the new connection.
+func (r *Reservation) Context() *context.T {
+ return r.ctx
+}
+
+// ProxyConn returns a connection to a relevant proxy if it exists. Otherwise
+// it returns nil and the reservation holder should dial the proxy if necessary.
+func (r *Reservation) ProxyConn() CachedConn {
+ if !r.waitForProxy {
+ return nil
+ }
+ keys := []interface{}{key(r.remote.Protocol, r.remote.Address)}
+ // We ignore the error here. The worst thing that can happen is we try
+ // to dial the proxy again.
+ conn, _, _, _ := r.cache.internalFind(r.ctx, r.remote, keys, nil, true)
+ return conn
+}
+
+// Unreserve releases this reservation, recording the dialed connections
+// or error, and wakes any Find calls blocked waiting on it.
+func (r *Reservation) Unreserve(conn, proxyConn CachedConn, err error) error {
+ c := r.cache
+
+ defer c.mu.Unlock()
+ c.mu.Lock()
+
+ if c.conns == nil {
+ return NewErrCacheClosed(r.ctx)
+ }
+
+ for _, k := range r.keys {
+ delete(c.reserved, k)
+ }
+
+ if proxyConn != nil {
+ c.insertConnLocked(r.remote, proxyConn, true, true, r.cancel)
+ r.cancel = nil
+ }
+
+ if conn != nil {
+ c.insertConnLocked(r.remote, conn, proxyConn != nil, proxyConn == nil, r.cancel)
+ r.cancel = nil
+ } else if err != nil {
+ e := dialError{
+ err: err,
+ when: time.Now(),
+ }
+ c.errors[key(r.remote.Protocol, r.remote.Address)] = e
+ c.errors[r.remote.RoutingID] = e
+ }
+
+ if r.cancel != nil {
+ r.cancel()
+ }
+
+ c.cond.Broadcast()
+ return nil
+}
+
+// CachedConn is the interface implemented by *conn.Conn that is used by ConnCache.
// We make the ConnCache API take this interface to make testing easier.
-type cachedConn interface {
+type CachedConn interface {
Status() conn.Status
IsEncapsulated() bool
IsIdle(*context.T, time.Duration) bool
@@ -61,74 +147,207 @@
// NewConnCache creates a ConnCache with an idleExpiry for connections.
// If idleExpiry is zero, connections will never expire.
func NewConnCache(idleExpiry time.Duration) *ConnCache {
- mu := &sync.Mutex{}
- cond := sync.NewCond(mu)
return &ConnCache{
- mu: mu,
- cond: cond,
- addrCache: make(map[string][]*connEntry),
- ridCache: make(map[naming.RoutingID][]*connEntry),
- started: make(map[string]bool),
+ conns: make(map[CachedConn]*connEntry),
+ cache: make(map[interface{}][]*connEntry),
+ errors: make(map[interface{}]dialError),
+ reserved: make(map[interface{}]*Reservation),
idleExpiry: idleExpiry,
}
}
// Insert adds conn to the cache, keyed by both (protocol, address) and (routingID).
// An error will be returned iff the cache has been closed.
-func (c *ConnCache) Insert(conn cachedConn, proxy bool) error {
+func (c *ConnCache) Insert(conn CachedConn, proxy bool) error {
defer c.mu.Unlock()
c.mu.Lock()
- if c.addrCache == nil {
+ if c.conns == nil {
return NewErrCacheClosed(nil)
}
- c.insertConnLocked(conn, proxy, true)
+ c.insertConnLocked(conn.RemoteEndpoint(), conn, proxy, true, nil)
return nil
}
// InsertWithRoutingID adds conn to the cache keyed only by conn's RoutingID.
-func (c *ConnCache) InsertWithRoutingID(conn cachedConn, proxy bool) error {
+func (c *ConnCache) InsertWithRoutingID(conn CachedConn, proxy bool) error {
defer c.mu.Unlock()
c.mu.Lock()
- if c.addrCache == nil {
+ if c.conns == nil {
return NewErrCacheClosed(nil)
}
- c.insertConnLocked(conn, proxy, false)
+ c.insertConnLocked(conn.RemoteEndpoint(), conn, proxy, false, nil)
return nil
}
// Find returns a Conn based on the input remoteEndpoint.
// nil is returned if there is no such Conn.
//
-// If no error is returned, the caller is required to call Unreserve, to avoid
-// deadlock. The (network, address) provided to Unreserve must be the same as
-// the arguments provided to Find.
-// All new Find calls for the (network, address) will Block
-// until the corresponding Unreserve call is made.
-// p is used to check the cache for resolved protocols.
-func (c *ConnCache) Find(ctx *context.T, remote naming.Endpoint, network, address string, auth flow.PeerAuthorizer,
- p flow.Protocol) (conn cachedConn, names []string, rejected []security.RejectedBlessing, err error) {
- if conn, names, rejected, err = c.findWithRoutingID(ctx, remote, auth); err != nil || conn != nil {
- return conn, names, rejected, err
+// Find calls will block if the desired connections are currently being
+// dialed. Find will return immediately if the given context is
+// canceled.
+func (c *ConnCache) Find(
+ ctx *context.T,
+ remote naming.Endpoint,
+ auth flow.PeerAuthorizer,
+) (conn CachedConn, names []string, rejected []security.RejectedBlessing, err error) {
+ var keys []interface{}
+ if keys, conn, names, rejected, err = c.internalFindCached(ctx, remote, auth); conn != nil {
+ return conn, names, rejected, nil
}
- if conn, names, rejected, err = c.findWithAddress(ctx, remote, network, address, auth); err != nil || conn != nil {
- return conn, names, rejected, err
- }
- return c.findWithResolvedAddress(ctx, remote, network, address, auth, p)
+ // Finally try waiting for any outstanding dials to complete.
+ return c.internalFind(ctx, remote, keys, auth, true)
}
-// Find returns a Conn based only on the RoutingID of remote.
-func (c *ConnCache) FindWithRoutingID(ctx *context.T, remote naming.Endpoint,
- auth flow.PeerAuthorizer) (cachedConn, []string, []security.RejectedBlessing, error) {
- return c.findWithRoutingID(ctx, remote, auth)
+// FindCached returns a Conn only if it's already in the cache.
+func (c *ConnCache) FindCached(
+ ctx *context.T,
+ remote naming.Endpoint,
+ auth flow.PeerAuthorizer) (conn CachedConn, names []string, rejected []security.RejectedBlessing, err error) {
+ _, conn, names, rejected, err = c.internalFindCached(ctx, remote, auth)
+ return
}
-// Unreserve marks the status of the (network, address) as no longer started, and
-// broadcasts waiting threads to continue with their halted Find call.
-func (c *ConnCache) Unreserve(network, address string) {
+func (c *ConnCache) internalFind(
+ ctx *context.T,
+ remote naming.Endpoint,
+ keys []interface{},
+ auth flow.PeerAuthorizer,
+ wait bool,
+) (CachedConn, []string, []security.RejectedBlessing, error) {
+ c.mu.Lock()
+ var err error
+ var entries rttEntries
+ for {
+ if c.conns == nil {
+ c.mu.Unlock()
+ return nil, nil, nil, NewErrCacheClosed(ctx)
+ }
+ entries, err = c.rttEntriesLocked(ctx, keys)
+ if len(entries) > 0 || !wait || !c.hasReservationsLocked(keys) {
+ break
+ }
+ if c.cond.WaitWithDeadline(&c.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
+ c.mu.Unlock()
+ switch ctx.Err() {
+ case context.Canceled:
+ return nil, nil, nil, verror.NewErrCanceled(ctx)
+ default:
+ return nil, nil, nil, verror.NewErrTimeout(ctx)
+ }
+ }
+ }
+ c.mu.Unlock()
+
+ if len(entries) == 0 {
+ if err == nil {
+ err = NewErrConnNotInCache(ctx, remote.String())
+ }
+ return nil, nil, nil, err
+ }
+ return c.pickFirstAuthorizedConn(ctx, remote, entries, auth)
+}
+
+func (c *ConnCache) internalFindCached(
+ ctx *context.T,
+ remote naming.Endpoint,
+ auth flow.PeerAuthorizer) (keys []interface{}, conn CachedConn, names []string, rejected []security.RejectedBlessing, err error) {
+ // If we have an RID, there's no point in looking under anything else.
+ if rid := remote.RoutingID; rid != naming.NullRoutingID {
+ keys = []interface{}{rid, pathkey(remote.Protocol, remote.Address, rid)}
+ conn, names, rejected, err = c.internalFind(ctx, remote, keys, auth, false)
+ return keys, conn, names, rejected, err
+ }
+
+ // Try looking under the address if there wasn't a routing ID.
+ addrKey := key(remote.Protocol, remote.Address)
+ keys = []interface{}{addrKey}
+ if conn, names, rejected, err = c.internalFind(ctx, remote, keys, auth, false); conn != nil {
+ return keys, conn, names, rejected, nil
+ }
+
+ // If that didn't work, try resolving the address and looking again.
+ p, _ := flow.RegisteredProtocol(remote.Protocol)
+ network, addresses, rerr := resolve(ctx, p, remote.Protocol, remote.Address)
+ if rerr != nil {
+ // TODO(suharshs): Add a unittest for failed resolution.
+ ctx.Errorf("Failed to resolve (%v, %v): %v", remote.Protocol, remote.Address, rerr)
+ }
+ for _, a := range addresses {
+ if k := key(network, a); k != addrKey {
+ keys = append(keys, k)
+ }
+ }
+ if len(keys) > 1 {
+ conn, names, rejected, err = c.internalFind(ctx, remote, keys, auth, false)
+ }
+ return keys, conn, names, rejected, err
+}
+
+// Reserve reserves the right to dial a remote endpoint. It returns nil
+// when no dial is needed because an existing connection or reservation
+// already covers the endpoint. A non-nil Reservation must eventually be
+// released with Unreserve.
+func (c *ConnCache) Reserve(ctx *context.T, remote naming.Endpoint) *Reservation {
+ if remote.Protocol == "bidi" {
+ return nil
+ }
defer c.mu.Unlock()
c.mu.Lock()
- delete(c.started, key(network, address))
- c.cond.Broadcast()
+ if c.conns == nil {
+ // Cache is closed.
+ return nil
+ }
+
+ k := key(remote.Protocol, remote.Address)
+
+ if remote.RoutingID == naming.NullRoutingID {
+ if len(c.cache[k]) > 0 || c.reserved[k] != nil {
+ // There are either connections or a reservation for this
+ // address, and the routing id is not given, so no reservations
+ // are needed.
+ return nil
+ }
+ res := &Reservation{
+ cache: c,
+ remote: remote,
+ keys: []interface{}{k},
+ }
+ res.ctx, res.cancel = context.WithCancel(ctx)
+ c.reserved[k] = res
+ return res
+ }
+
+ // OK, now we're in the more complicated case when there is an address and
+ // routing ID, so a proxy might be involved.
+ // TODO(mattr): We should include the routes in the case of multi-proxying.
+ pk := pathkey(remote.Protocol, remote.Address, remote.RoutingID)
+
+ if len(c.cache[k]) == 0 && c.reserved[k] == nil {
+ // Nobody is dialing the address. We'll reserve both the address and the path.
+ res := &Reservation{
+ cache: c,
+ remote: remote,
+ keys: []interface{}{k, pk},
+ }
+ res.ctx, res.cancel = context.WithCancel(ctx)
+ c.reserved[k] = res
+ c.reserved[pk] = res
+ return res
+ }
+
+ if len(c.cache[pk]) == 0 && c.reserved[pk] == nil {
+ // The address connection exists (or is being dialed), but the path doesn't.
+ // We'll only reserve the path.
+ res := &Reservation{
+ cache: c,
+ remote: remote,
+ keys: []interface{}{pk},
+ waitForProxy: true,
+ }
+ res.ctx, res.cancel = context.WithCancel(ctx)
+ c.reserved[pk] = res
+ return res
+ }
+
+ // No need to reserve anything.
+ return nil
}
// KillConnections closes at least num Conns in the cache.
@@ -150,16 +369,24 @@
func (c *ConnCache) KillConnections(ctx *context.T, num int) error {
defer c.mu.Unlock()
c.mu.Lock()
- if c.addrCache == nil {
+ if c.conns == nil {
return NewErrCacheClosed(ctx)
}
- // entries will be at least c.ridCache size.
- entries := make(lruEntries, 0, len(c.ridCache))
- for _, es := range c.ridCache {
- for _, e := range es {
- entries = append(entries, e)
+
+ // Kill old error records. We keep them for a while to allow new Finds
+ // to return errors for recent dial attempts, but we need to eliminate
+ // them eventually.
+ now := time.Now()
+ for k, v := range c.errors {
+ if v.when.Add(maxErrorAge).Before(now) {
+ delete(c.errors, k)
}
}
+
+ entries := make(lruEntries, 0, len(c.conns))
+ for _, e := range c.conns {
+ entries = append(entries, e)
+ }
k := 0
for _, e := range entries {
if status := e.conn.Status(); status >= conn.Closing {
@@ -222,12 +449,9 @@
// end to acknowledge the lameduck.
func (c *ConnCache) EnterLameDuckMode(ctx *context.T) {
c.mu.Lock()
- // waitfor will be at least c.ridCache size.
- waitfor := make([]chan struct{}, 0, len(c.ridCache))
- for _, entries := range c.ridCache {
- for _, e := range entries {
- waitfor = append(waitfor, e.conn.EnterLameDuck(ctx))
- }
+ waitfor := make([]chan struct{}, 0, len(c.conns))
+ for _, e := range c.conns {
+ waitfor = append(waitfor, e.conn.EnterLameDuck(ctx))
}
c.mu.Unlock()
for _, w := range waitfor {
@@ -240,14 +464,13 @@
defer c.mu.Unlock()
c.mu.Lock()
err := NewErrCacheClosed(ctx)
- for _, entries := range c.ridCache {
- for _, e := range entries {
- e.conn.Close(ctx, err)
- }
+ for _, e := range c.conns {
+ e.conn.Close(ctx, err)
}
- c.addrCache = nil
- c.ridCache = nil
- c.started = nil
+ c.conns = nil
+ c.cache = nil
+ c.reserved = nil
+ c.errors = nil
}
// String returns a user-friendly representation of the connections in the cache.
@@ -255,174 +478,124 @@
defer c.mu.Unlock()
c.mu.Lock()
buf := &bytes.Buffer{}
- if c.addrCache == nil {
+ if c.conns == nil {
return "conncache closed"
}
- fmt.Fprintln(buf, "AddressCache:")
- for k, entries := range c.addrCache {
- for _, e := range entries {
- fmt.Fprintf(buf, "%v: %p\n", k, e.conn)
- }
+ fmt.Fprintln(buf, "Cached:")
+ for _, e := range c.conns {
+ fmt.Fprintf(buf, "%v: %v\n", e.keys, e.conn)
}
- fmt.Fprintln(buf, "RIDCache:")
- for k, entries := range c.ridCache {
- for _, e := range entries {
- fmt.Fprintf(buf, "%v: %p\n", k, e.conn)
- }
+ fmt.Fprintln(buf, "Reserved:")
+ for _, r := range c.reserved {
+ fmt.Fprintf(buf, "%v: %p\n", r.keys, r)
}
return buf.String()
}
// ExportStats exports cache information to the global stats.
func (c *ConnCache) ExportStats(prefix string) {
- stats.NewStringFunc(naming.Join(prefix, "addr"), func() string { return c.debugStringForAddrCache() })
- stats.NewStringFunc(naming.Join(prefix, "rid"), func() string { return c.debugStringForRIDCache() })
- stats.NewStringFunc(naming.Join(prefix, "dialing"), func() string { return c.debugStringForDialing() })
+ stats.NewStringFunc(naming.Join(prefix, "cache"), func() string { return c.debugStringForCache() })
+ stats.NewStringFunc(naming.Join(prefix, "reserved"), func() string { return c.debugStringForDialing() })
}
-func (c *ConnCache) insertConnLocked(conn cachedConn, proxy bool, keyByAddr bool) {
+func (c *ConnCache) insertConnLocked(remote naming.Endpoint, conn CachedConn, proxy bool, keyByAddr bool, cancel context.CancelFunc) {
+ if _, ok := c.conns[conn]; ok {
+ // If the conn is already in the cache, don't re-add it.
+ return
+ }
ep := conn.RemoteEndpoint()
entry := &connEntry{
- conn: conn,
- rid: ep.RoutingID,
- proxy: proxy,
+ conn: conn,
+ rid: ep.RoutingID,
+ proxy: proxy,
+ cancel: cancel,
+ keys: make([]interface{}, 0, 3),
}
+ if entry.rid != naming.NullRoutingID {
+ entry.keys = append(entry.keys, entry.rid)
+ }
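+ // Cache the conn under every key a future Find might use: the RID,
+ // the dialed (protocol, address), the resolved (protocol, address) if
+ // it differs, and the corresponding path keys.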
+ kdialed := key(remote.Protocol, remote.Address)
if keyByAddr {
- addr := ep.Addr()
- k := key(addr.Network(), addr.String())
- entry.addrKey = k
- c.addrCache[k] = append(c.addrCache[k], entry)
+ entry.keys = append(entry.keys, kdialed)
}
- c.ridCache[entry.rid] = append(c.ridCache[entry.rid], entry)
+ entry.keys = append(entry.keys, pathkey(remote.Protocol, remote.Address, ep.RoutingID))
+ if kresolved := key(ep.Protocol, ep.Address); kresolved != kdialed {
+ if keyByAddr {
+ entry.keys = append(entry.keys, kresolved)
+ }
+ entry.keys = append(entry.keys, pathkey(ep.Protocol, ep.Address, ep.RoutingID))
+ }
+ for _, k := range entry.keys {
+ c.cache[k] = append(c.cache[k], entry)
+ }
+ c.conns[entry.conn] = entry
}
-func (c *ConnCache) findWithRoutingID(ctx *context.T, remote naming.Endpoint,
- auth flow.PeerAuthorizer) (cachedConn, []string, []security.RejectedBlessing, error) {
- c.mu.Lock()
- if c.addrCache == nil {
- c.mu.Unlock()
- return nil, nil, nil, NewErrCacheClosed(ctx)
- }
- rid := remote.RoutingID
- if rid == naming.NullRoutingID {
- c.mu.Unlock()
- return nil, nil, nil, nil
- }
- entries := c.makeRTTEntriesLocked(c.ridCache[rid])
- c.mu.Unlock()
-
- conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth)
- return conn, names, rejected, nil
-}
-
-func (c *ConnCache) findWithAddress(ctx *context.T, remote naming.Endpoint, network, address string,
- auth flow.PeerAuthorizer) (cachedConn, []string, []security.RejectedBlessing, error) {
- k := key(network, address)
-
- c.mu.Lock()
- if c.addrCache == nil {
- c.mu.Unlock()
- return nil, nil, nil, NewErrCacheClosed(ctx)
- }
- for c.started[k] {
- c.cond.Wait()
- if c.addrCache == nil {
- c.mu.Unlock()
- return nil, nil, nil, NewErrCacheClosed(ctx)
+func (c *ConnCache) rttEntriesLocked(ctx *context.T, keys []interface{}) (rttEntries, error) {
+ var entries rttEntries
+ var firstError error
+ for _, k := range keys {
+ if found := c.cache[k]; len(found) > 0 {
+ for _, e := range found {
+ if status := e.conn.Status(); status >= conn.Closing {
+ c.removeEntryLocked(e)
+ } else if !e.conn.RemoteLameDuck() {
+ entries = append(entries, e)
+ }
+ }
+ } else if err := c.errors[k].err; firstError == nil && err != nil {
+ firstError = err
}
}
- c.started[k] = true
- entries := c.makeRTTEntriesLocked(c.addrCache[k])
- c.mu.Unlock()
-
- conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth)
- return conn, names, rejected, nil
-}
-
-func (c *ConnCache) findWithResolvedAddress(ctx *context.T, remote naming.Endpoint, network, address string,
- auth flow.PeerAuthorizer, p flow.Protocol) (cachedConn, []string, []security.RejectedBlessing, error) {
- network, addresses, err := resolve(ctx, p, network, address)
- if err != nil {
- // TODO(suharshs): Add a unittest for failed resolution.
- ctx.VI(2).Infof("Failed to resolve (%v, %v): %v", network, address, err)
- return nil, nil, nil, nil
- }
-
- for _, address := range addresses {
- c.mu.Lock()
- if c.addrCache == nil {
- c.mu.Unlock()
- return nil, nil, nil, NewErrCacheClosed(ctx)
- }
- k := key(network, address)
- entries := c.makeRTTEntriesLocked(c.addrCache[k])
- c.mu.Unlock()
-
- if conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth); conn != nil {
- return conn, names, rejected, nil
- }
- }
-
- return nil, nil, nil, nil
-}
-
-func (c *ConnCache) makeRTTEntriesLocked(es []*connEntry) rttEntries {
- if len(es) == 0 {
- return nil
- }
- // Sort connections by RTT.
- entries := make(rttEntries, len(es))
- copy(entries, es)
sort.Sort(entries)
- // Remove undialable connections.
- k := 0
- for _, e := range entries {
- if status := e.conn.Status(); status >= conn.Closing {
- c.removeEntryLocked(e)
- } else if !e.conn.RemoteLameDuck() {
- entries[k] = e
- k++
- }
- }
- return entries[:k]
+ return entries, firstError
}
-func (c *ConnCache) pickFirstAuthorizedConn(ctx *context.T, remote naming.Endpoint,
- entries rttEntries, auth flow.PeerAuthorizer) (cachedConn, []string, []security.RejectedBlessing) {
+func (c *ConnCache) hasReservationsLocked(keys []interface{}) bool {
+ for _, k := range keys {
+ if c.reserved[k] != nil {
+ return true
+ }
+ }
+ return false
+}
+
+func (c *ConnCache) pickFirstAuthorizedConn(
+ ctx *context.T,
+ remote naming.Endpoint,
+ entries rttEntries,
+ auth flow.PeerAuthorizer) (conn CachedConn, names []string, rejected []security.RejectedBlessing, err error) {
for _, e := range entries {
if e.proxy || auth == nil {
- return e.conn, nil, nil
+ return e.conn, nil, nil, nil
}
- names, rejected, err := auth.AuthorizePeer(ctx,
+ names, rejected, err = auth.AuthorizePeer(ctx,
e.conn.LocalEndpoint(),
remote,
e.conn.RemoteBlessings(),
e.conn.RemoteDischarges())
if err == nil {
- return e.conn, names, rejected
+ return e.conn, names, rejected, nil
}
}
- return nil, nil, nil
+ return nil, nil, nil, err
}
func (c *ConnCache) removeEntryLocked(entry *connEntry) {
- addrConns, ok := c.addrCache[entry.addrKey]
- if ok {
- addrConns = removeEntryFromSlice(addrConns, entry)
- if len(addrConns) == 0 {
- delete(c.addrCache, entry.addrKey)
- } else {
- c.addrCache[entry.addrKey] = addrConns
+ for _, k := range entry.keys {
+ entries, ok := c.cache[k]
+ if ok {
+ entries = removeEntryFromSlice(entries, entry)
+ if len(entries) == 0 {
+ delete(c.cache, k)
+ } else {
+ c.cache[k] = entries
+ }
}
}
- ridConns, ok := c.ridCache[entry.rid]
- if ok {
- ridConns = removeEntryFromSlice(ridConns, entry)
- if len(ridConns) == 0 {
- delete(c.ridCache, entry.rid)
- } else {
- c.ridCache[entry.rid] = ridConns
- }
+ delete(c.conns, entry.conn)
+ if entry.cancel != nil {
+ entry.cancel()
}
}
@@ -437,41 +610,22 @@
return entries
}
-func (c *ConnCache) debugStringForAddrCache() string {
+func (c *ConnCache) debugStringForCache() string {
defer c.mu.Unlock()
c.mu.Lock()
- if c.addrCache == nil {
+ if c.cache == nil {
return "<closed>"
}
- buf := &bytes.Buffer{}
// map iteration is unstable, so sort the keys first
- keys := make([]string, len(c.addrCache))
- i := 0
- for k := range c.addrCache {
- keys[i] = k
- i++
+ keys := make(sortedKeys, 0, len(c.cache))
+ for k := range c.cache {
+ keys = append(keys, k)
}
- sort.Strings(keys)
+ sort.Sort(keys)
+ buf := &bytes.Buffer{}
for _, k := range keys {
fmt.Fprintf(buf, "KEY: %v\n", k)
- for _, e := range c.addrCache[k] {
- fmt.Fprintf(buf, "%v\n", e.conn.DebugString())
- }
- fmt.Fprintf(buf, "\n")
- }
- return buf.String()
-}
-
-func (c *ConnCache) debugStringForRIDCache() string {
- defer c.mu.Unlock()
- c.mu.Lock()
- if c.ridCache == nil {
- return "<closed>"
- }
- buf := &bytes.Buffer{}
- for k, entries := range c.ridCache {
- fmt.Fprintf(buf, "KEY: %v\n", k)
- for _, e := range entries {
+ for _, e := range c.cache[k] {
fmt.Fprintf(buf, "%v\n", e.conn.DebugString())
}
fmt.Fprintf(buf, "\n")
@@ -482,23 +636,47 @@
func (c *ConnCache) debugStringForDialing() string {
defer c.mu.Unlock()
c.mu.Lock()
- if c.started == nil {
+ if c.reserved == nil {
return "<closed>"
}
- keys := make([]string, len(c.started))
- i := 0
- for k := range c.started {
- keys[i] = k
- i++
+ keys := make(sortedKeys, 0, len(c.cache))
+ for k := range c.cache {
+ keys = append(keys, k)
}
- sort.Strings(keys)
- return strings.Join(keys, "\n")
+ sort.Sort(keys)
+ buf := &bytes.Buffer{}
+ for _, k := range keys {
+ fmt.Fprintf(buf, "KEY: %v\n", k)
+ fmt.Fprintf(buf, "%#v\n", c.reserved[k])
+ fmt.Fprintf(buf, "\n")
+ }
+ return buf.String()
+}
+
+type sortedKeys []interface{}
+
+func (e sortedKeys) Len() int {
+ return len(e)
+}
+
+func (e sortedKeys) Less(i, j int) bool {
+ return fmt.Sprint(e[i]) < fmt.Sprint(e[j])
+}
+
+func (e sortedKeys) Swap(i, j int) {
+ e[i], e[j] = e[j], e[i]
}
func key(protocol, address string) string {
+ // TODO(mattr): Unalias the default protocol?
return protocol + "," + address
}
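+// pathkey keys a connection by the (protocol, address) hop it was dialed
+// through plus the routing ID it reaches, e.g.
+// pathkey("tcp", "127.0.0.1:80", rid) -> "tcp,127.0.0.1:80,<rid>".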
+func pathkey(protocol, address string, rid naming.RoutingID) string {
+ // TODO(mattr): Unalias the default protocol?
+ return protocol + "," + address + "," + rid.String()
+}
+
type rttEntries []*connEntry
func (e rttEntries) Len() int {
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index e4371d6..74e5852 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -5,6 +5,7 @@
package manager
import (
+ "reflect"
"strconv"
"testing"
"time"
@@ -22,79 +23,182 @@
"v.io/x/ref/test/goroutines"
)
-func TestCache(t *testing.T) {
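+// Register a "resolve" protocol that resolves to ("local", "address"), so
+// tests can exercise Find's address-resolution path, and a "wrong"
+// protocol that aliases "local" for negative tests.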
+func init() {
+ flow.RegisterProtocol("resolve", &resolveProtocol{
+ protocol: "local",
+ addresses: []string{"address"},
+ })
+ p, _ := flow.RegisteredProtocol("local")
+ flow.RegisterProtocol("wrong", p)
+}
+
+var nextrid uint64 = 100
+
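+// makeEPs returns an endpoint on addr plus the variants used by the
+// tests: the null-RID form, wrong-protocol, wrong-address, the
+// wrong-protocol-and-address form, and the "resolve" protocol form.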
+func makeEPs(ctx *context.T, addr string) (ep, nullep, wprotoep, waddrep, waddrprotoep, resolvep naming.Endpoint) {
+ ep = naming.Endpoint{
+ Protocol: "local",
+ Address: addr,
+ RoutingID: naming.FixedRoutingID(nextrid),
+ }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
+ nextrid++
+
+ nullep = ep
+ nullep.RoutingID = naming.NullRoutingID
+ wprotoep = nullep
+ wprotoep.Protocol = "wrong"
+ waddrep = nullep
+ waddrep.Address = "wrong"
+ resolvep = nullep
+ resolvep.Protocol = "resolve"
+ resolvep.Address = "wrong"
+ waddrprotoep = ep
+ waddrprotoep.Protocol = "wrong"
+ waddrprotoep.Address = "wrong"
+ return
+}
+
+// modep returns a copy of ep with the named field set to value, built via
+// reflect.New since reflect.ValueOf(ep) is unaddressable and Set panics.
+func modep(ep naming.Endpoint, field string, value interface{}) naming.Endpoint {
+ v := reflect.New(reflect.TypeOf(ep)).Elem()
+ v.Set(reflect.ValueOf(ep))
+ v.FieldByName(field).Set(reflect.ValueOf(value))
+ return v.Interface().(naming.Endpoint)
+}
+
+func makeEP(ctx *context.T, protocol, address string, rid uint64, blessings ...string) naming.Endpoint {
+ routingID := naming.NullRoutingID
+ if rid != 0 {
+ routingID = naming.FixedRoutingID(rid)
+ }
+ return naming.Endpoint{
+ Protocol: protocol,
+ Address: address,
+ RoutingID: routingID,
+ }.WithBlessingNames(blessings)
+}
+
+func TestCacheReserve(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
defer shutdown()
- p, _ := flow.RegisteredProtocol("local")
c := NewConnCache(0)
- remote := naming.Endpoint{
- Protocol: "tcp",
- Address: "127.0.0.1:1111",
- RoutingID: naming.FixedRoutingID(0x5555),
- }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
- nullRemote := naming.Endpoint{
- Protocol: "tcp",
- Address: "127.0.0.1:1111",
- RoutingID: naming.NullRoutingID,
- }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
+ nullep := makeEP(ctx, "local", "a1", 0, "b1")
+ oneep := makeEP(ctx, "local", "a1", 1, "b1")
+ twoep := makeEP(ctx, "local", "a1", 2, "b1")
+ proxyep := makeEP(ctx, "local", "a1", 3, "b1")
- auth := flowtest.NewPeerAuthorizer(remote.BlessingNames())
- caf := makeConnAndFlow(t, ctx, remote)
+ r1 := c.Reserve(ctx, oneep)
+ if r1 == nil {
+ t.Error("expected non-nil reservation.")
+ }
+
+ // That will have reserved both the address and the path.
+ // A second reservation should return nil.
+ if nr := c.Reserve(ctx, oneep); nr != nil {
+ t.Errorf("got %v, want nil", nr)
+ }
+ if nr := c.Reserve(ctx, nullep); nr != nil {
+ t.Errorf("got %v, want nil", nr)
+ }
+
+ // Reserving the proxy endpoint now will return a path-only
+ // reservation, since the (protocol, address) pair is already
+ // reserved by r1.
+ pr := c.Reserve(ctx, proxyep)
+ if pr == nil {
+ t.Errorf("epected non-nil reservation")
+ }
+
+ // Reserving a different RID but the same address will result in a
+ // path-only reservation.
+ r2 := c.Reserve(ctx, twoep)
+ if r2 == nil {
+ t.Errorf("expected non-nil reservation.")
+ }
+
+ // Since r1 has reserved the proxy address, its ProxyConn should return nil.
+ if pc := r1.ProxyConn(); pc != nil {
+ t.Errorf("got %v, expected nil.", pc)
+ }
+ proxycaf := makeConnAndFlow(t, ctx, proxyep)
+ defer proxycaf.stop(ctx)
+ onecaf := makeConnAndFlow(t, ctx, oneep)
+ defer onecaf.stop(ctx)
+ r1.Unreserve(onecaf.c, proxycaf.c, nil)
+
+ // Now, asking for the ProxyConn on the proxy reservation should find
+ // the proxy, since we just inserted it.
+ if pc := pr.ProxyConn().(*connpackage.Conn); pc != proxycaf.c {
+ t.Errorf("got %v, expected %v", pc, proxycaf.c)
+ }
+
+ // Now that the conn exists, we should not get another reservation.
+ if nr := c.Reserve(ctx, oneep); nr != nil {
+ t.Errorf("got %v, want nil", nr)
+ }
+ if nr := c.Reserve(ctx, nullep); nr != nil {
+ t.Errorf("got %v, want nil", nr)
+ }
+
+ // Note that the context should not have been canceled.
+ if err := r1.Context().Err(); err != nil {
+ t.Errorf("got %v want nil", err)
+ }
+
+ pc := r2.ProxyConn()
+ if pc == nil || pc.RemoteEndpoint().RoutingID != proxyep.RoutingID {
+ t.Fatalf("got %v, want %v", pc, proxyep)
+ }
+ twocaf := makeConnAndFlow(t, ctx, twoep)
+ defer twocaf.stop(ctx)
+
+ r2.Unreserve(twocaf.c, pc, nil)
+}
+
+func TestCacheFind(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+ c := NewConnCache(0)
+
+ ep, nullep, wprotoep, waddrep, waddrprotoep, resolvep := makeEPs(ctx, "address")
+
+ auth := flowtest.NewPeerAuthorizer(ep.BlessingNames())
+ caf := makeConnAndFlow(t, ctx, ep)
defer caf.stop(ctx)
conn := caf.c
if err := c.Insert(conn, false); err != nil {
t.Fatal(err)
}
// We should be able to find the conn in the cache.
- if got, _, _, err := c.Find(ctx, nullRemote, remote.Protocol, remote.Address, auth, p); err != nil || got != conn {
+ if got, _, _, err := c.Find(ctx, nullep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// Changing the protocol should fail.
- if got, _, _, err := c.Find(ctx, nullRemote, "wrong", remote.Address, auth, p); err != nil || got != nil {
+ if got, _, _, err := c.Find(ctx, wprotoep, auth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
- c.Unreserve("wrong", remote.Address)
// Changing the address should fail.
- if got, _, _, err := c.Find(ctx, nullRemote, remote.Protocol, "wrong", auth, p); err != nil || got != nil {
+ if got, _, _, err := c.Find(ctx, waddrep, auth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
- c.Unreserve(remote.Protocol, "wrong")
// Changing the blessingNames should fail.
- if got, _, _, err := c.Find(ctx, nullRemote, remote.Protocol, remote.Address, flowtest.NewPeerAuthorizer([]string{"wrong"}), p); err != nil || got != nil {
+ if got, _, _, err := c.Find(ctx, ep, flowtest.NewPeerAuthorizer([]string{"wrong"})); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// But finding a set of blessings that has at least one blessing in remote.Blessings should succeed.
- if got, _, _, err := c.Find(ctx, nullRemote, remote.Protocol, remote.Address, flowtest.NewPeerAuthorizer([]string{"foo", remote.BlessingNames()[0]}), p); err != nil || got != conn {
+ if got, _, _, err := c.Find(ctx, ep, flowtest.NewPeerAuthorizer([]string{"foo", ep.BlessingNames()[0]})); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// Finding by routing ID should work.
- if got, _, _, err := c.Find(ctx, remote, "wrong", "wrong", auth, p); err != nil || got != conn {
+ if got, _, _, err := c.Find(ctx, waddrprotoep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
- c.Unreserve("wrong", "wrong")
// Finding by a valid resolve protocol and address should work.
- if got, _, _, err := c.Find(ctx, remote, "wrong", "wrong", auth, &resolveProtocol{protocol: remote.Protocol, addresses: []string{remote.Address}}); err != nil || got != conn {
+ if got, _, _, err := c.Find(ctx, resolvep, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
- c.Unreserve("wrong", "wrong")
-
// Caching a proxied connection should not care about endpoint blessings, since the
// blessings only correspond to the end server.
- proxyep := naming.Endpoint{
- Protocol: "tcp",
- Address: "127.0.0.1:2222",
- RoutingID: naming.FixedRoutingID(0x5555),
- }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
- nullProxyep := naming.Endpoint{
- Protocol: "tcp",
- Address: "127.0.0.1:2222",
- RoutingID: naming.NullRoutingID,
- }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
+ proxyep, nullProxyep, _, _, _, _ := makeEPs(ctx, "proxy")
caf = makeConnAndFlow(t, ctx, proxyep)
defer caf.stop(ctx)
proxyConn := caf.c
@@ -102,80 +206,30 @@
t.Fatal(err)
}
// Wrong blessingNames should still work
- if got, _, _, err := c.Find(ctx, nullProxyep, proxyep.Protocol, proxyep.Address, flowtest.NewPeerAuthorizer([]string{"wrong"}), p); err != nil || got != proxyConn {
+ if got, _, _, err := c.Find(ctx, nullProxyep, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != proxyConn {
t.Errorf("got %v, want %v, err: %v", got, proxyConn, err)
}
- c.Unreserve(proxyep.Protocol, proxyep.Address)
+
+ ridep, nullridep, _, _, waddrprotoridep, _ := makeEPs(ctx, "rid")
// Caching with InsertWithRoutingID should only cache by RoutingID, not with network/address.
- ridEP := naming.Endpoint{
- Protocol: "ridonly",
- Address: "ridonly",
- RoutingID: naming.FixedRoutingID(0x1111),
- }.WithBlessingNames(unionBlessing(ctx, "ridonly"))
- nullRIDEP := naming.Endpoint{
- Protocol: "ridonly",
- Address: "ridonly",
- RoutingID: naming.NullRoutingID,
- }.WithBlessingNames(unionBlessing(ctx, "ridonly"))
- ridauth := flowtest.NewPeerAuthorizer(ridEP.BlessingNames())
- caf = makeConnAndFlow(t, ctx, ridEP)
+ ridauth := flowtest.NewPeerAuthorizer(ridep.BlessingNames())
+ caf = makeConnAndFlow(t, ctx, ridep)
defer caf.stop(ctx)
ridConn := caf.c
if err := c.InsertWithRoutingID(ridConn, false); err != nil {
t.Fatal(err)
}
- if got, _, _, err := c.Find(ctx, nullRIDEP, ridEP.Protocol, ridEP.Address, ridauth, p); err != nil || got != nil {
+ if got, _, _, err := c.Find(ctx, nullridep, ridauth); err == nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
- c.Unreserve(ridEP.Protocol, ridEP.Address)
// Finding by routing ID should work.
- if got, _, _, err := c.Find(ctx, ridEP, "wrong", "wrong", ridauth, p); err != nil || got != ridConn {
+ if got, _, _, err := c.Find(ctx, waddrprotoridep, ridauth); err != nil || got != ridConn {
t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
}
- c.Unreserve("wrong", "wrong")
-
- otherEP := naming.Endpoint{
- Protocol: "other",
- Address: "other",
- RoutingID: naming.FixedRoutingID(0x2222),
- }.WithBlessingNames(unionBlessing(ctx, "other"))
- nullOtherEP := naming.Endpoint{
- Protocol: "other",
- Address: "other",
- RoutingID: naming.NullRoutingID,
- }.WithBlessingNames(unionBlessing(ctx, "other"))
- otherAuth := flowtest.NewPeerAuthorizer(otherEP.BlessingNames())
- caf = makeConnAndFlow(t, ctx, otherEP)
- defer caf.stop(ctx)
- otherConn := caf.c
-
- // Looking up a not yet inserted endpoint should fail.
- if got, _, _, err := c.Find(ctx, nullOtherEP, otherEP.Protocol, otherEP.Address, otherAuth, p); err != nil || got != nil {
- t.Errorf("got %v, want <nil>, err: %v", got, err)
- }
- // Looking it up again should block until a matching Unreserve call is made.
- ch := make(chan cachedConn, 1)
- go func(ch chan cachedConn) {
- conn, _, _, err := c.Find(ctx, nullOtherEP, otherEP.Protocol, otherEP.Address, otherAuth, p)
- if err != nil {
- t.Fatal(err)
- }
- ch <- conn
- }(ch)
-
- // We insert the other conn into the cache.
- if err := c.Insert(otherConn, false); err != nil {
- t.Fatal(err)
- }
- c.Unreserve(otherEP.Protocol, otherEP.Address)
- // Now the c.Find should have unblocked and returned the correct Conn.
- if cachedConn := <-ch; cachedConn != otherConn {
- t.Errorf("got %v, want %v", cachedConn, otherConn)
- }
// Insert a duplicate conn to ensure that replaced conns still get closed.
- caf = makeConnAndFlow(t, ctx, remote)
+ caf = makeConnAndFlow(t, ctx, ep)
defer caf.stop(ctx)
dupConn := caf.c
if err := c.Insert(dupConn, false); err != nil {
@@ -190,15 +244,11 @@
if status := dupConn.Status(); status == connpackage.Closed {
t.Fatal("wanted dupConn to not be closed")
}
- if status := otherConn.Status(); status == connpackage.Closed {
- t.Fatal("wanted otherConn to not be closed")
- }
c.Close(ctx)
- // Now the connections should be closed.
+
<-conn.Closed()
<-ridConn.Closed()
<-dupConn.Closed()
- <-otherConn.Closed()
<-proxyConn.Closed()
}
@@ -219,23 +269,20 @@
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
// conns[3:] should not be closed and still in the cache.
// conns[:3] should be closed and removed from the cache.
for _, conn := range conns[3:] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, ctx, c, conn.c) {
+ if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[:3] {
<-conn.c.Closed()
- if isInCache(t, ctx, c, conn.c) {
+ if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -255,22 +302,19 @@
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, ctx, c, conn.c) {
+ if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
- if isInCache(t, ctx, c, conn.c) {
+ if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -290,23 +334,20 @@
if err := c.KillConnections(ctx, 3); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
// conns[:7] should not be closed and still in the cache.
// conns[7:] should be closed and removed from the cache.
for _, conn := range conns[:7] {
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, ctx, c, conn.c) {
+ if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
- if isInCache(t, ctx, c, conn.c) {
+ if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -333,9 +374,6 @@
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
// All connections should be lameducked.
for _, conn := range conns {
if status := conn.c.Status(); status < connpackage.EnteringLameDuck {
@@ -353,7 +391,7 @@
// All connections should be removed from the cache.
for _, conn := range conns {
<-conn.c.Closed()
- if isInCache(t, ctx, c, conn.c) {
+ if isInCache(ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -372,14 +410,11 @@
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
for _, conn := range conns {
if status := conn.c.Status(); status >= connpackage.EnteringLameDuck {
t.Errorf("conn %v should not have been closed or lameducked", conn)
}
- if !isInCache(t, ctx, c, conn.c) {
+ if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
@@ -398,14 +433,11 @@
if err := c.KillConnections(ctx, 0); err != nil {
t.Fatal(err)
}
- if !cacheSizeMatches(c) {
- t.Errorf("the size of the caches and LRU list do not match")
- }
for _, conn := range conns {
if status := conn.c.Status(); status >= connpackage.LameDuckAcknowledged {
t.Errorf("conn %v should not have been closed or lameducked", conn)
}
- if !isInCache(t, ctx, c, conn.c) {
+ if !isInCache(ctx, c, conn.c) {
t.Errorf("conn %v(%p) should still be in cache:\n%s",
conn.c.RemoteEndpoint(), conn.c, c)
}
@@ -418,11 +450,7 @@
defer shutdown()
c := NewConnCache(0)
- remote := naming.Endpoint{
- Protocol: "tcp",
- Address: "127.0.0.1:1111",
- RoutingID: naming.FixedRoutingID(0x5555),
- }.WithBlessingNames(unionBlessing(ctx, "A", "B", "C"))
+ remote, _, _, _, _, _ := makeEPs(ctx, "normal")
auth := flowtest.NewPeerAuthorizer(remote.BlessingNames())
slow, med, fast := 3*time.Millisecond, 2*time.Millisecond, 1*time.Millisecond
// Add a slow connection into the cache and ensure it is found.
@@ -430,37 +458,33 @@
if err := c.Insert(slowConn, false); err != nil {
t.Fatal(err)
}
- if got, _, _, err := c.Find(ctx, remote, remote.Protocol, remote.Address, auth, nil); err != nil || got != slowConn {
+ if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != slowConn {
t.Errorf("got %v, want %v, err: %v", got, slowConn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// Add a fast connection into the cache and ensure it is found over the slow one.
fastConn := newRTTConn(ctx, remote, fast)
if err := c.Insert(fastConn, false); err != nil {
t.Fatal(err)
}
- if got, _, _, err := c.Find(ctx, remote, remote.Protocol, remote.Address, auth, nil); err != nil || got != fastConn {
+ if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != fastConn {
t.Errorf("got %v, want %v, err: %v", got, fastConn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// Add a med connection into the cache and ensure that the fast one is still found.
medConn := newRTTConn(ctx, remote, med)
if err := c.Insert(medConn, false); err != nil {
t.Fatal(err)
}
- if got, _, _, err := c.Find(ctx, remote, remote.Protocol, remote.Address, auth, nil); err != nil || got != fastConn {
+ if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != fastConn {
t.Errorf("got %v, want %v, err: %v", got, fastConn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
// Kill the fast connection and ensure the med connection is found.
fastConn.Close(ctx, nil)
- if got, _, _, err := c.Find(ctx, remote, remote.Protocol, remote.Address, auth, nil); err != nil || got != medConn {
+ if got, _, _, err := c.Find(ctx, remote, auth); err != nil || got != medConn {
t.Errorf("got %v, want %v, err: %v", got, medConn, err)
}
- c.Unreserve(remote.Protocol, remote.Address)
}
func newRTTConn(ctx *context.T, ep naming.Endpoint, rtt time.Duration) *rttConn {
@@ -514,19 +538,10 @@
func (c *rttConn) LastUsed() time.Time { return time.Now() }
func (c *rttConn) DebugString() string { return "" }
-func isInCache(t *testing.T, ctx *context.T, c *ConnCache, conn *connpackage.Conn) bool {
- p, _ := flow.RegisteredProtocol("local")
+func isInCache(ctx *context.T, c *ConnCache, conn *connpackage.Conn) bool {
rep := conn.RemoteEndpoint()
- rfconn, _, _, err := c.Find(ctx, rep, rep.Addr().Network(), rep.Addr().String(), flowtest.NewPeerAuthorizer(rep.BlessingNames()), p)
- if err != nil {
- t.Error(err)
- }
- c.Unreserve(rep.Addr().Network(), rep.Addr().String())
- return rfconn != nil
-}
-
-func cacheSizeMatches(c *ConnCache) bool {
- return len(c.addrCache) == len(c.ridCache)
+ _, _, _, err := c.Find(ctx, rep, flowtest.NewPeerAuthorizer(rep.BlessingNames()))
+ return err == nil
}
type connAndFlow struct {
@@ -558,7 +573,8 @@
cfs := make([]connAndFlow, n)
for i := 0; i < n; i++ {
cfs[i] = makeConnAndFlow(t, ctx, naming.Endpoint{
- Protocol: strconv.Itoa(i),
+ Protocol: "local",
+ Address: strconv.Itoa(i),
RoutingID: naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
})
}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index aff1927..2369207 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -742,25 +742,12 @@
// closed.
func (m *manager) DialCached(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) {
var (
- err error
- cached cachedConn
- names []string
- rejected []security.RejectedBlessing
- addr = remote.Addr()
- unresNetwork, unresAddress = addr.Network(), addr.String()
- protocol, _ = flow.RegisteredProtocol(unresNetwork)
+ err error
+ cached CachedConn
+ names []string
+ rejected []security.RejectedBlessing
)
- if rid := remote.RoutingID; rid != naming.NullRoutingID {
- // In the case the endpoint has a RoutingID we only want to check the cache
- // for the RoutingID because we want to guarantee that the connection we
- // return is to the end server and not a connection to an intermediate proxy.
- cached, names, rejected, err = m.cache.FindWithRoutingID(ctx, remote, auth)
- } else {
- cached, names, rejected, err = m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol)
- if err == nil {
- m.cache.Unreserve(unresNetwork, unresAddress)
- }
- }
+ cached, names, rejected, err = m.cache.FindCached(ctx, remote, auth)
if err != nil {
return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
}
@@ -777,108 +764,160 @@
auth flow.PeerAuthorizer,
channelTimeout time.Duration,
proxy, sideChannel bool) (flow.Flow, error) {
- // Fast path, look for the conn based on unresolved network, address, and routingId first.
- var (
- c *conn.Conn
- addr = remote.Addr()
- unresNetwork, unresAddress = addr.Network(), addr.String()
- protocol, _ = flow.RegisteredProtocol(unresNetwork)
- )
if m.ls != nil && len(m.ls.serverAuthorizedPeers) > 0 {
auth = &peerAuthorizer{auth, m.ls.serverAuthorizedPeers}
}
- cached, names, rejected, err := m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol)
+
+ if res := m.cache.Reserve(ctx, remote); res != nil {
+ go m.dialReserved(res, remote, auth, channelTimeout, proxy)
+ }
+
+ cached, names, rejected, err := m.cache.Find(ctx, remote, auth)
if err != nil {
return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
}
- if cached != nil {
- c = cached.(*conn.Conn)
- m.cache.Unreserve(unresNetwork, unresAddress)
- } else {
- // We didn't find the connection we want in the cache. Dial it.
- defer m.cache.Unreserve(unresNetwork, unresAddress)
- flowConn, err := dial(ctx, protocol, unresNetwork, unresAddress)
- if err != nil {
- switch err := err.(type) {
- case *net.OpError:
- if err, ok := err.Err.(net.Error); ok && err.Timeout() {
- return nil, iflow.MaybeWrapError(verror.ErrTimeout, ctx, err)
- }
- }
- return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
- }
- var fh conn.FlowHandler
- if m.ls != nil {
- m.ls.mu.Lock()
- if stoppedListening := m.ls.listeners == nil; !stoppedListening {
- fh = newHybridHandler(m)
- }
- m.ls.mu.Unlock()
- }
- c, names, rejected, err = conn.NewDialed(
- ctx,
- flowConn,
- localEndpoint(flowConn, m.rid),
- remote,
- version.Supported,
- auth,
- handshakeTimeout,
- 0,
- fh,
- )
- if err != nil {
- flowConn.Close()
- return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
- }
- isProxy := proxy || !c.MatchesRID(remote)
- if err := m.cache.Insert(c, isProxy); err != nil {
- c.Close(ctx, err)
- return nil, flow.NewErrBadState(ctx, err)
- }
- // Now that c is in the cache we can explicitly unreserve.
- m.cache.Unreserve(unresNetwork, unresAddress)
- m.handlerReady(fh, proxy)
- }
-
- // If the connection we found or dialed turns out to not be the person we expected, assume it is a Proxy.
+ c, _ := cached.(*conn.Conn)
+ // If the connection we found or dialed doesn't have the correct RID, assume it is a Proxy.
// We now need to make a flow to a proxy and upgrade it to a conn to the final server.
if !c.MatchesRID(remote) {
- proxyConn := c
- f, err := proxyConn.Dial(ctx, security.Blessings{}, nil, remote, channelTimeout, false)
- if err != nil {
- return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
- }
- var fh conn.FlowHandler
- if m.ls != nil {
- m.ls.mu.Lock()
- if stoppedListening := m.ls.listeners == nil; !stoppedListening {
- fh = &flowHandler{m: m}
- }
- m.ls.mu.Unlock()
- }
- c, names, rejected, err = conn.NewDialed(
- ctx,
- f,
- proxyConn.LocalEndpoint(),
- remote,
- version.Supported,
- auth,
- handshakeTimeout,
- 0,
- fh,
- )
- if err != nil {
- return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
- }
- if err := m.cache.InsertWithRoutingID(c, false); err != nil {
- c.Close(ctx, err)
- return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
+ if c, names, rejected, err = m.dialProxyConn(ctx, remote, c, auth, channelTimeout); err != nil {
+ return nil, err
}
}
-
return dialFlow(ctx, c, remote, names, rejected, channelTimeout, auth, sideChannel)
}
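+// dialReserved dials the connection (and proxy connection, if needed)
+// for a Reservation handed out by the cache, then releases the
+// reservation so that Find calls blocked on it can proceed.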
+func (m *manager) dialReserved(
+ res *Reservation,
+ remote naming.Endpoint,
+ auth flow.PeerAuthorizer,
+ channelTimeout time.Duration,
+ proxy bool) {
+ var fh conn.FlowHandler
+ var c, pc *conn.Conn
+ var err error
+ defer func() {
+ // Note we have to do this annoying shuffle because the conncache wants
+ // CachedConn, and a CachedConn interface holding (*conn.Conn)(nil) is
+ // itself non-nil, so passing c or pc directly would hand Unreserve a
+ // "present" conn even when the dial failed.
+ var cc, cpc CachedConn
+ if c != nil {
+ cc = c
+ }
+ if pc != nil {
+ cpc = pc
+ }
+ res.Unreserve(cc, cpc, err)
+ // Note: 'proxy' is true when we are a server "listening on" the
+ // proxy. 'pc != nil' is true when we are connecting through a
+ // proxy as a client. Thus, we only want to enable the
+ // proxyFlowHandler when 'proxy' is true, not when 'pc != nil'
+ // is true.
+ m.handlerReady(fh, proxy)
+ }()
+
+ if proxyConn := res.ProxyConn(); proxyConn != nil {
+ c = proxyConn.(*conn.Conn)
+ }
+ if c == nil {
+ // We didn't find the connection we want in the cache. Dial it.
+ // TODO(mattr): In this model we're running the auth twice in
+ // the case that we use the conn we dialed. One idea is to have
+ // peerAuthorizer remember its result since we only use it for
+ // this one invocation of internalDial.
+ if c, fh, err = m.dialConn(res.Context(), remote, auth, proxy); err != nil {
+ return
+ }
+ }
+ // If the connection we found or dialed doesn't have the correct RID, assume it is a Proxy.
+ // We now need to make a flow to a proxy and upgrade it to a conn to the final server.
+ if !c.MatchesRID(remote) {
+ pc = c
+ if c, _, _, err = m.dialProxyConn(res.Context(), remote, pc, auth, channelTimeout); err != nil {
+ return
+ }
+ } else if proxy {
+ pc, c = c, nil
+ }
+}
+
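+// dialConn dials a new connection to remote over its registered protocol
+// and completes the client side of the connection handshake.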
+func (m *manager) dialConn(
+ ctx *context.T,
+ remote naming.Endpoint,
+ auth flow.PeerAuthorizer,
+ proxy bool) (*conn.Conn, conn.FlowHandler, error) {
+ protocol, _ := flow.RegisteredProtocol(remote.Protocol)
+ flowConn, err := dial(ctx, protocol, remote.Protocol, remote.Address)
+ if err != nil {
+ switch err := err.(type) {
+ case *net.OpError:
+ if err, ok := err.Err.(net.Error); ok && err.Timeout() {
+ return nil, nil, iflow.MaybeWrapError(verror.ErrTimeout, ctx, err)
+ }
+ }
+ return nil, nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
+ }
+ var fh conn.FlowHandler
+ if m.ls != nil {
+ m.ls.mu.Lock()
+ if stoppedListening := m.ls.listeners == nil; !stoppedListening {
+ fh = newHybridHandler(m)
+ }
+ m.ls.mu.Unlock()
+ }
+ c, _, _, err := conn.NewDialed(
+ ctx,
+ flowConn,
+ localEndpoint(flowConn, m.rid),
+ remote,
+ version.Supported,
+ auth,
+ handshakeTimeout,
+ 0,
+ fh,
+ )
+ if err != nil {
+ flowConn.Close()
+ return nil, nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
+ }
+ return c, fh, nil
+}
+
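+// dialProxyConn dials a flow over an existing conn to a proxy and
+// upgrades it to a conn to the final server.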
+func (m *manager) dialProxyConn(
+ ctx *context.T,
+ remote naming.Endpoint,
+ proxyConn *conn.Conn,
+ auth flow.PeerAuthorizer,
+ channelTimeout time.Duration) (*conn.Conn, []string, []security.RejectedBlessing, error) {
+ f, err := proxyConn.Dial(ctx, security.Blessings{}, nil, remote, channelTimeout, false)
+ if err != nil {
+ return nil, nil, nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
+ }
+ var fh conn.FlowHandler
+ if m.ls != nil {
+ m.ls.mu.Lock()
+ if stoppedListening := m.ls.listeners == nil; !stoppedListening {
+ fh = &flowHandler{m: m}
+ }
+ m.ls.mu.Unlock()
+ }
+ c, names, rejected, err := conn.NewDialed(
+ ctx,
+ f,
+ proxyConn.LocalEndpoint(),
+ remote,
+ version.Supported,
+ auth,
+ handshakeTimeout,
+ 0,
+ fh,
+ )
+ if err != nil {
+ return nil, names, rejected, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err)
+ }
+ return c, names, rejected, nil
+}
+
func dialFlow(ctx *context.T, c *conn.Conn, remote naming.Endpoint, names []string, rejected []security.RejectedBlessing,
channelTimeout time.Duration, auth flow.PeerAuthorizer, sideChannel bool) (flow.Flow, error) {
// Find the proper blessings and dial the final flow.
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index e6d3e22..0057bf0 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -53,23 +53,23 @@
dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
// At first the cache should be empty.
- if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
+ if got, want := len(dm.(*manager).cache.conns), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
- if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
+ if got, want := len(dm.(*manager).cache.conns), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
- old := dm.(*manager).cache.ridCache[am.RoutingID()][0]
+ old := dm.(*manager).cache.cache[am.RoutingID()][0]
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
- if got, want := len(dm.(*manager).cache.ridCache[am.RoutingID()]), 1; got != want {
+ if got, want := len(dm.(*manager).cache.cache[am.RoutingID()]), 1; got != want {
t.Errorf("got cache size %v, want %v", got, want)
}
- if c := dm.(*manager).cache.ridCache[am.RoutingID()][0]; c != old {
- t.Errorf("got kv want %v", c, old)
+ if c := dm.(*manager).cache.cache[am.RoutingID()][0]; c != old {
+ t.Errorf("got %v want %v", c, old)
}
cancel()
@@ -212,13 +212,13 @@
}
dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
// At first the cache should be empty.
- if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
+ if got, want := len(dm.(*manager).cache.conns), 0; got != want {
b.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
auth := flowtest.AllowAllPeersAuthorizer{}
testFlows(b, ctx, dm, am, auth)
- if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
+ if got, want := len(dm.(*manager).cache.conns), 1; got != want {
b.Fatalf("got cache size %v, want %v", got, want)
}
ep := am.Status().Endpoints[0]
diff --git a/runtime/internal/flow/util.go b/runtime/internal/flow/util.go
index ef04a00..8b5bb6a 100644
--- a/runtime/internal/flow/util.go
+++ b/runtime/internal/flow/util.go
@@ -14,6 +14,12 @@
var noWrapPackages = []string{
"v.io/v23/verror",
"v.io/v23/flow",
+ // TODO(mattr): We want to pass on the peerAuthorizedFailed error
+ // from the peerAuthorizer passed to us from the client. The
+ // client detects that specific error to determine which error
+ // to return to users. We should probably have another way
+ // to prevent this excessive wrapping, but this works for now.
+ "v.io/x/ref/runtime/internal/rpc",
}
func MaybeWrapError(idAction verror.IDAction, ctx *context.T, err error) error {
diff --git a/runtime/internal/naming/namespace/all_test.go b/runtime/internal/naming/namespace/all_test.go
index fce8ccb..570eae7 100644
--- a/runtime/internal/naming/namespace/all_test.go
+++ b/runtime/internal/naming/namespace/all_test.go
@@ -714,7 +714,7 @@
if e, err := clientNs.Resolve(serverCtx, "mt/server", options.NameResolutionAuthorizer{security.AllowEveryone()}); err != nil {
t.Errorf("Resolve should succeed when skipping server authorization. Got (%v, %v) %s", e, err, verror.DebugString(err))
} else if e, err := clientNs.Resolve(serverCtx, "mt/server"); verror.ErrorID(err) != verror.ErrNotTrusted.ID {
- t.Errorf("Resolve should have failed with %q because an attacker has taken over the intermediate mounttable. Got (%+v, errorid=%q:%v)", verror.ErrNotTrusted.ID, e, verror.ErrorID(err), err)
+ t.Errorf("Resolve should have failed with %q because an attacker has taken over the intermediate mounttable. Got\n%+v\nerrorid=%q\nerror=%s", verror.ErrNotTrusted.ID, e, verror.ErrorID(err), verror.DebugString(err))
}
}
diff --git a/runtime/internal/vtrace/vtrace_test.go b/runtime/internal/vtrace/vtrace_test.go
index 4133534..4acf3d5 100644
--- a/runtime/internal/vtrace/vtrace_test.go
+++ b/runtime/internal/vtrace/vtrace_test.go
@@ -7,6 +7,7 @@
import (
"bytes"
"fmt"
+ "regexp"
"strings"
"testing"
"time"
@@ -26,6 +27,8 @@
"v.io/x/ref/test/testutil"
)
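+// logre matches logging annotations of the form "file.go:123]", which
+// the test summaries skip because line numbers change too often.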
+var logre = regexp.MustCompile(`^[a-zA-Z0-9]+\.go:[0-9]*\]`)
+
// initForTest initializes the vtrace runtime and starts a mounttable.
func initForTest(t *testing.T) (*context.T, v23.Shutdown, *testutil.IDProvider) {
ctx, shutdown := test.V23InitWithMounttable()
@@ -154,11 +157,15 @@
func summary(span *vtrace.SpanRecord) string {
summary := span.Name
- if len(span.Annotations) > 0 {
- msgs := []string{}
- for _, annotation := range span.Annotations {
- msgs = append(msgs, annotation.Message)
+ msgs := []string{}
+ for _, annotation := range span.Annotations {
+ if logre.Match([]byte(annotation.Message)) {
+ // Skip log annotations since they're so likely to change.
+ continue
}
+ msgs = append(msgs, annotation.Message)
+ }
+ if len(msgs) > 0 {
summary += ": " + strings.Join(msgs, ", ")
}
return summary