blob: effa8e89e342ce40c851dd1b99fb1451598631ca [file] [log] [blame]
package ipc
import (
"fmt"
"reflect"
"sync"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/security"
)
// clientEncodeBlessings gets or inserts the blessings into the cache.
func clientEncodeBlessings(cache stream.VCDataCache, blessings security.Blessings) ipc.BlessingsRequest {
blessingsCacheAny := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
blessingsCache := blessingsCacheAny.(*clientBlessingsCache)
return blessingsCache.getOrInsert(blessings)
}
// clientAckBlessings verifies that the server has updated its cache to include blessings.
// This means that subsequent rpcs from the client with blessings can send only a cache key.
func clientAckBlessings(cache stream.VCDataCache, blessings security.Blessings) {
blessingsCacheAny := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
blessingsCache := blessingsCacheAny.(*clientBlessingsCache)
blessingsCache.acknowledge(blessings)
}
// serverDecodeBlessings insert the key and blessings into the cache or get the blessings if only
// key is provided in req.
func serverDecodeBlessings(cache stream.VCDataCache, req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
blessingsCacheAny := cache.GetOrInsert(serverBlessingsKey{}, newServerBlessingsCache)
blessingsCache := blessingsCacheAny.(*serverBlessingsCache)
return blessingsCache.getOrInsert(req, stats)
}
// IMPLEMENTATION DETAILS BELOW
// clientBlessingsCache is a thread-safe map from blessings to cache key.
type clientBlessingsCache struct {
sync.RWMutex
m map[security.Blessings]clientCacheValue
key uint64
}
type clientCacheValue struct {
key uint64
// ack is set to true once the server has confirmed receipt of the cache key.
// Clients that insert into the cache when ack is false must send both the key
// and the blessings.
ack bool
}
// clientBlessingsKey is the key used to retrieve the clientBlessingsCache from the VCDataCache.
type clientBlessingsKey struct{}
func newClientBlessingsCache() interface{} {
return &clientBlessingsCache{m: make(map[security.Blessings]clientCacheValue)}
}
func (c *clientBlessingsCache) getOrInsert(blessings security.Blessings) ipc.BlessingsRequest {
c.RLock()
val, exists := c.m[blessings]
c.RUnlock()
if exists {
return c.makeBlessingsRequest(val, blessings)
}
// if the val doesn't exist we must create a new key, update the cache, and send the key and blessings.
c.Lock()
defer c.Unlock()
// we must check that the val wasn't inserted in the time we changed locks.
val, exists = c.m[blessings]
if exists {
return c.makeBlessingsRequest(val, blessings)
}
newVal := clientCacheValue{key: c.nextKeyLocked()}
c.m[blessings] = newVal
return c.makeBlessingsRequest(newVal, blessings)
}
func (c *clientBlessingsCache) acknowledge(blessings security.Blessings) {
c.Lock()
val := c.m[blessings]
val.ack = true
c.m[blessings] = val
c.Unlock()
}
func (c *clientBlessingsCache) makeBlessingsRequest(val clientCacheValue, blessings security.Blessings) ipc.BlessingsRequest {
if val.ack {
// when the value is acknowledged, only send the key, since the server has confirmed that it knows the key.
return ipc.BlessingsRequest{Key: val.key}
}
// otherwise we still need to send both key and blessings, but we must ensure that we send the same key.
wireBlessings := security.MarshalBlessings(blessings)
return ipc.BlessingsRequest{val.key, &wireBlessings}
}
// nextKeyLocked creates a new key for inserting blessings. It must be called after acquiring a writer lock.
func (c *clientBlessingsCache) nextKeyLocked() uint64 {
c.key++
return c.key
}
// serverBlessingsCache is a thread-safe map from cache key to blessings.
type serverBlessingsCache struct {
sync.RWMutex
m map[uint64]security.Blessings
}
// serverBlessingsKey is the key used to retrieve the serverBlessingsCache from the VCDataCache.
type serverBlessingsKey struct{}
func newServerBlessingsCache() interface{} {
return &serverBlessingsCache{m: make(map[uint64]security.Blessings)}
}
func (c *serverBlessingsCache) getOrInsert(req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
// In the case that the key sent is 0, we are running in VCSecurityNone and should
// return nil for the client Blessings.
if req.Key == 0 {
return nil, nil
}
if req.Blessings == nil {
// Fastpath, lookup based on the key.
c.RLock()
cached, exists := c.m[req.Key]
c.RUnlock()
if !exists {
return nil, fmt.Errorf("ipc: key was not in the cache")
}
stats.recordBlessingCache(true)
return cached, nil
}
// Slowpath, might need to update the cache, or check that the received blessings are
// the same as what's in the cache.
recv, err := security.NewBlessings(*req.Blessings)
if err != nil {
return nil, fmt.Errorf("ipc: create new client blessings failed: %v", err)
}
c.Lock()
defer c.Unlock()
if cached, exists := c.m[req.Key]; exists {
// TODO(suharshs): Replace this reflect.DeepEqual() with a less expensive check.
if !reflect.DeepEqual(cached, recv) {
return nil, fmt.Errorf("client sent invalid Blessings")
}
stats.recordBlessingCache(true)
return cached, nil
}
c.m[req.Key] = recv
stats.recordBlessingCache(false)
return recv, nil
}