Merge "ref: BlessingsForPeer -> PeerAuthorizer."
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 1d62312..dd8df81 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -1018,9 +1018,17 @@
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
- id, verr := decodeNetError(fc.ctx, err)
- berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
- return fc.close(berr)
+ select {
+ case <-fc.ctx.Done():
+ if fc.ctx.Err() == context.Canceled {
+ return verror.New(verror.ErrCanceled, fc.ctx)
+ }
+ return verror.New(verror.ErrTimeout, fc.ctx)
+ default:
+ id, verr := decodeNetError(fc.ctx, err)
+ berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
+ return fc.close(berr)
+ }
}
// The response header must indicate the streaming results have ended.
if fc.response.Error == nil && !fc.response.EndStreamResults {
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index 42ef4c2..cd5d9e9 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -176,7 +176,7 @@
// Fill up all the write buffers to ensure that cancelling works even when the stream
// is blocked.
if ref.RPCTransitionState() >= ref.XServers {
- call.Send(conn.DefaultBytesBufferedPerFlow)
+ call.Send(make([]byte, conn.DefaultBytesBufferedPerFlow-2048))
} else {
call.Send(make([]byte, vc.MaxSharedBytes))
call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
diff --git a/runtime/internal/rpc/test/full_test.go b/runtime/internal/rpc/test/full_test.go
index 80807fc..83cda0e 100644
--- a/runtime/internal/rpc/test/full_test.go
+++ b/runtime/internal/rpc/test/full_test.go
@@ -289,13 +289,13 @@
}
status := server.Status()
if got, want := len(status.Endpoints), 0; got != want {
- t.Errorf("got %q, want %q", got, want)
+ t.Errorf("got %d, want %d", got, want)
}
if got, want := len(status.Errors), 1; got != want {
- t.Errorf("got %q, want %q", got, want)
+ t.Errorf("got %d, want %d", got, want)
}
if got, want := status.Errors[0].Error(), "oops"; got != want {
- t.Errorf("got %q, want %q", got, want)
+ t.Errorf("got %d, want %d", got, want)
}
}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 04cc52d..9e65ed7 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -572,7 +572,8 @@
func (fc *flowXClient) start(suffix, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) error {
grantedB, err := fc.initSecurity(fc.ctx, method, suffix, opts)
if err != nil {
- fc.close(err)
+ berr := verror.New(verror.ErrNotTrusted, fc.ctx, err)
+ return fc.close(berr)
}
req := rpc.Request{
Suffix: suffix,
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index f2bdbac..f072f0d 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -49,7 +49,6 @@
settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
settingsName string // pubwsub stream name for dhcp
dhcpState *dhcpState // dhcpState, nil if not using dhcp
- principal security.Principal
blessings security.Blessings
typeCache *typeCache
addressChooser rpc.AddressChooser
@@ -807,7 +806,7 @@
}
func (fs *xflowServer) LocalPrincipal() security.Principal {
//nologcall
- return fs.server.principal
+ return v23.GetPrincipal(fs.server.ctx)
}
func (fs *xflowServer) LocalBlessings() security.Blessings {
//nologcall