blob: 73818d8c2cc7b8f02d86b85509d6cc38b17a70d8 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"fmt"
"sync"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/v23/naming"
"v.io/v23/verror"
)
// errXVCCacheClosed is the error identifier that ReservedFind, Insert and
// Delete pass to verror.New when they find that the cache has been closed.
var errXVCCacheClosed = reg(".errXVCCacheClosed", "vc cache has been closed")
// XVCCache is a cache of XVCs that can be looked up either by the string form
// of the remote endpoint or by the remote routing ID. It also records which
// endpoint strings are currently reserved through ReservedFind.
// Multiple goroutines can invoke methods on the XVCCache simultaneously.
type XVCCache struct {
// mu guards the three maps below.
mu sync.Mutex
// epCache maps a remote endpoint's String() to its XVC. Close sets it to nil,
// and a nil epCache is how the other methods detect a closed cache.
epCache map[string]*xvc // GUARDED_BY(mu)
// ridCache maps a remote endpoint's RoutingID() to its XVC.
ridCache map[naming.RoutingID]*xvc // GUARDED_BY(mu)
// started holds the endpoint strings that have been reserved by ReservedFind
// and not yet released by Unreserve.
started map[string]bool // GUARDED_BY(mu)
// cond is built on mu by NewXVCCache; ReservedFind waits on it and Unreserve
// broadcasts on it.
cond *sync.Cond
}
// NewXVCCache returns an empty, open cache for XVCs. All three maps are
// allocated up front, and the condition variable is bound to the cache's own
// mutex so that waiting and broadcasting happen under the same lock that
// guards the maps.
func NewXVCCache() *XVCCache {
	cache := new(XVCCache)
	cache.epCache = map[string]*xvc{}
	cache.ridCache = map[naming.RoutingID]*xvc{}
	cache.started = map[string]bool{}
	cache.cond = sync.NewCond(&cache.mu)
	return cache
}
// ReservedFind reserves ep and returns the cached XVC whose remote end is
// identified by ep. The cache is consulted first by ep's routing ID and then
// by ep's string form. It returns (nil, nil) if there is no such XVC.
//
// Iff the cache is closed, ReservedFind returns an error.
// If ReservedFind returns no error, ep is reserved and the caller is required
// to call Unreserve, to avoid deadlock. The ep in Unreserve must be the same
// as the one used in the ReservedFind call. Until that Unreserve call is made,
// every new ReservedFind call for this ep blocks.
func (c *XVCCache) ReservedFind(ep naming.Endpoint) (stream.XVC, error) {
	k := ep.String()

	c.mu.Lock()
	defer c.mu.Unlock()

	// Wait until nobody else holds the reservation for k. cond.Wait releases
	// mu while parked and re-acquires it before returning.
	for c.started[k] {
		c.cond.Wait()
	}
	if c.epCache == nil {
		return nil, verror.New(errXVCCacheClosed, nil)
	}
	c.started[k] = true

	// ridCache is guarded by mu, so it is only read with the lock held. The
	// lookup happens after the reservation is taken so that every successful
	// return owns a reservation; otherwise the caller's mandatory Unreserve
	// could release a reservation that belongs to another goroutine.
	if ret, ok := c.ridCache[ep.RoutingID()]; ok {
		return ret, nil
	}

	// On a miss return an untyped nil: handing back a nil *xvc would yield a
	// non-nil stream.XVC interface value.
	if ret := c.epCache[k]; ret != nil {
		return ret, nil
	}
	return nil, nil
}
// Unreserve clears the reservation recorded for ep and wakes every goroutine
// waiting on the cache's condition variable so that each one can re-check
// whether its own endpoint is still reserved.
//
// It remains safe to call after Close: Close sets the reservation map to nil
// and deleting from a nil map is a no-op.
func (c *XVCCache) Unreserve(ep naming.Endpoint) {
	c.mu.Lock()
	key := ep.String()
	delete(c.started, key)
	c.cond.Broadcast()
	c.mu.Unlock()
}
// Insert adds v to the cache, indexing it by both the string form and the
// routing ID of its remote endpoint. An existing entry under either key is
// overwritten.
//
// It returns an error if v is not a *xvc or if the cache has been closed.
func (c *XVCCache) Insert(v stream.XVC) error {
	vc, ok := v.(*xvc)
	if !ok {
		// Report v itself: after a failed assertion vc is always a nil *xvc,
		// which would hide the value that was actually passed in.
		return fmt.Errorf("vc %#v is not of type *vc.xvc", v)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.epCache == nil {
		return verror.New(errXVCCacheClosed, nil)
	}
	c.epCache[vc.RemoteEndpoint().String()] = vc
	c.ridCache[vc.RemoteEndpoint().RoutingID()] = vc
	return nil
}
// Close marks the XVCCache as closed and returns the XVCs that were still in
// the endpoint-keyed map at that moment. Closing is signalled to the other
// methods by setting the maps to nil. Calling Close again returns an empty
// slice.
func (c *XVCCache) Close() []*xvc {
	c.mu.Lock()
	defer c.mu.Unlock()

	vcs := make([]*xvc, 0, len(c.epCache))
	for _, vc := range c.epCache {
		vcs = append(vcs, vc)
	}
	c.epCache = nil
	c.ridCache = nil
	c.started = nil

	// Dropping started makes the wait condition false for every goroutine
	// parked in ReservedFind. Wake them so they see the closed cache now
	// instead of staying blocked until some later Unreserve. cond is nil only
	// for a cache that was not built by NewXVCCache.
	if c.cond != nil {
		c.cond.Broadcast()
	}
	return vcs
}
// Delete drops vc from both indexes of the cache: the one keyed by the remote
// endpoint's string form and the one keyed by its routing ID. It returns an
// error iff the cache has been closed. Deleting a vc that is not present is
// not an error.
func (c *XVCCache) Delete(vc *xvc) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if closed := c.epCache == nil; closed {
		return verror.New(errXVCCacheClosed, nil)
	}

	epKey := vc.RemoteEndpoint().String()
	delete(c.epCache, epKey)

	ridKey := vc.RemoteEndpoint().RoutingID()
	delete(c.ridCache, ridKey)

	return nil
}