veyron/runtimes/google/ipc: use discharges for third-party caveat authorization
Change-Id: I385e60c5f7115a2a3472481a098335b8c2c8cd85
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 49b3915..0aa2458 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -40,6 +40,8 @@
vcMapMu sync.Mutex
// TODO(ashankar): Additionally, should vcMap be keyed with other options also?
vcMap map[string]*vcInfo // map from endpoint.String() to vc info
+
+ dischargeCache dischargeCache
}
type vcInfo struct {
@@ -50,10 +52,11 @@
func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
c := &client{
- streamMgr: streamMgr,
- ns: ns,
- vcMap: make(map[string]*vcInfo),
- callTimeout: defaultCallTimeout,
+ streamMgr: streamMgr,
+ ns: ns,
+ vcMap: make(map[string]*vcInfo),
+ callTimeout: defaultCallTimeout,
+ dischargeCache: dischargeCache{CaveatDischargeMap: make(security.CaveatDischargeMap)},
}
for _, opt := range opts {
// Collect all client opts that are also vc opts.
@@ -152,8 +155,11 @@
flow.Close()
continue
}
+
+ discharges := c.prepareDischarges(ctx, flow.LocalID(), flow.RemoteID(), method, opts)
+
lastErr = nil
- fc := newFlowClient(flow)
+ fc := newFlowClient(flow, &c.dischargeCache, discharges)
if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil {
return nil, verr
}
@@ -172,7 +178,7 @@
if server == nil {
return nil, fmt.Errorf("server identity cannot be nil")
}
- // TODO(ataly): Fetch third-party discharges from the server.
+ // TODO(ataly,andreser): Check the third-party discharges the server presents
// TODO(ataly): What should the label be for the context? Typically the label is the security.Label
// of the method but we don't have that information here at the client.
authID, err := server.Authorize(isecurity.NewContext(isecurity.ContextArgs{
@@ -236,16 +242,21 @@
flow stream.Flow // the underlying flow
response ipc.Response // each decoded response message is kept here
+ discharges []security.ThirdPartyDischarge // discharges used for this request
+ dischargeCache *dischargeCache // client-global discharge cache reference type
+
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
}
-func newFlowClient(flow stream.Flow) *flowClient {
+func newFlowClient(flow stream.Flow, dischargeCache *dischargeCache, discharges []security.ThirdPartyDischarge) *flowClient {
return &flowClient{
// TODO(toddw): Support different codecs
- dec: vom.NewDecoder(flow),
- enc: vom.NewEncoder(flow),
- flow: flow,
+ dec: vom.NewDecoder(flow),
+ enc: vom.NewEncoder(flow),
+ flow: flow,
+ discharges: discharges,
+ dischargeCache: dischargeCache,
}
}
@@ -258,11 +269,12 @@
func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E {
req := ipc.Request{
- Suffix: suffix,
- Method: method,
- NumPosArgs: uint64(len(args)),
- Timeout: int64(timeout),
- HasBlessing: blessing != nil,
+ Suffix: suffix,
+ Method: method,
+ NumPosArgs: uint64(len(args)),
+ Timeout: int64(timeout),
+ HasBlessing: blessing != nil,
+ NumDischarges: uint64(len(fc.discharges)),
}
if err := fc.enc.Encode(req); err != nil {
return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
@@ -272,6 +284,11 @@
return fc.close(verror.BadProtocolf("ipc: blessing encoding failed: %v", err))
}
}
+ for _, d := range fc.discharges {
+ if err := fc.enc.Encode(d); err != nil {
+ return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.CaveatID(), err))
+ }
+ }
for ix, arg := range args {
if err := fc.enc.Encode(arg); err != nil {
return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
@@ -374,6 +391,14 @@
}
}
if fc.response.Error != nil {
+ if verror.Is(fc.response.Error, verror.NotAuthorized) {
+ // In case the error was caused by a bad discharge, we do not want to get stuck
+ // with retrying again and again with this discharge. As there is no direct way
+ // to detect it, we conservatively flush all discharges we used from the cache.
+ // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
+ vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
+ fc.dischargeCache.Invalidate(fc.discharges...)
+ }
return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
}
if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {