veyron/runtimes/google/ipc: VC now has a general-purpose cache for data
that can be shared amongst Flows on a VC. Blessings use this cache in ipc.Server
and ipc.Client.

TODO in the future (as needed): Add stats for each individual VC cache.

This is described more in: https://docs.google.com/document/d/1jHDDGHUxKcQNJBHwdkyLwZVzR9fm_ctzds-6_TeRL2Y/edit.

Change-Id: Iecbadbc6f86e50675c7b5870e54bd81b2d852bfa
diff --git a/runtimes/google/ipc/blessings_cache.go b/runtimes/google/ipc/blessings_cache.go
new file mode 100644
index 0000000..ff6a93b
--- /dev/null
+++ b/runtimes/google/ipc/blessings_cache.go
@@ -0,0 +1,153 @@
+package ipc
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/ipc/stream"
+	"veyron.io/veyron/veyron2/security"
+)
+
+// clientEncodeBlessings gets or inserts the blessings into the cache.
+func clientEncodeBlessings(cache stream.VCDataCache, blessings security.Blessings) ipc.BlessingsRequest {
+	blessingsCacheAny := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
+	blessingsCache := blessingsCacheAny.(*clientBlessingsCache)
+	return blessingsCache.getOrInsert(blessings)
+}
+
+// clientAckBlessings records that the server has updated its cache to include blessings.
+// This means that subsequent RPCs from the client with blessings can send only a cache key.
+func clientAckBlessings(cache stream.VCDataCache, blessings security.Blessings) {
+	blessingsCacheAny := cache.GetOrInsert(clientBlessingsKey{}, newClientBlessingsCache)
+	blessingsCache := blessingsCacheAny.(*clientBlessingsCache)
+	blessingsCache.acknowledge(blessings)
+}
+
+// serverDecodeBlessings inserts the key and blessings into the cache, or gets the blessings
+// from the cache if only the key is provided in req.
+func serverDecodeBlessings(cache stream.VCDataCache, req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
+	blessingsCacheAny := cache.GetOrInsert(serverBlessingsKey{}, newServerBlessingsCache)
+	blessingsCache := blessingsCacheAny.(*serverBlessingsCache)
+	return blessingsCache.getOrInsert(req, stats)
+}
+
+// IMPLEMENTATION DETAILS BELOW
+
+// clientBlessingsCache is a thread-safe map from blessings to cache key.
+type clientBlessingsCache struct {
+	sync.RWMutex
+	m   map[security.Blessings]clientCacheValue
+	key uint64
+}
+
+type clientCacheValue struct {
+	key uint64
+	// ack is set to true once the server has confirmed receipt of the cache key.
+	// Clients that insert into the cache when ack is false must send both the key
+	// and the blessings.
+	ack bool
+}
+
+// clientBlessingsKey is the key used to retrieve the clientBlessingsCache from the VCDataCache.
+type clientBlessingsKey struct{}
+
+func newClientBlessingsCache() interface{} {
+	return &clientBlessingsCache{m: make(map[security.Blessings]clientCacheValue)}
+}
+
+func (c *clientBlessingsCache) getOrInsert(blessings security.Blessings) ipc.BlessingsRequest {
+	c.RLock()
+	val, exists := c.m[blessings]
+	c.RUnlock()
+	if exists {
+		return c.makeBlessingsRequest(val, blessings)
+	}
+	// if the val doesn't exist we must create a new key, update the cache, and send the key and blessings.
+	c.Lock()
+	defer c.Unlock()
+	// we must check that the val wasn't inserted in the time we changed locks.
+	val, exists = c.m[blessings]
+	if exists {
+		return c.makeBlessingsRequest(val, blessings)
+	}
+	newVal := clientCacheValue{key: c.nextKeyLocked()}
+	c.m[blessings] = newVal
+	return c.makeBlessingsRequest(newVal, blessings)
+}
+
+func (c *clientBlessingsCache) acknowledge(blessings security.Blessings) {
+	c.Lock()
+	val := c.m[blessings]
+	val.ack = true
+	c.m[blessings] = val
+	c.Unlock()
+}
+
+func (c *clientBlessingsCache) makeBlessingsRequest(val clientCacheValue, blessings security.Blessings) ipc.BlessingsRequest {
+	if val.ack {
+		// when the value is acknowledged, only send the key, since the server has confirmed that it knows the key.
+		return ipc.BlessingsRequest{Key: val.key}
+	}
+	// otherwise we still need to send both key and blessings, but we must ensure that we send the same key.
+	wireBlessings := security.MarshalBlessings(blessings)
+	return ipc.BlessingsRequest{val.key, &wireBlessings}
+}
+
+// nextKeyLocked creates a new key for inserting blessings. It must be called after acquiring a writer lock.
+func (c *clientBlessingsCache) nextKeyLocked() uint64 {
+	c.key++
+	return c.key
+}
+
+// serverBlessingsCache is a thread-safe map from cache key to blessings.
+type serverBlessingsCache struct {
+	sync.RWMutex
+	m map[uint64]security.Blessings
+}
+
+// serverBlessingsKey is the key used to retrieve the serverBlessingsCache from the VCDataCache.
+type serverBlessingsKey struct{}
+
+func newServerBlessingsCache() interface{} {
+	return &serverBlessingsCache{m: make(map[uint64]security.Blessings)}
+}
+
+func (c *serverBlessingsCache) getOrInsert(req ipc.BlessingsRequest, stats *ipcStats) (security.Blessings, error) {
+	// In the case that the key sent is 0, we are running in VCSecurityNone and should
+	// return nil for the client Blessings.
+	if req.Key == 0 {
+		return nil, nil
+	}
+	if req.Blessings == nil {
+		// Fastpath, lookup based on the key.
+		c.RLock()
+		cached, exists := c.m[req.Key]
+		c.RUnlock()
+		if !exists {
+			return nil, fmt.Errorf("ipc: key was not in the cache")
+		}
+		stats.recordBlessingCache(true)
+		return cached, nil
+	}
+	// Slowpath, might need to update the cache, or check that the received blessings are
+	// the same as what's in the cache.
+	recv, err := security.NewBlessings(*req.Blessings)
+	if err != nil {
+		return nil, fmt.Errorf("ipc: create new client blessings failed: %v", err)
+	}
+	c.Lock()
+	defer c.Unlock()
+	if cached, exists := c.m[req.Key]; exists {
+		// TODO(suharshs): Replace this reflect.DeepEqual() with a less expensive check.
+		if !reflect.DeepEqual(cached, recv) {
+			return nil, fmt.Errorf("client sent invalid Blessings")
+		}
+		stats.recordBlessingCache(true)
+		return cached, nil
+	}
+	c.m[req.Key] = recv
+	stats.recordBlessingCache(false)
+	return recv, nil
+}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 8475dea..5edc884 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -61,7 +61,9 @@
 
 	errRequestEncoding = verror.Register(pkgPath+".requestEncoding", verror.NoRetry, "failed to encode request {3}{:4}")
 
