veyron/runtimes/google/ipc: VC had a general purpose cache for data
that can be shared amongst Flows on a VC. Blessings use this cache in ipc.Server
and ipc.Client.

TODO in the future (as needed): Add stats for each individual VC cache.

This is described more in: https://docs.google.com/document/d/1jHDDGHUxKcQNJBHwdkyLwZVzR9fm_ctzds-6_TeRL2Y/edit.

Change-Id: Iecbadbc6f86e50675c7b5870e54bd81b2d852bfa
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 6fbdd4e..fbc32e7 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -17,6 +17,7 @@
 	"veyron.io/veyron/veyron2/options"
 	"veyron.io/veyron/veyron2/security"
 	"veyron.io/veyron/veyron2/services/security/access"
+	"veyron.io/veyron/veyron2/vdl"
 	old_verror "veyron.io/veyron/veyron2/verror"
 	verror "veyron.io/veyron/veyron2/verror2"
 	"veyron.io/veyron/veyron2/vlog"
@@ -794,13 +795,15 @@
 	flow   stream.Flow    // underlying flow
 
 	// Fields filled in during the server invocation.
-	blessings      security.Blessings
-	method, suffix string
-	tags           []interface{}
-	discharges     map[string]security.Discharge
-	starttime      time.Time
-	endStreamArgs  bool // are the stream args at EOF?
-	allowDebug     bool // true if the caller is permitted to view debug information.
+	clientBlessings security.Blessings
+	ackBlessings    bool
+	blessings       security.Blessings
+	method, suffix  string
+	tags            []interface{}
+	discharges      map[string]security.Discharge
+	starttime       time.Time
+	endStreamArgs   bool // are the stream args at EOF?
+	allowDebug      bool // true if the caller is permitted to view debug information.
 }
 
 var _ ipc.Stream = (*flowServer)(nil)
@@ -895,6 +898,7 @@
 		EndStreamResults: true,
 		NumPosResults:    uint64(len(results)),
 		TraceResponse:    traceResponse,
+		AckBlessings:     fs.ackBlessings,
 	}
 	if err := fs.enc.Encode(response); err != nil {
 		if err == io.EOF {
@@ -1080,13 +1084,27 @@
 	if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
 		return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
 	}
-	// Receive third party caveat discharges the client sent
-	for i := uint64(0); i < req.NumDischarges; i++ {
-		var d security.Discharge
-		if err := fs.dec.Decode(&d); err != nil {
-			return old_verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
+	fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats)
+	if err != nil {
+		// When the server can't access the blessings cache, the client is not following
+		// protocol, so the server closes the VCs corresponding to the client endpoint.
+		// TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+		// of all VCs connected to the RemoteEndpoint.
+		fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+		return old_verror.BadProtocolf("ipc: blessings cache failed: %v", err)
+	}
+	fs.ackBlessings = true
+
+	// TODO(suharshs, ataly): Make security.Discharge a vdl type.
+	for i, d := range req.Discharges {
+		if dis, ok := d.(security.Discharge); ok {
+			fs.discharges[dis.ID()] = dis
+			continue
 		}
-		fs.discharges[d.ID()] = d
+		if v, ok := d.(*vdl.Value); ok {
+			return old_verror.BadProtocolf("ipc: discharge #%d of type %s isn't registered", i, v.Type())
+		}
+		return old_verror.BadProtocolf("ipc: discharge #%d of type %T doesn't implement security.Discharge", i, d)
 	}
 	return nil
 }
@@ -1193,6 +1211,9 @@
 }
 func (fs *flowServer) RemoteBlessings() security.Blessings {
 	//nologcall
+	if fs.clientBlessings != nil {
+		return fs.clientBlessings
+	}
 	return fs.flow.RemoteBlessings()
 }
 func (fs *flowServer) Blessings() security.Blessings {