Merge "rpc/x{server,client}: Implement the rest of security for xserver/xclient."
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index c72439d..846ac32 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -54,6 +54,7 @@
errFailedToParseIP = reg(".errFailedToParseIP", "failed to parse {3} as an IP host")
errUnexpectedSuffix = reg(".errUnexpectedSuffix", "suffix {3} was not expected because either server has the option IsLeaf set to true or it served an object and not a dispatcher")
errNoListeners = reg(".errNoListeners", "failed to ceate any listeners{:3}")
+ errBlessingsNotBound = reg(".errBlessingNotBound", "blessing granted not bound to this server({3} vs {4})")
)
type DeprecatedServer interface {
@@ -1327,7 +1328,7 @@
// this - should servers be able to assume that a blessing is something that
// does not have the authorizations that the server's own identity has?
if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
- return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
+ return verror.New(verror.ErrNoAccess, fs.ctx, verror.New(errBlessingsNotBound, fs.ctx, got, want))
}
fs.grantedBlessings = req.GrantedBlessings
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 87d749f..7e6c618 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -24,13 +24,17 @@
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/v23/vtrace"
+
"v.io/x/ref/lib/apilog"
+ slib "v.io/x/ref/lib/security"
inaming "v.io/x/ref/runtime/internal/naming"
)
const (
dataFlow = 'd'
typeFlow = 't'
+
+ dischargeBuffer = time.Minute
)
type xclient struct {
@@ -138,7 +142,7 @@
// authorizer, both during creation of the VC underlying the flow and the
// flow itself.
// TODO(cnicolaou): implement real, configurable load balancing.
-func (c *xclient) tryCreateFlow(ctx *context.T, index int, name, server, method string, auth security.Authorizer, ch chan<- *xserverStatus) {
+func (c *xclient) tryCreateFlow(ctx *context.T, index int, name, server, method string, args []interface{}, auth security.Authorizer, ch chan<- *xserverStatus) {
defer c.wg.Done()
status := &xserverStatus{index: index, server: server}
var span vtrace.Span
@@ -168,7 +172,7 @@
status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
return
}
- bfp := blessingsForPeer{auth, method, suffix}.run
+ bfp := blessingsForPeer{auth, method, suffix, args}.run
if status.flow, err = c.flowMgr.Dial(ctx, ep, bfp); err != nil {
ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
status.serverErr = suberr(err)
@@ -195,6 +199,7 @@
auth security.Authorizer
method string
suffix string
+ args []interface{}
}
func (x blessingsForPeer) run(
@@ -212,7 +217,6 @@
RemoteBlessings: remoteBlessings,
RemoteDischarges: remoteDischarges,
RemoteEndpoint: remoteEP,
- // TODO(toddw): MethodTags, LocalDischarges
})
if err := x.auth.Authorize(ctx, call); err != nil {
return security.Blessings{}, nil, verror.New(errServerAuthorizeFailed, ctx, call.RemoteBlessings(), err)
@@ -224,8 +228,11 @@
// send the <nil> blessings to the server.
return security.Blessings{}, nil, verror.New(errNoBlessingsForPeer, ctx, serverB, serverBRejected)
}
- // TODO(toddw): Return discharge map.
- return clientB, nil, nil
+ impetus, err := mkDischargeImpetus(serverB, x.method, x.args)
+ if err != nil {
+ return security.Blessings{}, nil, err
+ }
+ return clientB, slib.PrepareDischarges(ctx, clientB, impetus, dischargeBuffer), nil
}
// tryCall makes a single attempt at a call. It may connect to multiple servers
@@ -280,7 +287,7 @@
c.wg.Add(1)
c.mu.Unlock()
- go c.tryCreateFlow(ctx, i, name, server, method, authorizer, ch)
+ go c.tryCreateFlow(ctx, i, name, server, method, args, authorizer, ch)
}
for {
@@ -356,7 +363,7 @@
*/
deadline, _ := ctx.Deadline()
- if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
+ if verr := fc.start(r.suffix, method, args, deadline, opts); verr != nil {
return nil, verror.NoRetry, false, verr
}
return fc, verror.NoRetry, false, nil
@@ -450,13 +457,12 @@
// flowXClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
type flowXClient struct {
- ctx *context.T // context to annotate with call details
- flow flow.Flow // the underlying flow
- dec *vom.Decoder // to decode responses and results from the server
- enc *vom.Encoder // to encode requests and args to the server
- response rpc.Response // each decoded response message is kept here
-
- grantedBlessings security.Blessings // the blessings granted to the server.
+ ctx *context.T // context to annotate with call details
+ flow flow.Flow // the underlying flow
+ dec *vom.Decoder // to decode responses and results from the server
+ enc *vom.Encoder // to encode requests and args to the server
+ response rpc.Response // each decoded response message is kept here
+ remoteBNames []string // blessing names of the remote end; set by initSecurity, returned by RemoteBlessings
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
@@ -526,15 +532,19 @@
return err
}
-func (fc *flowXClient) start(suffix, method string, args []interface{}, deadline time.Time) error {
+func (fc *flowXClient) start(suffix, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) error {
+ grantedB, err := fc.initSecurity(fc.ctx, method, suffix, opts)
+ if err != nil {
+ return fc.close(err) // abort: do not encode or send the request once security setup has failed
+ }
req := rpc.Request{
- Suffix: suffix,
- Method: method,
- NumPosArgs: uint64(len(args)),
- Deadline: vtime.Deadline{Time: deadline},
- // TODO(toddw): Handle GrantedBlessings.
- TraceRequest: vtrace.GetRequest(fc.ctx),
- Language: string(i18n.GetLangID(fc.ctx)),
+ Suffix: suffix,
+ Method: method,
+ NumPosArgs: uint64(len(args)),
+ Deadline: vtime.Deadline{Time: deadline},
+ GrantedBlessings: grantedB,
+ TraceRequest: vtrace.GetRequest(fc.ctx),
+ Language: string(i18n.GetLangID(fc.ctx)),
}
if err := fc.enc.Encode(req); err != nil {
berr := verror.New(verror.ErrBadProtocol, fc.ctx, verror.New(errRequestEncoding, fc.ctx, fmt.Sprintf("%#v", req), err))
@@ -549,6 +559,35 @@
return nil
}
+func (fc *flowXClient) initSecurity(ctx *context.T, method, suffix string, opts []rpc.CallOpt) (security.Blessings, error) {
+ call := security.NewCall(&security.CallParams{
+ LocalPrincipal: v23.GetPrincipal(ctx),
+ LocalBlessings: fc.flow.LocalBlessings(),
+ RemoteBlessings: fc.flow.RemoteBlessings(),
+ LocalEndpoint: fc.flow.Conn().LocalEndpoint(),
+ RemoteEndpoint: fc.flow.Conn().RemoteEndpoint(),
+ LocalDischarges: fc.flow.LocalDischarges(),
+ RemoteDischarges: fc.flow.RemoteDischarges(),
+ Method: method,
+ Suffix: suffix,
+ })
+ // TODO(suharshs): It's unfortunate that we compute these here and also in the
+ // blessingsForPeer.run function. Find a way to only do this once.
+ fc.remoteBNames, _ = security.RemoteBlessingNames(ctx, call)
+ var grantedB security.Blessings
+ for _, o := range opts {
+ switch v := o.(type) {
+ case rpc.Granter:
+ if b, err := v.Grant(ctx, call); err != nil {
+ return grantedB, verror.New(errBlessingGrant, fc.ctx, err)
+ } else if grantedB, err = security.UnionOfBlessings(grantedB, b); err != nil {
+ return grantedB, verror.New(errBlessingAdd, fc.ctx, err)
+ }
+ }
+ }
+ return grantedB, nil
+}
+
func (fc *flowXClient) Send(item interface{}) error {
defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if fc.sendClosed {
@@ -676,19 +715,19 @@
vtrace.GetStore(fc.ctx).Merge(fc.response.TraceResponse)
if fc.response.Error != nil {
id := verror.ErrorID(fc.response.Error)
- /*
- TODO(toddw): We need to invalidate discharges somehow; there's a method
- on the BlessingStore to do this.
-
- if id == verror.ErrNoAccess.ID && fc.dc != nil {
- // In case the error was caused by a bad discharge, we do not want to get stuck
- // with retrying again and again with this discharge. As there is no direct way
- // to detect it, we conservatively flush all discharges we used from the cache.
- // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
- fc.ctx.VI(3).Infof("Discarding %d discharges as RPC failed with %v", len(fc.discharges), fc.response.Error)
- fc.dc.Invalidate(fc.ctx, fc.discharges...)
- }
- */
+ if id == verror.ErrNoAccess.ID {
+ // In case the error was caused by a bad discharge, we do not want to get stuck
+ // with retrying again and again with this discharge. As there is no direct way
+ // to detect it, we conservatively flush all discharges we used from the cache.
+ // TODO(ataly,andreser): add verror.BadDischarge and handle it explicitly?
+ l := len(fc.flow.LocalDischarges())
+ dis := make([]security.Discharge, 0, l)
+ for _, d := range fc.flow.LocalDischarges() {
+ dis = append(dis, d)
+ }
+ fc.ctx.VI(3).Infof("Discarding %d discharges as RPC failed with %v", l, fc.response.Error)
+ v23.GetPrincipal(fc.ctx).BlessingStore().ClearDischarges(dis...)
+ }
if id == errBadNumInputArgs.ID || id == errBadInputArg.ID {
return fc.close(verror.New(verror.ErrBadProtocol, fc.ctx, fc.response.Error))
}
@@ -710,5 +749,5 @@
func (fc *flowXClient) RemoteBlessings() ([]string, security.Blessings) {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- return nil /*TODO(toddw)*/, fc.flow.RemoteBlessings()
+ return fc.remoteBNames, fc.flow.RemoteBlessings()
}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 110f359..0d026b9 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -8,6 +8,7 @@
"fmt"
"io"
"net"
+ "reflect"
"strings"
"sync"
"time"
@@ -523,8 +524,6 @@
flow flow.Flow // underlying flow
// Fields filled in during the server invocation.
- clientBlessings security.Blessings
- ackBlessings bool
grantedBlessings security.Blessings
method, suffix string
tags []*vdl.Value
@@ -593,7 +592,6 @@
EndStreamResults: true,
NumPosResults: uint64(len(results)),
TraceResponse: traceResponse,
- AckBlessings: fs.ackBlessings,
}
if err := fs.enc.Encode(response); err != nil {
if err == io.EOF {
@@ -682,8 +680,7 @@
// TODO(toddw): Explicitly cancel the context when the flow is done.
_ = cancel
- // Initialize security: blessings, discharges, etc.
- if err := fs.initSecurity(req); err != nil {
+ if err := fs.readGrantedBlessings(req); err != nil {
fs.drainDecoderArgs(int(req.NumPosArgs))
return nil, err
}
@@ -775,48 +772,22 @@
return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
}
-func (fs *xflowServer) initSecurity(req *rpc.Request) error {
- // TODO(toddw): Do something with this.
- /*
- // LocalPrincipal is nil which means we are operating under
- // SecurityNone.
- if fs.LocalPrincipal() == nil {
- return nil
- }
-
- // If additional credentials are provided, make them available in the context
- // Detect unusable blessings now, rather then discovering they are unusable on
- // first use.
- //
- // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
- // the server's identity as the blessing. Figure out what we want to do about
- // this - should servers be able to assume that a blessing is something that
- // does not have the authorizations that the server's own identity has?
- if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
- return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
- }
- fs.grantedBlessings = req.GrantedBlessings
-
- var err error
- if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
- // When the server can't access the blessings cache, the client is not following
- // protocol, so the server closes the VCs corresponding to the client endpoint.
- // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
- // of all VCs connected to the RemoteEndpoint.
- fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
- return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
- }
- // Verify that the blessings sent by the client in the request have the same public
- // key as those sent by the client during VC establishment.
- if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
- return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
- }
- fs.ackBlessings = true
-
- for _, d := range req.Discharges {
- fs.discharges[d.ID()] = d
- }
- */
+func (fs *xflowServer) readGrantedBlessings(req *rpc.Request) error {
+ if req.GrantedBlessings.IsZero() {
+ return nil
+ }
+ // If additional credentials are provided, make them available in the context.
+ // Detect unusable blessings now, rather than discovering they are unusable on
+ // first use.
+ //
+ // TODO(ashankar,ataly): Potential confused deputy attack: The client provides
+ // the server's identity as the blessing. Figure out what we want to do about
+ // this - should servers be able to assume that a blessing is something that
+ // does not have the authorizations that the server's own identity has?
+ if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+ return verror.New(verror.ErrNoAccess, fs.ctx, verror.New(errBlessingsNotBound, fs.ctx, got, want))
+ }
+ fs.grantedBlessings = req.GrantedBlessings
return nil
}