-	errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharge {3}{:4}")
+	errDischargeEncoding = verror.Register(pkgPath+".dischargeEncoding", verror.NoRetry, "failed to encode discharges {:3}")
+
+	errBlessingEncoding = verror.Register(pkgPath+".blessingEncoding", verror.NoRetry, "failed to encode blessing {3}{:4}")
 
 	errArgEncoding = verror.Register(pkgPath+".argEncoding", verror.NoRetry, "failed to encode arg #{3}{:4:}")
 
@@ -656,6 +658,8 @@
 	discharges []security.Discharge // discharges used for this request
 	dc         vc.DischargeClient   // client-global discharge-client
 
+	blessings security.Blessings // the local blessings for the current RPC.
+
 	sendClosedMu sync.Mutex
 	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
 	finished     bool // has Finish() already been called?
@@ -703,25 +707,32 @@
 	if self := fc.flow.LocalBlessings(); self != nil && fc.dc != nil {
 		fc.discharges = fc.dc.PrepareDischarges(fc.ctx, self.ThirdPartyCaveats(), mkDischargeImpetus(fc.server, method, args))
 	}
+	// Encode the Blessings information for the client to authorize the flow.
+	var blessingsRequest ipc.BlessingsRequest
+	if fc.flow.LocalPrincipal() != nil {
+		localBlessings := fc.flow.LocalPrincipal().BlessingStore().ForPeer(fc.server...)
+		blessingsRequest = clientEncodeBlessings(fc.flow.VCDataCache(), localBlessings)
+	}
+	// TODO(suharshs, ataly): Make security.Discharge a vdl type.
+	anyDischarges := make([]vdlutil.Any, len(fc.discharges))
+	for i, d := range fc.discharges {
+		anyDischarges[i] = d
+	}
 	req := ipc.Request{
 		Suffix:           suffix,
 		Method:           method,
 		NumPosArgs:       uint64(len(args)),
 		Timeout:          int64(timeout),
 		GrantedBlessings: security.MarshalBlessings(blessings),
-		NumDischarges:    uint64(len(fc.discharges)),
+		Blessings:        blessingsRequest,
+		Discharges:       anyDischarges,
 		TraceRequest:     ivtrace.Request(fc.ctx),
 	}
 	if err := fc.enc.Encode(req); err != nil {
 		berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
 		return fc.close(berr)
 	}
