veyron/runtimes/google/ipc: use discharges for third-party caveat authorization

Change-Id: I385e60c5f7115a2a3472481a098335b8c2c8cd85
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 49b3915..0aa2458 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -40,6 +40,8 @@
 	vcMapMu sync.Mutex
 	// TODO(ashankar): Additionally, should vcMap be keyed with other options also?
 	vcMap map[string]*vcInfo // map from endpoint.String() to vc info
+
+	dischargeCache dischargeCache
 }
 
 type vcInfo struct {
@@ -50,10 +52,11 @@
 
 func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
 	c := &client{
-		streamMgr:   streamMgr,
-		ns:          ns,
-		vcMap:       make(map[string]*vcInfo),
-		callTimeout: defaultCallTimeout,
+		streamMgr:      streamMgr,
+		ns:             ns,
+		vcMap:          make(map[string]*vcInfo),
+		callTimeout:    defaultCallTimeout,
+		dischargeCache: dischargeCache{CaveatDischargeMap: make(security.CaveatDischargeMap)},
 	}
 	for _, opt := range opts {
 		// Collect all client opts that are also vc opts.
@@ -152,8 +155,11 @@
 			flow.Close()
 			continue
 		}
+
+		discharges := c.prepareDischarges(ctx, flow.LocalID(), flow.RemoteID(), method, opts)
+
 		lastErr = nil
-		fc := newFlowClient(flow)
+		fc := newFlowClient(flow, &c.dischargeCache, discharges)
 		if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil {
 			return nil, verr
 		}
@@ -172,7 +178,7 @@
 	if server == nil {
 		return nil, fmt.Errorf("server identity cannot be nil")
 	}
-	// TODO(ataly): Fetch third-party discharges from the server.
+	// TODO(ataly,andreser): Check the third-party discharges the server presents
 	// TODO(ataly): What should the label be for the context? Typically the label is the security.Label
 	// of the method but we don't have that information here at the client.
 	authID, err := server.Authorize(isecurity.NewContext(isecurity.ContextArgs{
@@ -236,16 +242,21 @@
 	flow     stream.Flow  // the underlying flow
 	response ipc.Response // each decoded response message is kept here
 
+	discharges     []security.ThirdPartyDischarge // discharges used for this request
+	dischargeCache *dischargeCache                // pointer to the discharge cache shared by all flows of the client
+
 	sendClosedMu sync.Mutex
 	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
 }
 
-func newFlowClient(flow stream.Flow) *flowClient {
+func newFlowClient(flow stream.Flow, dischargeCache *dischargeCache, discharges []security.ThirdPartyDischarge) *flowClient {
 	return &flowClient{
 		// TODO(toddw): Support different codecs
-		dec:  vom.NewDecoder(flow),
-		enc:  vom.NewEncoder(flow),
-		flow: flow,
+		dec:            vom.NewDecoder(flow),
+		enc:            vom.NewEncoder(flow),
+		flow:           flow,
+		discharges:     discharges,
+		dischargeCache: dischargeCache,
 	}
 }
 
@@ -258,11 +269,12 @@
 
 func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E {
 	req := ipc.Request{
-		Suffix:      suffix,
-		Method:      method,
-		NumPosArgs:  uint64(len(args)),
-		Timeout:     int64(timeout),
-		HasBlessing: blessing != nil,
+		Suffix:        suffix,
+		Method:        method,
+		NumPosArgs:    uint64(len(args)),
+		Timeout:       int64(timeout),
+		HasBlessing:   blessing != nil,
+		NumDischarges: uint64(len(fc.discharges)),
 	}
 	if err := fc.enc.Encode(req); err != nil {
 		return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
@@ -272,6 +284,11 @@
 			return fc.close(verror.BadProtocolf("ipc: blessing encoding failed: %v", err))
 		}
 	}
+	for _, d := range fc.discharges {
+		if err := fc.enc.Encode(d); err != nil {
+			return fc.close(verror.BadProtocolf("ipc: failed to encode discharge for %x: %v", d.CaveatID(), err))
+		}
+	}
 	for ix, arg := range args {
 		if err := fc.enc.Encode(arg); err != nil {
 			return fc.close(verror.BadProtocolf("ipc: arg %d encoding failed: %v", ix, err))
@@ -374,6 +391,14 @@
 		}
 	}
 	if fc.response.Error != nil {
+		if verror.Is(fc.response.Error, verror.NotAuthorized) {
+			// In case the error was caused by a bad discharge, we do not want to keep
+			// retrying with that same discharge. As there is no direct way to detect
+			// this, we conservatively flush all discharges we used from the cache.
+			// TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
+			vlog.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
+			fc.dischargeCache.Invalidate(fc.discharges...)
+		}
 		return fc.close(verror.ConvertWithDefault(verror.Internal, fc.response.Error))
 	}
 	if got, want := fc.response.NumPosResults, uint64(len(resultptrs)); got != want {