veyron/runtimes/google/ipc: VC had a general purpose cache for data
that can be shared amongst Flows on a VC. Blessings use this cache in ipc.Server
and ipc.Client.
TODO in the future (as needed): Add stats for each individual VC cache.
This is described more in: https://docs.google.com/document/d/1jHDDGHUxKcQNJBHwdkyLwZVzR9fm_ctzds-6_TeRL2Y/edit.
Change-Id: Iecbadbc6f86e50675c7b5870e54bd81b2d852bfa
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 6fbdd4e..fbc32e7 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -17,6 +17,7 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/services/security/access"
+ "veyron.io/veyron/veyron2/vdl"
old_verror "veyron.io/veyron/veyron2/verror"
verror "veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
@@ -794,13 +795,15 @@
flow stream.Flow // underlying flow
// Fields filled in during the server invocation.
- blessings security.Blessings
- method, suffix string
- tags []interface{}
- discharges map[string]security.Discharge
- starttime time.Time
- endStreamArgs bool // are the stream args at EOF?
- allowDebug bool // true if the caller is permitted to view debug information.
+ clientBlessings security.Blessings
+ ackBlessings bool
+ blessings security.Blessings
+ method, suffix string
+ tags []interface{}
+ discharges map[string]security.Discharge
+ starttime time.Time
+ endStreamArgs bool // are the stream args at EOF?
+ allowDebug bool // true if the caller is permitted to view debug information.
}
var _ ipc.Stream = (*flowServer)(nil)
@@ -895,6 +898,7 @@
EndStreamResults: true,
NumPosResults: uint64(len(results)),
TraceResponse: traceResponse,
+ AckBlessings: fs.ackBlessings,
}
if err := fs.enc.Encode(response); err != nil {
if err == io.EOF {
@@ -1080,13 +1084,27 @@
if blessings != nil && !reflect.DeepEqual(blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey()) {
return old_verror.NoAccessf("ipc: blessing granted not bound to this server(%v vs %v)", blessings.PublicKey(), fs.flow.LocalPrincipal().PublicKey())
}
- // Receive third party caveat discharges the client sent
- for i := uint64(0); i < req.NumDischarges; i++ {
- var d security.Discharge
- if err := fs.dec.Decode(&d); err != nil {
- return old_verror.BadProtocolf("ipc: decoding discharge %d of %d failed: %v", i, req.NumDischarges, err)
+ fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats)
+ if err != nil {
+ // When the server can't access the blessings cache, the client is not following
+ // protocol, so the server closes the VCs corresponding to the client endpoint.
+ // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+ // of all VCs connected to the RemoteEndpoint.
+ fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+ return old_verror.BadProtocolf("ipc: blessings cache failed: %v", err)
+ }
+ fs.ackBlessings = true
+
+ // TODO(suharshs, ataly): Make security.Discharge a vdl type.
+ for i, d := range req.Discharges {
+ if dis, ok := d.(security.Discharge); ok {
+ fs.discharges[dis.ID()] = dis
+ continue
}
- fs.discharges[d.ID()] = d
+ if v, ok := d.(*vdl.Value); ok {
+ return old_verror.BadProtocolf("ipc: discharge #%d of type %s isn't registered", i, v.Type())
+ }
+ return old_verror.BadProtocolf("ipc: discharge #%d of type %T doesn't implement security.Discharge", i, d)
}
return nil
}
@@ -1193,6 +1211,9 @@
}
func (fs *flowServer) RemoteBlessings() security.Blessings {
//nologcall
+ if fs.clientBlessings != nil {
+ return fs.clientBlessings
+ }
return fs.flow.RemoteBlessings()
}
func (fs *flowServer) Blessings() security.Blessings {