Merge "ref: BlessingsForPeer -> PeerAuthorizer."
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 1d62312..dd8df81 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -1018,9 +1018,17 @@
 	// Decode the response header, if it hasn't already been decoded by Recv.
 	if fc.response.Error == nil && !fc.response.EndStreamResults {
 		if err := fc.dec.Decode(&fc.response); err != nil {
-			id, verr := decodeNetError(fc.ctx, err)
-			berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
-			return fc.close(berr)
+			select {
+			case <-fc.ctx.Done():
+				if fc.ctx.Err() == context.Canceled {
+					return verror.New(verror.ErrCanceled, fc.ctx)
+				}
+				return verror.New(verror.ErrTimeout, fc.ctx)
+			default:
+				id, verr := decodeNetError(fc.ctx, err)
+				berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
+				return fc.close(berr)
+			}
 		}
 		// The response header must indicate the streaming results have ended.
 		if fc.response.Error == nil && !fc.response.EndStreamResults {
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index 42ef4c2..cd5d9e9 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -176,7 +176,7 @@
 	// Fill up all the write buffers to ensure that cancelling works even when the stream
 	// is blocked.
 	if ref.RPCTransitionState() >= ref.XServers {
-		call.Send(conn.DefaultBytesBufferedPerFlow)
+		call.Send(make([]byte, conn.DefaultBytesBufferedPerFlow-2048))
 	} else {
 		call.Send(make([]byte, vc.MaxSharedBytes))
 		call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
diff --git a/runtime/internal/rpc/test/full_test.go b/runtime/internal/rpc/test/full_test.go
index 80807fc..83cda0e 100644
--- a/runtime/internal/rpc/test/full_test.go
+++ b/runtime/internal/rpc/test/full_test.go
@@ -289,13 +289,13 @@
 	}
 	status := server.Status()
 	if got, want := len(status.Endpoints), 0; got != want {
-		t.Errorf("got %q, want %q", got, want)
+		t.Errorf("got %d, want %d", got, want)
 	}
 	if got, want := len(status.Errors), 1; got != want {
-		t.Errorf("got %q, want %q", got, want)
+		t.Errorf("got %d, want %d", got, want)
 	}
 	if got, want := status.Errors[0].Error(), "oops"; got != want {
-		t.Errorf("got %q, want %q", got, want)
+		t.Errorf("got %d, want %d", got, want)
 	}
 }
 
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 04cc52d..9e65ed7 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -572,7 +572,8 @@
 func (fc *flowXClient) start(suffix, method string, args []interface{}, deadline time.Time, opts []rpc.CallOpt) error {
 	grantedB, err := fc.initSecurity(fc.ctx, method, suffix, opts)
 	if err != nil {
-		fc.close(err)
+		berr := verror.New(verror.ErrNotTrusted, fc.ctx, err)
+		return fc.close(berr)
 	}
 	req := rpc.Request{
 		Suffix:           suffix,
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index f2bdbac..f072f0d 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -49,7 +49,6 @@
 	settingsPublisher *pubsub.Publisher   // pubsub publisher for dhcp
 	settingsName      string              // pubwsub stream name for dhcp
 	dhcpState         *dhcpState          // dhcpState, nil if not using dhcp
-	principal         security.Principal
 	blessings         security.Blessings
 	typeCache         *typeCache
 	addressChooser    rpc.AddressChooser
@@ -807,7 +806,7 @@
 }
 func (fs *xflowServer) LocalPrincipal() security.Principal {
 	//nologcall
-	return fs.server.principal
+	return v23.GetPrincipal(fs.server.ctx)
 }
 func (fs *xflowServer) LocalBlessings() security.Blessings {
 	//nologcall