blob: c79cf8ff02fab3d74ac80f979a8a4bdf07ea2584 [file] [log] [blame]
package ipc
import (
"sync"
"veyron2"
"veyron2/context"
"veyron2/ipc"
"veyron2/security"
"veyron2/vdl/vdlutil"
"veyron2/vlog"
)
// prepareDischarges retrieves the caveat discharges required for using blessing
// at server. The discharges are either found in the client cache, in the call
// options, or requested from the discharge issuer indicated on the caveat.
// Note that requesting a discharge is an ipc call, so one copy of this
// function must be able to successfully terminate while another is blocked.
func (c *client) prepareDischarges(ctx context.T, blessing, server security.PublicID,
method string, opts []ipc.CallOpt) (ret []security.ThirdPartyDischarge) {
// TODO(andreser,ataly): figure out whether this should return an error and how that should be handled
// Missing discharges do not necessarily mean the blessing is invalid (e.g., SetID)
if blessing == nil {
return
}
var caveats []security.ThirdPartyCaveat
for _, cav := range blessing.ThirdPartyCaveats() {
if server.Match(cav.Service) {
caveats = append(caveats, cav.Caveat.(security.ThirdPartyCaveat))
}
}
if len(caveats) == 0 {
return
}
discharges := make([]security.ThirdPartyDischarge, len(caveats))
dischargesFromOpts(caveats, opts, discharges)
c.dischargeCache.Discharges(caveats, discharges)
if shouldFetchDischarges(opts) {
c.fetchDischarges(ctx, caveats, opts, discharges)
}
for _, d := range discharges {
if d != nil {
ret = append(ret, d)
}
}
return
}
// dischargeCache is a concurrency-safe cache for third party caveat discharges.
type dischargeCache struct {
sync.RWMutex
security.CaveatDischargeMap // GUARDED_BY(RWMutex)
}
// Add inserts the argument to the cache, possibly overwriting previous
// discharges for the same caveat.
func (dcc *dischargeCache) Add(discharges ...security.ThirdPartyDischarge) {
dcc.Lock()
for _, d := range discharges {
dcc.CaveatDischargeMap[d.CaveatID()] = d
}
dcc.Unlock()
}
// Invalidate removes discharges from the cache.
func (dcc *dischargeCache) Invalidate(discharges ...security.ThirdPartyDischarge) {
dcc.Lock()
for _, d := range discharges {
if dcc.CaveatDischargeMap[d.CaveatID()] == d {
delete(dcc.CaveatDischargeMap, d.CaveatID())
}
}
dcc.Unlock()
}
// Discharges takes a slice of caveats and a slice of discharges of the same
// length and fills in nil entries in the discharges slice with discharges
// from the cache (if there are any).
// REQUIRES: len(caveats) == len(out)
func (dcc *dischargeCache) Discharges(caveats []security.ThirdPartyCaveat, out []security.ThirdPartyDischarge) {
dcc.Lock()
for i, d := range out {
if d != nil {
continue
}
out[i] = dcc.CaveatDischargeMap[caveats[i].ID()]
}
dcc.Unlock()
}
// dischargesFromOpts fills in the nils in the out argument with discharges in
// opts that match the caveat at the same index in caveats.
// REQUIRES: len(caveats) == len(out)
func dischargesFromOpts(caveats []security.ThirdPartyCaveat, opts []ipc.CallOpt,
out []security.ThirdPartyDischarge) {
for _, opt := range opts {
d, ok := opt.(veyron2.DischargeOpt)
if !ok {
continue
}
for i, cav := range caveats {
if out[i] == nil && d.CaveatID() == cav.ID() {
out[i] = d
}
}
}
}
// fetchDischarges fills in out by fetching discharges for caveats from the
// appropriate discharge service. Since there may be dependencies in the
// caveats, fetchDischarges keeps retrying until either all discharges can be
// fetched or no new discharges are fetched.
// REQUIRES: len(caveats) == len(out)
func (c *client) fetchDischarges(ctx context.T, caveats []security.ThirdPartyCaveat, opts []ipc.CallOpt, out []security.ThirdPartyDischarge) {
opts = append([]ipc.CallOpt{dontFetchDischarges{}}, opts...)
var wg sync.WaitGroup
for {
type fetched struct {
idx int
discharge security.ThirdPartyDischarge
}
discharges := make(chan fetched, len(caveats))
for i := range caveats {
if out[i] != nil {
continue
}
wg.Add(1)
go func(i int, cav security.ThirdPartyCaveat) {
defer wg.Done()
vlog.VI(3).Infof("Fetching discharge for %T from %v", cav.ID(), cav, cav.Location())
call, err := c.StartCall(ctx, cav.Location(), "Discharge", []interface{}{cav}, opts...)
if err != nil {
vlog.VI(3).Infof("Discharge fetch for caveat %T from %v failed: %v", cav, cav.Location(), err)
return
}
var dAny vdlutil.Any
// TODO(ashankar): Retry on errors like no-route-to-service, name resolution failures etc.
ierr := call.Finish(&dAny, &err)
if ierr != nil || err != nil {
vlog.VI(3).Infof("Discharge fetch for caveat %T from %v failed: (%v, %v)", cav, cav.Location(), err, ierr)
return
}
d, ok := dAny.(security.ThirdPartyDischarge)
if !ok {
vlog.Errorf("fetchDischarges: server at %s sent a %T (%v) instead of a ThirdPartyDischarge", cav.Location(), dAny, dAny)
}
discharges <- fetched{i, d}
}(i, caveats[i])
}
wg.Wait()
close(discharges)
var got int
for fetched := range discharges {
c.dischargeCache.Add(fetched.discharge)
out[fetched.idx] = fetched.discharge
got++
}
vlog.VI(2).Infof("fetchDischarges: got %d discharges", got)
if got == 0 {
return
}
}
}
// dontFetchDischares is an ipc.CallOpt that indicates that no extre ipc-s
// should be done to fetch discharges for the call with this opt.
// Discharges in the cache and in the call options are still used.
type dontFetchDischarges struct{}
func (dontFetchDischarges) IPCCallOpt() {}
func shouldFetchDischarges(opts []ipc.CallOpt) bool {
for _, opt := range opts {
if _, ok := opt.(dontFetchDischarges); ok {
return false
}
}
return true
}