| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vc |
| |
| import ( |
| "fmt" |
| "sync" |
| |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| |
| "v.io/v23/naming" |
| "v.io/v23/verror" |
| ) |
| |
| var errXVCCacheClosed = reg(".errXVCCacheClosed", "vc cache has been closed") |
| |
| // XVCCache implements a set of VIFs keyed by the endpoint of the remote end and the |
| // local principal. Multiple goroutines can invoke methods on the XVCCache simultaneously. |
| type XVCCache struct { |
| mu sync.Mutex |
| epCache map[string]*xvc // GUARDED_BY(mu) |
| ridCache map[naming.RoutingID]*xvc // GUARDED_BY(mu) |
| started map[string]bool // GUARDED_BY(mu) |
| cond *sync.Cond |
| } |
| |
| // NewXVCCache returns a new cache for XVCs. |
| func NewXVCCache() *XVCCache { |
| c := &XVCCache{ |
| epCache: make(map[string]*xvc), |
| ridCache: make(map[naming.RoutingID]*xvc), |
| started: make(map[string]bool), |
| } |
| c.cond = sync.NewCond(&c.mu) |
| return c |
| } |
| |
| // ReservedFind returns a XVC where the remote end of the underlying connection |
| // is identified by the provided (ep). Returns nil if there is no |
| // such XVC. |
| // |
| // Iff the cache is closed, ReservedFind will return an error. |
| // If ReservedFind has no error, the caller is required to call Unreserve, to avoid deadlock. |
| // The ep in Unreserve must be the same as the one used in the ReservedFind call. |
| // During this time, all new ReservedFind calls for this ep will Block until |
| // the corresponding Unreserve call is made. |
| func (c *XVCCache) ReservedFind(ep naming.Endpoint) (stream.XVC, error) { |
| if ret, ok := c.ridCache[ep.RoutingID()]; ok { |
| return ret, nil |
| } |
| k := ep.String() |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| for c.started[k] { |
| c.cond.Wait() |
| } |
| if c.epCache == nil { |
| return nil, verror.New(errXVCCacheClosed, nil) |
| } |
| c.started[k] = true |
| ret := c.epCache[k] |
| if ret == nil { |
| return nil, nil |
| } |
| return ret, nil |
| } |
| |
| // Unreserve marks the status of the ep as no longer started, and |
| // broadcasts waiting threads. |
| func (c *XVCCache) Unreserve(ep naming.Endpoint) { |
| c.mu.Lock() |
| delete(c.started, ep.String()) |
| c.cond.Broadcast() |
| c.mu.Unlock() |
| } |
| |
| // Insert adds vc to the cache and returns an error iff the cache has been closed. |
| func (c *XVCCache) Insert(v stream.XVC) error { |
| vc, ok := v.(*xvc) |
| if !ok { |
| return fmt.Errorf("vc %#v is not of type *vc.xvc", vc) |
| } |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| if c.epCache == nil { |
| return verror.New(errXVCCacheClosed, nil) |
| } |
| c.epCache[vc.RemoteEndpoint().String()] = vc |
| c.ridCache[vc.RemoteEndpoint().RoutingID()] = vc |
| return nil |
| } |
| |
| // Close marks the XVCCache as closed and returns the XVCs remaining in the cache. |
| func (c *XVCCache) Close() []*xvc { |
| c.mu.Lock() |
| vcs := make([]*xvc, 0, len(c.epCache)) |
| for _, vc := range c.epCache { |
| vcs = append(vcs, vc) |
| } |
| c.epCache = nil |
| c.ridCache = nil |
| c.started = nil |
| c.mu.Unlock() |
| return vcs |
| } |
| |
| // Delete removes vc from the cache, returning an error iff the cache has been closed. |
| func (c *XVCCache) Delete(vc *xvc) error { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| if c.epCache == nil { |
| return verror.New(errXVCCacheClosed, nil) |
| } |
| delete(c.epCache, vc.RemoteEndpoint().String()) |
| delete(c.ridCache, vc.RemoteEndpoint().RoutingID()) |
| return nil |
| } |