wspr: Treat WSPR as an RPC service, part 5.
Transitioning namespace calls to an RPC interface.
MultiPart: 2/2
Change-Id: Ice8a5b90258b3e8eb893c2b8aebd263ebb6920dc
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 32231b8..2e959e0 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -103,6 +103,11 @@
// Store for all the Blessings that javascript has a handle to.
blessingsStore *principal.JSBlessingsHandles
+
+ // reservedServices contains a map of reserved service names. These
+ // are objects that serve requests in wspr without actually making
+ // an outgoing rpc call.
+ reservedServices map[string]ipc.Invoker
}
// NewController creates a new Controller. writerCreator will be used to create a new flow for rpcs to
@@ -129,6 +134,19 @@
blessingsStore: principal.NewJSBlessingsHandles(),
}
+ controllerInvoker, err := ipc.ReflectInvoker(ControllerServer(controller))
+ if err != nil {
+ return nil, err
+ }
+ namespaceInvoker, err := ipc.ReflectInvoker(namespace.New(ctx))
+ if err != nil {
+ return nil, err
+ }
+ controller.reservedServices = map[string]ipc.Invoker{
+ "__controller": controllerInvoker,
+ "__namespace": namespaceInvoker,
+ }
+
controller.setup()
return controller, nil
}
@@ -347,9 +365,23 @@
ctx *context.T
vrpc *VeyronRPCRequest
tags []*vdl.Value
+ w lib.ClientWriter
}
-func (l *localCall) Send(interface{}) error { return nil }
+// Send VOM-encodes item and writes it to the client writer as a
+// lib.ResponseStream message. Any failure is reported through l.w.Error
+// and also returned to the caller.
+func (l *localCall) Send(item interface{}) error {
+ vomItem, err := lib.VomEncode(item)
+ if err != nil {
+ err = verror.New(marshallingError, l.ctx, item, err)
+ l.w.Error(err)
+ return err
+ }
+ if err := l.w.Send(lib.ResponseStream, vomItem); err != nil {
+ // Keep the writer's error as a parameter, as the encode path does, so the cause is not lost.
+ err = verror.New(marshallingError, l.ctx, item, err)
+ l.w.Error(err)
+ return err
+ }
+ return nil
+}
func (l *localCall) Recv(interface{}) error { return nil }
func (l *localCall) Blessings() security.Blessings { return nil }
func (l *localCall) Server() ipc.Server { return nil }
@@ -366,12 +398,7 @@
func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
-func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
- invoker, err := ipc.ReflectInvoker(ControllerServer(c))
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
+func (c *Controller) handleInternalCall(ctx *context.T, invoker ipc.Invoker, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
@@ -383,11 +410,17 @@
return
}
}
- results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+ results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags, w}, argptrs)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
+ if msg.IsStreaming {
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
+ }
+ }
+
// Convert results from []interface{} to []*vdl.Value.
vresults := make([]*vdl.Value, len(results))
for i, res := range results {
@@ -436,8 +469,8 @@
}
// If this message is for an internal service, do a short-circuit dispatch here.
- if msg.Name == "controller" {
- go c.handleInternalCall(ctx, &msg, decoder, w, span)
+ if invoker, ok := c.reservedServices[msg.Name]; ok {
+ go c.handleInternalCall(ctx, invoker, &msg, decoder, w, span)
return
}
@@ -701,8 +734,3 @@
blessings, _ := call.RemoteBlessings()
return blessings, nil
}
-
-// HandleNamespaceRequest uses the namespace client to respond to namespace specific requests such as glob
-func (c *Controller) HandleNamespaceRequest(ctx *context.T, data string, w lib.ClientWriter) {
- namespace.HandleRequest(ctx, data, w)
-}