-	for _, d := range fc.discharges {
-		if err := fc.enc.Encode(d); err != nil {
-			berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errDischargeEncoding, fc.ctx, d.ID(), err))
-			return fc.close(berr)
-		}
-	}
+
 	for ix, arg := range args {
 		if err := fc.enc.Encode(arg); err != nil {
 			berr := verror.Make(verror.BadProtocol, fc.ctx, verror.Make(errArgEncoding, fc.ctx, ix, err))
@@ -874,6 +885,9 @@
 			return fc.close(berr)
 		}
 	}
+	if fc.response.AckBlessings {
+		clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
+	}
 	// Incorporate any VTrace info that was returned.
 	ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
 	if fc.response.Error != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index bd95b94..6705ef0 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -28,6 +28,7 @@
 	"veyron.io/veyron/veyron2/vlog"
 
 	"veyron.io/veyron/veyron/lib/netstate"
+	"veyron.io/veyron/veyron/lib/stats"
 	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/lib/testutil"
 	tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
@@ -1589,6 +1590,107 @@
 	}
 }
 
+// TestBlessingsCache tests that the VCCache is successfully used to cache the blessings
+// sent on duplicate calls.
+func TestBlessingsCache(t *testing.T) {
+	var (
+		pserver = tsecurity.NewPrincipal("server")
+		pclient = tsecurity.NewPrincipal("client")
+	)
+	// Make the client recognize all server blessings
+	if err := pclient.AddToRoots(pserver.BlessingStore().Default()); err != nil {
+		t.Fatal(err)
+	}
+
+	ns := tnaming.NewSimpleNamespace()
+	rid, err := naming.NewRoutingID()
+	if err != nil {
+		t.Fatal(err)
+	}
+	runServer := func(principal security.Principal, rid naming.RoutingID) (ipc.Server, stream.Manager, naming.Endpoint) {
+		sm := imanager.InternalNew(rid)
+		server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{principal})
+		if err != nil {
+			t.Fatal(err)
+		}
+		ep, err := server.Listen(listenSpec)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return server, sm, ep[0]
+	}
+
+	server, serverSM, serverEP := runServer(pserver, rid)
+	go server.Serve("mountpoint/testServer", &testServer{}, acceptAllAuthorizer{})
+	defer serverSM.Shutdown()
+
+	newClient := func() ipc.Client {
+		rid, err := naming.NewRoutingID()
+		if err != nil {
+			t.Fatal(err)
+		}
+		smc := imanager.InternalNew(rid)
+		defer smc.Shutdown()
+		client, err := InternalNewClient(smc, ns, vc.LocalPrincipal{pclient})
+		if err != nil {
+			t.Fatalf("failed to create client: %v", err)
+		}
+		return client
+	}
+
+	runClient := func(client ipc.Client) {
+		var call ipc.Call
+		if call, err = client.StartCall(testContext(), "/"+serverEP.String(), "Closure", nil); err != nil {
+			t.Fatalf("failed to StartCall: %v", err)
+		}
+		if err := call.Finish(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	cachePrefix := naming.Join("ipc", "server", "routing-id", rid.String(), "security", "blessings", "cache")
+	cacheHits, err := stats.GetStatsObject(naming.Join(cachePrefix, "hits"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	cacheAttempts, err := stats.GetStatsObject(naming.Join(cachePrefix, "attempts"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the blessings cache is not used on the first call.
+	clientA := newClient()
+	runClient(clientA)
+	if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 1 || gotHits != 0 {
+		t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(1), cacheHits(0)", gotAttempts, gotHits)
+	}
+	// Check that the cache is hit on the second call with the same blessings.
+	runClient(clientA)
+	if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 2 || gotHits != 1 {
+		t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(2), cacheHits(1)", gotAttempts, gotHits)
+	}
+	clientA.Close()
+	// Check that the cache is not used with a different client.
+	clientB := newClient()
+	runClient(clientB)
+	if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 3 || gotHits != 1 {
+		t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(3), cacheHits(1)", gotAttempts, gotHits)
+	}
+	// clientB changes its blessings, the cache should not be used.
+	blessings, err := pserver.Bless(pclient.PublicKey(), pserver.BlessingStore().Default(), "cav", mkCaveat(security.ExpiryCaveat(time.Now().Add(time.Hour))))
+	if err != nil {
+		t.Fatalf("failed to create Blessings: %v", err)
+	}
+	if _, err = pclient.BlessingStore().Set(blessings, "server"); err != nil {
+		t.Fatalf("failed to set blessings: %v", err)
+	}
+	runClient(clientB)
+	if gotAttempts, gotHits := cacheAttempts.Value().(int64), cacheHits.Value().(int64); gotAttempts != 4 || gotHits != 1 {
+		t.Errorf("got cacheAttempts(%v), cacheHits(%v), expected cacheAttempts(4), cacheHits(1)", gotAttempts, gotHits)
+	}
+	clientB.Close()
+}
+
 func init() {
 	testutil.Init()
 	vdlutil.Register(fakeTimeCaveat(0))
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 6fbdd4e..fbc32e7 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -17,6 +17,7 @@
 	"veyron.io/veyron/veyron2/options"
 	"veyron.io/veyron/veyron2/security"
 	"veyron.io/veyron/veyron2/services/security/access"
+	"veyron.io/veyron/veyron2/vdl"
 	old_verror "veyron.io/veyron/veyron2/verror"
 	verror "veyron.io/veyron/veyron2/verror2"
 	"veyron.io/veyron/veyron2/vlog"
@@ -794,13 +795,15 @@
 	flow   stream.Flow    // underlying flow
 
 	// Fields filled in during the server invocation.
-	blessings      security.Blessings
-	method, suffix string
-	tags           []interface{}
-	discharges     map[string]security.Discharge
-	starttime      time.Time
-	endStreamArgs  bool // are the stream args at EOF?
-	allowDebug     bool // true if the caller is permitted to view debug information.
+	clientBlessings security.Blessings
+	ackBlessings    bool
+	blessings       security.Blessings
+	method, suffix  string
+	tags            []interface{}
+	discharges      map[string]security.Discharge
+	starttime       time.Time
+	endStreamArgs   bool // are the stream args at EOF?
+	allowDebug      bool // true if the caller is permitted to view debug information.
 }
 
 var _ ipc.Stream = (*flowServer)(nil)
@@ -895,6 +898,7 @@
 		EndStreamResults: true,
 		NumPosResults:    uint64(len(results)),
 		TraceResponse:    traceResponse,
+		AckBlessings:     fs.ackBlessings,
 	}
 	if err := fs.enc.Encode(response); err != nil {
 		if err == io.EOF {
@@ -1080,13 +1084,27 @@
 	if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
 		return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
 	}
-	// Receive third party caveat discharges the client sent
-	for i := uint64(0); i < req.NumDischarges; i++ {
-		var d security.Discharge
-		if err := fs.dec.Decode(&d); err != nil {
-			return old_verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
+	fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats)
+	if err != nil {
+		// When the server can't access the blessings cache, the client is not following
+		// protocol, so the server closes the VCs corresponding to the client endpoint.
+		// TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+		// of all VCs connected to the RemoteEndpoint.
+		fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+		return old_verror.BadProtocolf("ipc: blessings cache failed: %v", err)
+	}
+	fs.ackBlessings = true
+
+	// TODO(suharshs, ataly): Make security.Discharge a vdl type.
+	for i, d := range req.Discharges {
+		if dis, ok := d.(security.Discharge); ok {
+			fs.discharges[dis.ID()] = dis
+			continue
 		}
-		fs.discharges[d.ID()] = d
+		if v, ok := d.(*vdl.Value); ok {
+			return old_verror.BadProtocolf("ipc: discharge #%d of type %s isn't registered", i, v.Type())
+		}
+		return old_verror.BadProtocolf("ipc: discharge #%d of type %T doesn't implement security.Discharge", i, d)
 	}
 	return nil
 }
@@ -1193,6 +1211,9 @@
 }
 func (fs *flowServer) RemoteBlessings() security.Blessings {
 	//nologcall
+	if fs.clientBlessings != nil {
+		return fs.clientBlessings
+	}
 	return fs.flow.RemoteBlessings()
 }
 func (fs *flowServer) Blessings() security.Blessings {
diff --git a/runtimes/google/ipc/stats.go b/runtimes/google/ipc/stats.go
index 334ce23..ea5441f 100644
--- a/runtimes/google/ipc/stats.go
+++ b/runtimes/google/ipc/stats.go
@@ -5,19 +5,25 @@
 	"time"
 
 	"veyron.io/veyron/veyron/lib/stats"
+	"veyron.io/veyron/veyron/lib/stats/counter"
 	"veyron.io/veyron/veyron/lib/stats/histogram"
 
 	"veyron.io/veyron/veyron2/naming"
 )
 
 type ipcStats struct {
-	mu      sync.RWMutex
-	prefix  string
-	methods map[string]*perMethodStats
+	mu                  sync.RWMutex
+	prefix              string
+	methods             map[string]*perMethodStats
+	blessingsCacheStats *blessingsCacheStats
 }
 
 func newIPCStats(prefix string) *ipcStats {
-	return &ipcStats{prefix: prefix, methods: make(map[string]*perMethodStats)}
+	return &ipcStats{
+		prefix:              prefix,
+		methods:             make(map[string]*perMethodStats),
+		blessingsCacheStats: newBlessingsCacheStats(prefix),
+	}
 }
 
 type perMethodStats struct {
@@ -41,6 +47,10 @@
 	m.latency.Add(int64(latency / time.Millisecond))
 }
 
+func (s *ipcStats) recordBlessingCache(hit bool) {
+	s.blessingsCacheStats.incr(hit)
+}
+
 // newPerMethodStats creates a new perMethodStats object if one doesn't exist
 // already. It returns the newly created object, or the already existing one.
 func (s *ipcStats) newPerMethodStats(method string) *perMethodStats {
@@ -61,3 +71,25 @@
 	}
 	return m
 }
+
+// blessingsCacheStats keeps blessing cache hits and total calls received to determine
+// how often the blessingCache is being used.
+type blessingsCacheStats struct {
+	callsReceived, cacheHits *counter.Counter
+}
+
+func newBlessingsCacheStats(prefix string) *blessingsCacheStats {
+	cachePrefix := naming.Join(prefix, "security", "blessings", "cache")
+	return &blessingsCacheStats{
+		callsReceived: stats.NewCounter(naming.Join(cachePrefix, "attempts")),
+		cacheHits:     stats.NewCounter(naming.Join(cachePrefix, "hits")),
+	}
+}
+
+// incr increments the cache attempt counter, and also the cache hit counter if hit is true.
+func (s *blessingsCacheStats) incr(hit bool) {
+	s.callsReceived.Incr(1)
+	if hit {
+		s.cacheHits.Incr(1)
+	}
+}
diff --git a/runtimes/google/ipc/stream/vc/data_cache.go b/runtimes/google/ipc/stream/vc/data_cache.go
new file mode 100644
index 0000000..8b75ed0
--- /dev/null
+++ b/runtimes/google/ipc/stream/vc/data_cache.go
@@ -0,0 +1,40 @@
+package vc
+
+import (
+	"sync"
+)
+
+// dataCache is a thread-safe map for any two types.
+type dataCache struct {
+	sync.RWMutex
+	m map[interface{}]interface{}
+}
+
+func newDataCache() *dataCache {
+	return &dataCache{m: make(map[interface{}]interface{})}
+}
+
+// GetOrInsert first checks, under a reader lock, whether the key exists in the cache.
+// If it doesn't exist, it acquires a writer lock, calls create to build the new value,
+// stores it under key, and returns it.
+func (c *dataCache) GetOrInsert(key interface{}, create func() interface{}) interface{} {
+	// We use the read lock for the fastpath. This should be the more common case, so we rarely
+	// need a writer lock.
+	c.RLock()
+	value, exists := c.m[key]
+	c.RUnlock()
+	if exists {
+		return value
+	}
+	// We acquire the writer lock for the slowpath, and need to re-check if the key exists
+	// in the map, since another thread may have snuck in.
+	c.Lock()
+	defer c.Unlock()
+	value, exists = c.m[key]
+	if exists {
+		return value
+	}
+	value = create()
+	c.m[key] = value
+	return value
+}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index 958db51..8e32596 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -1,6 +1,7 @@
 package vc
 
 import (
+	"veyron.io/veyron/veyron2/ipc/stream"
 	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/security"
 )
@@ -10,6 +11,7 @@
 	*reader
 	*writer
 	localEndpoint, remoteEndpoint naming.Endpoint
+	dataCache                     *dataCache
 }
 
 type authN interface {
@@ -48,3 +50,9 @@
 	f.reader.Close()
 	f.writer.shutdown(false)
 }
+
+// VCDataCache returns the stream.VCDataCache object that allows information to be
+// shared across the Flow's parent VC.
+func (f *flow) VCDataCache() stream.VCDataCache {
+	return f.dataCache
+}
diff --git a/runtimes/google/ipc/stream/vc/listener_test.go b/runtimes/google/ipc/stream/vc/listener_test.go
index 098ff33..3f42607 100644
--- a/runtimes/google/ipc/stream/vc/listener_test.go
+++ b/runtimes/google/ipc/stream/vc/listener_test.go
@@ -3,6 +3,7 @@
 import (
 	"testing"
 
+	"veyron.io/veyron/veyron2/ipc/stream"
 	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/security"
 )
@@ -25,6 +26,7 @@
 func (*noopFlow) RemoteBlessings() security.Blessings             { return nil }
 func (*noopFlow) RemoteDischarges() map[string]security.Discharge { return nil }
 func (*noopFlow) SetDeadline(<-chan struct{})                     {}
+func (*noopFlow) VCDataCache() stream.VCDataCache                 { return nil }
 
 func TestListener(t *testing.T) {
 	ln := newListener()
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index 38d8db6..b522f9e 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -61,8 +61,9 @@
 	crypter             crypto.Crypter
 	closeReason         string // reason why the VC was closed
 
-	helper  Helper
-	version version.IPCVersion
+	helper    Helper
+	version   version.IPCVersion
+	dataCache *dataCache // dataCache contains information that can be shared between Flows from this VC.
 }
 
 // NoDischarges specifies that the RPC call should not fetch discharges.
@@ -164,6 +165,7 @@
 		crypter:        crypto.NewNullCrypter(),
 		helper:         p.Helper,
 		version:        p.Version,
+		dataCache:      newDataCache(),
 	}
 }
 
@@ -183,6 +185,7 @@
 		writer:         writer,
 		localEndpoint:  vc.localEP,
 		remoteEndpoint: vc.remoteEP,
+		dataCache:      vc.dataCache,
 	}
 	vc.mu.Lock()
 	if vc.flowMap != nil {
@@ -286,6 +289,7 @@
 		writer:         writer,
 		localEndpoint:  vc.localEP,
 		remoteEndpoint: vc.remoteEP,
+		dataCache:      vc.dataCache,
 	}
 	if err = vc.listener.Enqueue(f); err != nil {
 		f.Shutdown()