veyron/runtimes/google/ipc: VC now has a general-purpose cache for data
that can be shared amongst Flows on a VC. Blessings use this cache in ipc.Server
and ipc.Client.
TODO in the future (as needed): Add stats for each individual VC cache.
This is described more in: https://docs.google.com/document/d/1jHDDGHUxKcQNJBHwdkyLwZVzR9fm_ctzds-6_TeRL2Y/edit.
Change-Id: Iecbadbc6f86e50675c7b5870e54bd81b2d852bfa
diff --git a/runtimes/google/ipc/blessings_cache.go b/runtimes/google/ipc/blessings_cache.go
new file mode 100644
index 0000000..ff6a93b
--- /dev/null
+++ b/runtimes/google/ipc/blessings_cache.go
@@ -0,0 +1,153 @@
+package ipc
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/ipc/stream"
+ "veyron.io/veyron/veyron2/security"
+)
+
+// clientEncodeBlessings returns the BlessingsRequest to send for blessings, using the
+// client blessings cache stored in cache (GetOrInsert creates it when it is absent).
+func clientEncodeBlessings(cache stream.VCDataCache, blessings security.Blessings) ipc.BlessingsRequest {
+	entry := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
+	return entry.(*clientBlessingsCache).getOrInsert(blessings)
+}
+
+// clientAckBlessings records that the server has acknowledged blessings, i.e. that the
+// server has updated its cache to include them. Subsequent rpcs from the client with
+// these blessings can then send only a cache key.
+func clientAckBlessings(cache stream.VCDataCache, blessings security.Blessings) {
+	entry := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
+	entry.(*clientBlessingsCache).acknowledge(blessings)
+}
+
+// serverDecodeBlessings returns the client blessings described by req. When req carries
+// both key and blessings they are inserted into the server cache; when it carries only
+// the key the blessings are looked up. Successful lookups and inserts are recorded in stats.
+func serverDecodeBlessings(cache stream.VCDataCache, req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
+	entry := cache.GetOrInsert(serverBlessingsKey{}, newServerBlessingsCache)
+	return entry.(*serverBlessingsCache).getOrInsert(req, stats)
+}
+
+// IMPLEMENTATION DETAILS BELOW
+
+// clientBlessingsCache is a thread-safe map from blessings to cache key.
+type clientBlessingsCache struct {
+ sync.RWMutex // guards m and key
+ m map[security.Blessings]clientCacheValue // per-blessings cache key and ack state
+ key uint64 // last key handed out by nextKeyLocked; 0 before the first insert
+}
+
+type clientCacheValue struct {
+ key uint64 // cache key the client sends to identify these blessings
+ // ack is set to true once the server has confirmed receipt of the cache key.
+ // Until then every request must carry both the key and the blessings; afterwards
+ // the key alone is sent.
+ ack bool
+}
+
+// clientBlessingsKey is the key used to retrieve the clientBlessingsCache from the VCDataCache.
+type clientBlessingsKey struct{} // empty struct: the type alone identifies the entry
+
+func newClientBlessingsCache() interface{} {
+	return &clientBlessingsCache{m: map[security.Blessings]clientCacheValue{}}
+}
+
+// getOrInsert returns the request to send for blessings, giving them a new key on first use.
+func (c *clientBlessingsCache) getOrInsert(blessings security.Blessings) ipc.BlessingsRequest {
+	// Fast path: blessings that are already in the map only need the reader lock.
+	c.RLock()
+	entry, found := c.m[blessings]
+	c.RUnlock()
+	if !found {
+		// Slow path: take the writer lock and look again, since the entry may have
+		// been inserted in the time we changed locks.
+		c.Lock()
+		defer c.Unlock()
+		if entry, found = c.m[blessings]; !found {
+			// Still missing: create a new key and store it before building the request.
+			entry = clientCacheValue{key: c.nextKeyLocked()}
+			c.m[blessings] = entry
+		}
+	}
+	return c.makeBlessingsRequest(entry, blessings)
+}
+
+func (c *clientBlessingsCache) acknowledge(blessings security.Blessings) {
+	c.Lock()
+	defer c.Unlock()
+	if val, exists := c.m[blessings]; exists { // skip unknown blessings: a new entry would get key 0, i.e. "no blessings" on the server
+		c.m[blessings] = clientCacheValue{key: val.key, ack: true}
+	}
+}
+
+func (c *clientBlessingsCache) makeBlessingsRequest(val clientCacheValue, blessings security.Blessings) ipc.BlessingsRequest {
+	if val.ack {
+		// Acknowledged: the server has confirmed that it knows the key, so the key alone is sent.
+		return ipc.BlessingsRequest{Key: val.key}
+	}
+	// Not acknowledged yet: send both key and blessings, always with the key stored in val.
+	wireBlessings := security.MarshalBlessings(blessings)
+	return ipc.BlessingsRequest{Key: val.key, Blessings: &wireBlessings}
+}
+
+// nextKeyLocked returns a new key for inserting blessings; the first key is 1. It must be called after acquiring a writer lock.
+func (c *clientBlessingsCache) nextKeyLocked() uint64 {
+ c.key++
+ return c.key
+}
+
+// serverBlessingsCache is a thread-safe map from cache key to blessings.
+type serverBlessingsCache struct {
+ sync.RWMutex // guards m
+ m map[uint64]security.Blessings // blessings decoded from requests, indexed by the key sent with them
+}
+
+// serverBlessingsKey is the key used to retrieve the serverBlessingsCache from the VCDataCache.
+type serverBlessingsKey struct{} // empty struct: the type alone identifies the entry
+
+func newServerBlessingsCache() interface{} {
+	return &serverBlessingsCache{m: map[uint64]security.Blessings{}}
+}
+
+// getOrInsert resolves req to blessings: key 0 yields nil, a bare key is looked up, and key plus blessings are inserted or checked against the cached entry.
+func (c *serverBlessingsCache) getOrInsert(req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
+ // A key of 0 means we are running in VCSecurityNone: return nil for the client Blessings.
+ if req.Key == 0 {
+ return nil, nil
+ }
+ if req.Blessings == nil {
+ // Fastpath, lookup based on the key; only the reader lock is taken.
+ c.RLock()
+ cached, exists := c.m[req.Key]
+ c.RUnlock()
+ if !exists { // NOTE(review): this miss is returned as an error and is not recorded in stats
+ return nil, fmt.Errorf("ipc: key was not in the cache")
+ }
+ stats.recordBlessingCache(true)
+ return cached, nil
+ }
+ // Slowpath, might need to update the cache, or check that the received blessings are
+ // the same as what's in the cache. The blessings are decoded before the writer lock is taken.
+ recv, err := security.NewBlessings(*req.Blessings)
+ if err != nil {
+ return nil, fmt.Errorf("ipc: create new client blessings failed: %v", err)
+ }
+ c.Lock()
+ defer c.Unlock()
+ if cached, exists := c.m[req.Key]; exists {
+ // TODO(suharshs): Replace this reflect.DeepEqual() with a less expensive check.
+ if !reflect.DeepEqual(cached, recv) {
+ return nil, fmt.Errorf("client sent invalid Blessings")
+ }
+ stats.recordBlessingCache(true) // same key and same blessings as before: counted as a hit
+ return cached, nil
+ }
+ c.m[req.Key] = recv
+ stats.recordBlessingCache(false) // first time this key is seen: counted as an attempt without a hit
+ return recv, nil
+}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 8475dea..5edc884 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -61,7 +61,9 @@
errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
- errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharge {3}{:4}")
+ errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
+
+ errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
@@ -656,6 +658,8 @@
discharges []security.Discharge // discharges used for this request
dc vc.DischargeClient // client-global discharge-client
+ blessings security.Blessings // the local blessings for the current RPC.
+
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
finished bool // has Finish() already been called?
@@ -703,25 +707,32 @@
if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
}
+ // Encode the Blessings information for the client to authorize the flow.
+ var blessingsRequest ipc.BlessingsRequest
+ if fc.flow.LocalPrincipal() != nil {
+ fc.blessings = fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...) // kept on fc so the server's AckBlessings can acknowledge this same cache entry
+ blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), fc.blessings)
+ }
+ // TODO(suharshs, ataly): Make security.Discharge a vdl type.
+ anyDischarges := make([]vdlutil.Any, len(fc.discharges))
+ for i, d := range fc.discharges {
+ anyDischarges[i] = d
+ }
req := ipc.Request{
Suffix: suffix,
Method: method,
NumPosArgs: uint64(len(args)),
Timeout: int64(timeout),
GrantedBlessings: security.MarshalBlessings(blessings),
- NumDischarges: uint64(len(fc.discharges)),
+ Blessings: blessingsRequest,
+ Discharges: anyDischarges,
TraceRequest: ivtrace.Request(fc.ctx),
}
if err := fc.enc.Encode(req); err != nil {
berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
return fc.close(berr)
}
- for _, d := range fc.discharges {
- if err := fc.enc.Encode(d); err != nil {
- berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errDischargeEncoding, fc.ctx, d.ID(), err))
- return fc.close(berr)
- }
- }
+
for ix, arg := range args {
if err := fc.enc.Encode(arg); err != nil {
berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, ix, err))
@@ -874,6 +885,9 @@
return fc.close(berr)
}
}
+ if fc.response.AckBlessings {
+ clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
+ }
// Incorporate any VTrace info that was returned.
ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
if fc.response.Error != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index bd95b94..6705ef0 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -28,6 +28,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/netstate"
+ "veyron.io/veyron/veyron/lib/stats"
_ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/lib/testutil"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
@@ -1589,6 +1590,107 @@
}
}
+// TestBlessingsCache tests that the VC data cache is successfully used to cache blessings
+// across repeated calls.
+func TestBlessingsCache(t *testing.T) {
+ var (
+ pserver = tsecurity.NewPrincipal("server")
+ pclient = tsecurity.NewPrincipal("client")
+ )
+ // Make the client recognize all server blessings
+ if err := pclient.AddToRoots(pserver.BlessingStore().Default()); err != nil {
+ t.Fatal(err)
+ }
+
+ ns := tnaming.NewSimpleNamespace()
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ t.Fatal(err)
+ }
+ runServer := func(principal security.Principal, rid naming.RoutingID) (ipc.Server, stream.Manager, naming.Endpoint) {
+ sm := imanager.InternalNew(rid)
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{principal})
+ if err != nil {
+ t.Fatal(err)
+ }
+ ep, err := server.Listen(listenSpec)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return server, sm, ep[0]
+ }
+
+ server, serverSM, serverEP := runServer(pserver, rid)
+ go server.Serve("mountpoint/testServer", &testServer{}, acceptAllAuthorizer{})
+ defer serverSM.Shutdown()
+
+ newClient := func() ipc.Client {
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ t.Fatal(err)
+ }
+ smc := imanager.InternalNew(rid)
+ defer smc.Shutdown() // NOTE(review): runs when newClient returns, i.e. before the returned client makes any call; confirm this is intended
+ client, err := InternalNewClient(smc, ns, vc.LocalPrincipal{pclient})
+ if err != nil {
+ t.Fatalf("failed to create client: %v", err)
+ }
+ return client
+ }
+
+ runClient := func(client ipc.Client) {
+ var call ipc.Call
+ if call, err = client.StartCall(testContext(), "/"+serverEP.String(), "Closure", nil); err != nil {
+ t.Fatalf("failed to StartCall: %v", err)
+ }
+ if err := call.Finish(); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ cachePrefix := naming.Join("ipc", "server", "routing-id", rid.String(), "security", "blessings", "cache")
+ cacheHits, err := stats.GetStatsObject(naming.Join(cachePrefix, "hits"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ cacheAttempts, err := stats.GetStatsObject(naming.Join(cachePrefix, "attempts"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Check that the blessings cache is not used on the first call.
+ clientA := newClient()
+ runClient(clientA)
+ if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 1 || gotHits != 0 {
+ t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(1), cacheHits(0)", gotAttempts, gotHits)
+ }
+ // Check that the cache is hit on the second call with the same blessings.
+ runClient(clientA)
+ if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 2 || gotHits != 1 {
+ t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(2), cacheHits(1)", gotAttempts, gotHits)
+ }
+ clientA.Close()
+ // Check that the cache is not used with a different client.
+ clientB := newClient()
+ runClient(clientB)
+ if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 3 || gotHits != 1 {
+ t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(3), cacheHits(1)", gotAttempts, gotHits)
+ }
+ // clientB changes its blessings, the cache should not be used.
+ blessings, err := pserver.Bless(pclient.PublicKey(), pserver.BlessingStore().Default(), "cav", mkCaveat(security.ExpiryCaveat(time.Now().Add(time.Hour))))
+ if err != nil {
+ t.Fatalf("failed to create Blessings: %v", err)
+ }
+ if _, err = pclient.BlessingStore().Set(blessings, "server"); err != nil {
+ t.Fatalf("failed to set blessings: %v", err)
+ }
+ runClient(clientB)
+ if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 4 || gotHits != 1 {
+ t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(4), cacheHits(1)", gotAttempts, gotHits)
+ }
+ clientB.Close()
+}
+
func init() {
testutil.Init()
vdlutil.Register(fakeTimeCaveat(0))
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 6fbdd4e..fbc32e7 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -17,6 +17,7 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/services/security/access"
+ "veyron.io/veyron/veyron2/vdl"
old_verror "veyron.io/veyron/veyron2/verror"
verror "veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
@@ -794,13 +795,15 @@
flow stream.Flow // underlying flow
// Fields filled in during the server invocation.
- blessings security.Blessings
- method, suffix string
- tags []interface{}
- discharges map[string]security.Discharge
- starttime time.Time
- endStreamArgs bool // are the stream args at EOF?
- allowDebug bool // true if the caller is permitted to view debug information.
+ clientBlessings security.Blessings
+ ackBlessings bool
+ blessings security.Blessings
+ method, suffix string
+ tags []interface{}
+ discharges map[string]security.Discharge
+ starttime time.Time
+ endStreamArgs bool // are the stream args at EOF?
+ allowDebug bool // true if the caller is permitted to view debug information.
}
var _ ipc.Stream = (*flowServer)(nil)
@@ -895,6 +898,7 @@
EndStreamResults: true,
NumPosResults: uint64(len(results)),
TraceResponse: traceResponse,
+ AckBlessings: fs.ackBlessings,
}
if err := fs.enc.Encode(response); err != nil {
if err == io.EOF {
@@ -1080,13 +1084,27 @@
if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
}
- // Receive third party caveat discharges the client sent
- for i := uint64(0); i < req.NumDischarges; i++ {
- var d security.Discharge
- if err := fs.dec.Decode(&d); err != nil {
- return old_verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
+ fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats)
+ if err != nil {
+ // When the server can't access the blessings cache, the client is not following
+ // protocol, so the server closes the VCs corresponding to the client endpoint.
+ // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+ // of all VCs connected to the RemoteEndpoint.
+ fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+ return old_verror.BadProtocolf("ipc: blessings cache failed: %v", err)
+ }
+ fs.ackBlessings = true
+
+ // TODO(suharshs, ataly): Make security.Discharge a vdl type.
+ for i, d := range req.Discharges {
+ if dis, ok := d.(security.Discharge); ok {
+ fs.discharges[dis.ID()] = dis
+ continue
}
- fs.discharges[d.ID()] = d
+ if v, ok := d.(*vdl.Value); ok {
+ return old_verror.BadProtocolf("ipc: discharge #%d of type %s isn't registered", i, v.Type())
+ }
+ return old_verror.BadProtocolf("ipc: discharge #%d of type %T doesn't implement security.Discharge", i, d)
}
return nil
}
@@ -1193,6 +1211,9 @@
}
func (fs *flowServer) RemoteBlessings() security.Blessings {
//nologcall
+ if fs.clientBlessings != nil {
+ return fs.clientBlessings
+ }
return fs.flow.RemoteBlessings()
}
func (fs *flowServer) Blessings() security.Blessings {
diff --git a/runtimes/google/ipc/stats.go b/runtimes/google/ipc/stats.go
index 334ce23..ea5441f 100644
--- a/runtimes/google/ipc/stats.go
+++ b/runtimes/google/ipc/stats.go
@@ -5,19 +5,25 @@
"time"
"veyron.io/veyron/veyron/lib/stats"
+ "veyron.io/veyron/veyron/lib/stats/counter"
"veyron.io/veyron/veyron/lib/stats/histogram"
"veyron.io/veyron/veyron2/naming"
)
type ipcStats struct {
- mu sync.RWMutex
- prefix string
- methods map[string]*perMethodStats
+ mu sync.RWMutex
+ prefix string
+ methods map[string]*perMethodStats
+ blessingsCacheStats *blessingsCacheStats
}
func newIPCStats(prefix string) *ipcStats {
- return &ipcStats{prefix: prefix, methods: make(map[string]*perMethodStats)}
+ return &ipcStats{
+ prefix: prefix,
+ methods: make(map[string]*perMethodStats),
+ blessingsCacheStats: newBlessingsCacheStats(prefix),
+ }
}
type perMethodStats struct {
@@ -41,6 +47,10 @@
m.latency.Add(int64(latency / time.Millisecond))
}
+func (s *ipcStats) recordBlessingCache(hit bool) {
+ s.blessingsCacheStats.incr(hit) // counts one cache attempt, plus one cache hit when hit is true
+}
+
// newPerMethodStats creates a new perMethodStats object if one doesn't exist
// already. It returns the newly created object, or the already existing one.
func (s *ipcStats) newPerMethodStats(method string) *perMethodStats {
@@ -61,3 +71,25 @@
}
return m
}
+
+// blessingsCacheStats counts blessings cache attempts (callsReceived) and hits (cacheHits)
+// to determine how often the blessings cache is being used.
+type blessingsCacheStats struct {
+ callsReceived, cacheHits *counter.Counter
+}
+
+func newBlessingsCacheStats(prefix string) *blessingsCacheStats {
+	root := naming.Join(prefix, "security", "blessings", "cache")
+	s := new(blessingsCacheStats)
+	s.callsReceived = stats.NewCounter(naming.Join(root, "attempts"))
+	s.cacheHits = stats.NewCounter(naming.Join(root, "hits"))
+	return s
+}
+
+// incr increments the cache attempt counter, and also the cache hit counter if hit is true.
+func (s *blessingsCacheStats) incr(hit bool) {
+ s.callsReceived.Incr(1)
+ if hit {
+ s.cacheHits.Incr(1)
+ }
+}
diff --git a/runtimes/google/ipc/stream/vc/data_cache.go b/runtimes/google/ipc/stream/vc/data_cache.go
new file mode 100644
index 0000000..8b75ed0
--- /dev/null
+++ b/runtimes/google/ipc/stream/vc/data_cache.go
@@ -0,0 +1,40 @@
+package vc
+
+import (
+ "sync"
+)
+
+// dataCache is a thread-safe map for any two types.
+type dataCache struct {
+ sync.RWMutex // guards m
+ m map[interface{}]interface{} // entries are added by GetOrInsert
+}
+
+func newDataCache() *dataCache {
+	return &dataCache{m: map[interface{}]interface{}{}}
+}
+
+// GetOrInsert returns the value stored under key. When key is absent, the value is
+// produced by create, stored under key and returned. create is called with the writer
+// lock held, and only after a second lookup under that lock has also missed.
+func (c *dataCache) GetOrInsert(key interface{}, create func() interface{}) interface{} {
+	// Fast path: look the key up under the reader lock. This should be the more
+	// common case, so the writer lock is rarely needed.
+	c.RLock()
+	value, found := c.m[key]
+	c.RUnlock()
+	if !found {
+		// Slow path: acquire the writer lock and re-check the map, since another
+		// goroutine may have inserted the key between the two lock acquisitions.
+		c.Lock()
+		defer c.Unlock()
+		if value, found = c.m[key]; !found {
+			// The key is still absent, so this call creates the value and
+			// publishes it for later callers.
+			value = create()
+			c.m[key] = value
+		}
+	}
+	// value is either the entry that was found or the one just created.
+	return value
+}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index 958db51..8e32596 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -1,6 +1,7 @@
package vc
import (
+ "veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
)
@@ -10,6 +11,7 @@
*reader
*writer
localEndpoint, remoteEndpoint naming.Endpoint
+ dataCache *dataCache
}
type authN interface {
@@ -48,3 +50,9 @@
f.reader.Close()
f.writer.shutdown(false)
}
+
+// VCDataCache returns the stream.VCDataCache object that allows information to be
+// shared across the Flow's parent VC.
+func (f *flow) VCDataCache() stream.VCDataCache {
+ return f.dataCache // NOTE(review): a nil f.dataCache would come back as a non-nil interface; confirm every flow is built with it set
+}
diff --git a/runtimes/google/ipc/stream/vc/listener_test.go b/runtimes/google/ipc/stream/vc/listener_test.go
index 098ff33..3f42607 100644
--- a/runtimes/google/ipc/stream/vc/listener_test.go
+++ b/runtimes/google/ipc/stream/vc/listener_test.go
@@ -3,6 +3,7 @@
import (
"testing"
+ "veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
)
@@ -25,6 +26,7 @@
func (*noopFlow) RemoteBlessings() security.Blessings { return nil }
func (*noopFlow) RemoteDischarges() map[string]security.Discharge { return nil }
func (*noopFlow) SetDeadline(<-chan struct{}) {}
+func (*noopFlow) VCDataCache() stream.VCDataCache { return nil }
func TestListener(t *testing.T) {
ln := newListener()
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index 38d8db6..b522f9e 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -61,8 +61,9 @@
crypter crypto.Crypter
closeReason string // reason why the VC was closed
- helper Helper
- version version.IPCVersion
+ helper Helper
+ version version.IPCVersion
+ dataCache *dataCache // dataCache contains information that can be shared between Flows from this VC.
}
// NoDischarges specifies that the RPC call should not fetch discharges.
@@ -164,6 +165,7 @@
crypter: crypto.NewNullCrypter(),
helper: p.Helper,
version: p.Version,
+ dataCache: newDataCache(),
}
}
@@ -183,6 +185,7 @@
writer: writer,
localEndpoint: vc.localEP,
remoteEndpoint: vc.remoteEP,
+ dataCache: vc.dataCache,
}
vc.mu.Lock()
if vc.flowMap != nil {
@@ -286,6 +289,7 @@
writer: writer,
localEndpoint: vc.localEP,
remoteEndpoint: vc.remoteEP,
+ dataCache: vc.dataCache,
}
if err = vc.listener.Enqueue(f); err != nil {
f.Shutdown()