wspr: Treat WSPR as an RPC service, part 5.
Transitioning namespace calls to an RPC interface.
MultiPart: 2/2
Change-Id: Ice8a5b90258b3e8eb893c2b8aebd263ebb6920dc
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 32231b8..2e959e0 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -103,6 +103,11 @@
// Store for all the Blessings that javascript has a handle to.
blessingsStore *principal.JSBlessingsHandles
+
+ // reservedServices contains a map of reserved service names. These
+ // are objects that serve requests in wspr without actually making
+ // an outgoing rpc call.
+ reservedServices map[string]ipc.Invoker
}
// NewController creates a new Controller. writerCreator will be used to create a new flow for rpcs to
@@ -129,6 +134,19 @@
blessingsStore: principal.NewJSBlessingsHandles(),
}
+ controllerInvoker, err := ipc.ReflectInvoker(ControllerServer(controller))
+ if err != nil {
+ return nil, err
+ }
+ namespaceInvoker, err := ipc.ReflectInvoker(namespace.New(ctx))
+ if err != nil {
+ return nil, err
+ }
+ controller.reservedServices = map[string]ipc.Invoker{
+ "__controller": controllerInvoker,
+ "__namespace": namespaceInvoker,
+ }
+
controller.setup()
return controller, nil
}
@@ -347,9 +365,23 @@
ctx *context.T
vrpc *VeyronRPCRequest
tags []*vdl.Value
+ w lib.ClientWriter
}
-func (l *localCall) Send(interface{}) error { return nil }
+func (l *localCall) Send(item interface{}) error {
+ vomItem, err := lib.VomEncode(item)
+ if err != nil {
+ err = verror.New(marshallingError, l.ctx, item, err)
+ l.w.Error(err)
+ return err
+ }
+ if err := l.w.Send(lib.ResponseStream, vomItem); err != nil {
+ err = verror.New(marshallingError, l.ctx, item)
+ l.w.Error(err)
+ return err
+ }
+ return nil
+}
func (l *localCall) Recv(interface{}) error { return nil }
func (l *localCall) Blessings() security.Blessings { return nil }
func (l *localCall) Server() ipc.Server { return nil }
@@ -366,12 +398,7 @@
func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
-func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
- invoker, err := ipc.ReflectInvoker(ControllerServer(c))
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
+func (c *Controller) handleInternalCall(ctx *context.T, invoker ipc.Invoker, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
@@ -383,11 +410,17 @@
return
}
}
- results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+ results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags, w}, argptrs)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
}
+ if msg.IsStreaming {
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
+ }
+ }
+
// Convert results from []interface{} to []*vdl.Value.
vresults := make([]*vdl.Value, len(results))
for i, res := range results {
@@ -436,8 +469,8 @@
}
// If this message is for an internal service, do a short-circuit dispatch here.
- if msg.Name == "controller" {
- go c.handleInternalCall(ctx, &msg, decoder, w, span)
+ if invoker, ok := c.reservedServices[msg.Name]; ok {
+ go c.handleInternalCall(ctx, invoker, &msg, decoder, w, span)
return
}
@@ -701,8 +734,3 @@
blessings, _ := call.RemoteBlessings()
return blessings, nil
}
-
-// HandleNamespaceRequest uses the namespace client to respond to namespace specific requests such as glob
-func (c *Controller) HandleNamespaceRequest(ctx *context.T, data string, w lib.ClientWriter) {
- namespace.HandleRequest(ctx, data, w)
-}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 7a334ce..7fd7106 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -363,7 +363,7 @@
veyron2.GetNamespace(controller.Context()).SetRoots("/" + endpoint.String())
req, err := makeRequest(VeyronRPCRequest{
- Name: "controller",
+ Name: "__controller",
Method: "Serve",
NumInArgs: 2,
NumOutArgs: 1,
diff --git a/services/wsprd/app/messaging.go b/services/wsprd/app/messaging.go
index a737889..7865870 100644
--- a/services/wsprd/app/messaging.go
+++ b/services/wsprd/app/messaging.go
@@ -116,9 +116,6 @@
case AuthResponseMessage:
go c.HandleAuthResponse(msg.Id, msg.Data)
- case NamespaceRequestMessage:
- go c.HandleNamespaceRequest(ctx, msg.Data, w)
-
default:
w.Error(verror.New(errUnknownMessageType, ctx, msg.Type))
}
diff --git a/services/wsprd/namespace/namespace.vdl b/services/wsprd/namespace/namespace.vdl
new file mode 100644
index 0000000..d8ce654
--- /dev/null
+++ b/services/wsprd/namespace/namespace.vdl
@@ -0,0 +1,33 @@
+// Package namespace defines an RPC services that allows remoting of the
+// namespace client library over the wire. This is useful for
+// javascript so it doesn't have to implement the library.
+// This should be kept in sync with the namespace library (v.io/core/veyron2/naming).
+package namespace
+
+import (
+ "time"
+
+ "v.io/core/veyron2/naming"
+)
+
+type Namespace interface {
+ // Run a glob query and stream the results.
+ Glob(pattern string) stream<_, naming.VDLGlobReply> error
+ // Mount mounts a server under the given name.
+ Mount(name, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(name, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(name string) ([]string | error)
+ // ResolveToMt resolves a name to the address of the mounttable directly
+ // hosting it.
+ ResolveToMT(name string) ([]string | error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(name string) (bool | error)
+ // DisableCache disables the naming cache.
+ DisableCache(disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots() ([]string | error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(roots []string) error
+}
diff --git a/services/wsprd/namespace/namespace.vdl.go b/services/wsprd/namespace/namespace.vdl.go
new file mode 100644
index 0000000..4cbe8c9
--- /dev/null
+++ b/services/wsprd/namespace/namespace.vdl.go
@@ -0,0 +1,476 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: namespace.vdl
+
+// Package namespace defines an RPC services that allows remoting of the
+// namespace client library over the wire. This is useful for
+// javascript so it doesn't have to implement the library.
+// This should be kept in sync with the namespace library (v.io/core/veyron2/naming).
+package namespace
+
+import (
+ // VDL system imports
+ "io"
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
+
+ // VDL user imports
+ "time"
+ "v.io/core/veyron2/naming"
+ _ "v.io/core/veyron2/vdl/vdlroot/src/time"
+)
+
+// NamespaceClientMethods is the client interface
+// containing Namespace methods.
+type NamespaceClientMethods interface {
+ // Run a glob query and stream the results.
+ Glob(ctx *context.T, pattern string, opts ...ipc.CallOpt) (NamespaceGlobCall, error)
+ // Mount mounts a server under the given name.
+ Mount(ctx *context.T, name string, server string, ttl time.Duration, replace bool, opts ...ipc.CallOpt) error
+ // Unmount removes an existing mount point.
+ Unmount(ctx *context.T, name string, server string, opts ...ipc.CallOpt) error
+ // Resolve resolves a name to an address.
+ Resolve(ctx *context.T, name string, opts ...ipc.CallOpt) ([]string, error)
+ // ResolveToMt resolves a name to the address of the mounttable directly
+ // hosting it.
+ ResolveToMT(ctx *context.T, name string, opts ...ipc.CallOpt) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(ctx *context.T, name string, opts ...ipc.CallOpt) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(ctx *context.T, disable bool, opts ...ipc.CallOpt) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(*context.T, ...ipc.CallOpt) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(ctx *context.T, roots []string, opts ...ipc.CallOpt) error
+}
+
+// NamespaceClientStub adds universal methods to NamespaceClientMethods.
+type NamespaceClientStub interface {
+ NamespaceClientMethods
+ ipc.UniversalServiceMethods
+}
+
+// NamespaceClient returns a client stub for Namespace.
+func NamespaceClient(name string, opts ...ipc.BindOpt) NamespaceClientStub {
+ var client ipc.Client
+ for _, opt := range opts {
+ if clientOpt, ok := opt.(ipc.Client); ok {
+ client = clientOpt
+ }
+ }
+ return implNamespaceClientStub{name, client}
+}
+
+type implNamespaceClientStub struct {
+ name string
+ client ipc.Client
+}
+
+func (c implNamespaceClientStub) c(ctx *context.T) ipc.Client {
+ if c.client != nil {
+ return c.client
+ }
+ return veyron2.GetClient(ctx)
+}
+
+func (c implNamespaceClientStub) Glob(ctx *context.T, i0 string, opts ...ipc.CallOpt) (ocall NamespaceGlobCall, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Glob", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ ocall = &implNamespaceGlobCall{Call: call}
+ return
+}
+
+func (c implNamespaceClientStub) Mount(ctx *context.T, i0 string, i1 string, i2 time.Duration, i3 bool, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Mount", []interface{}{i0, i1, i2, i3}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Unmount(ctx *context.T, i0 string, i1 string, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Unmount", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Resolve(ctx *context.T, i0 string, opts ...ipc.CallOpt) (o0 []string, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Resolve", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) ResolveToMT(ctx *context.T, i0 string, opts ...ipc.CallOpt) (o0 []string, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "ResolveToMT", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) FlushCacheEntry(ctx *context.T, i0 string, opts ...ipc.CallOpt) (o0 bool, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "FlushCacheEntry", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) DisableCache(ctx *context.T, i0 bool, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "DisableCache", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Roots(ctx *context.T, opts ...ipc.CallOpt) (o0 []string, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Roots", nil, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) SetRoots(ctx *context.T, i0 []string, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "SetRoots", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+// NamespaceGlobClientStream is the client stream for Namespace.Glob.
+type NamespaceGlobClientStream interface {
+ // RecvStream returns the receiver side of the Namespace.Glob client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() naming.VDLGlobReply
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// NamespaceGlobCall represents the call returned from Namespace.Glob.
+type NamespaceGlobCall interface {
+ NamespaceGlobClientStream
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implNamespaceGlobCall struct {
+ ipc.Call
+ valRecv naming.VDLGlobReply
+ errRecv error
+}
+
+func (c *implNamespaceGlobCall) RecvStream() interface {
+ Advance() bool
+ Value() naming.VDLGlobReply
+ Err() error
+} {
+ return implNamespaceGlobCallRecv{c}
+}
+
+type implNamespaceGlobCallRecv struct {
+ c *implNamespaceGlobCall
+}
+
+func (c implNamespaceGlobCallRecv) Advance() bool {
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implNamespaceGlobCallRecv) Value() naming.VDLGlobReply {
+ return c.c.valRecv
+}
+func (c implNamespaceGlobCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implNamespaceGlobCall) Finish() (err error) {
+ err = c.Call.Finish()
+ return
+}
+
+// NamespaceServerMethods is the interface a server writer
+// implements for Namespace.
+type NamespaceServerMethods interface {
+ // Run a glob query and stream the results.
+ Glob(ctx NamespaceGlobContext, pattern string) error
+ // Mount mounts a server under the given name.
+ Mount(ctx ipc.ServerContext, name string, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(ctx ipc.ServerContext, name string, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(ctx ipc.ServerContext, name string) ([]string, error)
+ // ResolveToMt resolves a name to the address of the mounttable directly
+ // hosting it.
+ ResolveToMT(ctx ipc.ServerContext, name string) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(ctx ipc.ServerContext, name string) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(ctx ipc.ServerContext, disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(ipc.ServerContext) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(ctx ipc.ServerContext, roots []string) error
+}
+
+// NamespaceServerStubMethods is the server interface containing
+// Namespace methods, as expected by ipc.Server.
+// The only difference between this interface and NamespaceServerMethods
+// is the streaming methods.
+type NamespaceServerStubMethods interface {
+ // Run a glob query and stream the results.
+ Glob(ctx *NamespaceGlobContextStub, pattern string) error
+ // Mount mounts a server under the given name.
+ Mount(ctx ipc.ServerContext, name string, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(ctx ipc.ServerContext, name string, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(ctx ipc.ServerContext, name string) ([]string, error)
+ // ResolveToMt resolves a name to the address of the mounttable directly
+ // hosting it.
+ ResolveToMT(ctx ipc.ServerContext, name string) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(ctx ipc.ServerContext, name string) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(ctx ipc.ServerContext, disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(ipc.ServerContext) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(ctx ipc.ServerContext, roots []string) error
+}
+
+// NamespaceServerStub adds universal methods to NamespaceServerStubMethods.
+type NamespaceServerStub interface {
+ NamespaceServerStubMethods
+ // Describe the Namespace interfaces.
+ Describe__() []ipc.InterfaceDesc
+}
+
+// NamespaceServer returns a server stub for Namespace.
+// It converts an implementation of NamespaceServerMethods into
+// an object that may be used by ipc.Server.
+func NamespaceServer(impl NamespaceServerMethods) NamespaceServerStub {
+ stub := implNamespaceServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := ipc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := ipc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implNamespaceServerStub struct {
+ impl NamespaceServerMethods
+ gs *ipc.GlobState
+}
+
+func (s implNamespaceServerStub) Glob(ctx *NamespaceGlobContextStub, i0 string) error {
+ return s.impl.Glob(ctx, i0)
+}
+
+func (s implNamespaceServerStub) Mount(ctx ipc.ServerContext, i0 string, i1 string, i2 time.Duration, i3 bool) error {
+ return s.impl.Mount(ctx, i0, i1, i2, i3)
+}
+
+func (s implNamespaceServerStub) Unmount(ctx ipc.ServerContext, i0 string, i1 string) error {
+ return s.impl.Unmount(ctx, i0, i1)
+}
+
+func (s implNamespaceServerStub) Resolve(ctx ipc.ServerContext, i0 string) ([]string, error) {
+ return s.impl.Resolve(ctx, i0)
+}
+
+func (s implNamespaceServerStub) ResolveToMT(ctx ipc.ServerContext, i0 string) ([]string, error) {
+ return s.impl.ResolveToMT(ctx, i0)
+}
+
+func (s implNamespaceServerStub) FlushCacheEntry(ctx ipc.ServerContext, i0 string) (bool, error) {
+ return s.impl.FlushCacheEntry(ctx, i0)
+}
+
+func (s implNamespaceServerStub) DisableCache(ctx ipc.ServerContext, i0 bool) error {
+ return s.impl.DisableCache(ctx, i0)
+}
+
+func (s implNamespaceServerStub) Roots(ctx ipc.ServerContext) ([]string, error) {
+ return s.impl.Roots(ctx)
+}
+
+func (s implNamespaceServerStub) SetRoots(ctx ipc.ServerContext, i0 []string) error {
+ return s.impl.SetRoots(ctx, i0)
+}
+
+func (s implNamespaceServerStub) Globber() *ipc.GlobState {
+ return s.gs
+}
+
+func (s implNamespaceServerStub) Describe__() []ipc.InterfaceDesc {
+ return []ipc.InterfaceDesc{NamespaceDesc}
+}
+
+// NamespaceDesc describes the Namespace interface.
+var NamespaceDesc ipc.InterfaceDesc = descNamespace
+
+// descNamespace hides the desc to keep godoc clean.
+var descNamespace = ipc.InterfaceDesc{
+ Name: "Namespace",
+ PkgPath: "v.io/wspr/veyron/services/wsprd/namespace",
+ Methods: []ipc.MethodDesc{
+ {
+ Name: "Glob",
+ Doc: "// Run a glob query and stream the results.",
+ InArgs: []ipc.ArgDesc{
+ {"pattern", ``}, // string
+ },
+ },
+ {
+ Name: "Mount",
+ Doc: "// Mount mounts a server under the given name.",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ {"server", ``}, // string
+ {"ttl", ``}, // time.Duration
+ {"replace", ``}, // bool
+ },
+ },
+ {
+ Name: "Unmount",
+ Doc: "// Unmount removes an existing mount point.",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ {"server", ``}, // string
+ },
+ },
+ {
+ Name: "Resolve",
+ Doc: "// Resolve resolves a name to an address.",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "ResolveToMT",
+ Doc: "// ResolveToMt resolves a name to the address of the mounttable directly\n// hosting it.",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "FlushCacheEntry",
+ Doc: "// FlushCacheEntry removes the namespace cache entry for a given name.",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // bool
+ },
+ },
+ {
+ Name: "DisableCache",
+ Doc: "// DisableCache disables the naming cache.",
+ InArgs: []ipc.ArgDesc{
+ {"disable", ``}, // bool
+ },
+ },
+ {
+ Name: "Roots",
+ Doc: "// Roots returns the addresses of the current mounttable roots.",
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "SetRoots",
+ Doc: "// SetRoots sets the current mounttable roots.",
+ InArgs: []ipc.ArgDesc{
+ {"roots", ``}, // []string
+ },
+ },
+ },
+}
+
+// NamespaceGlobServerStream is the server stream for Namespace.Glob.
+type NamespaceGlobServerStream interface {
+ // SendStream returns the send side of the Namespace.Glob server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item naming.VDLGlobReply) error
+ }
+}
+
+// NamespaceGlobContext represents the context passed to Namespace.Glob.
+type NamespaceGlobContext interface {
+ ipc.ServerContext
+ NamespaceGlobServerStream
+}
+
+// NamespaceGlobContextStub is a wrapper that converts ipc.ServerCall into
+// a typesafe stub that implements NamespaceGlobContext.
+type NamespaceGlobContextStub struct {
+ ipc.ServerCall
+}
+
+// Init initializes NamespaceGlobContextStub from ipc.ServerCall.
+func (s *NamespaceGlobContextStub) Init(call ipc.ServerCall) {
+ s.ServerCall = call
+}
+
+// SendStream returns the send side of the Namespace.Glob server stream.
+func (s *NamespaceGlobContextStub) SendStream() interface {
+ Send(item naming.VDLGlobReply) error
+} {
+ return implNamespaceGlobContextSend{s}
+}
+
+type implNamespaceGlobContextSend struct {
+ s *NamespaceGlobContextStub
+}
+
+func (s implNamespaceGlobContextSend) Send(item naming.VDLGlobReply) error {
+ return s.s.Send(item)
+}
diff --git a/services/wsprd/namespace/request_handler.go b/services/wsprd/namespace/request_handler.go
index e578015..fcf2cc5 100644
--- a/services/wsprd/namespace/request_handler.go
+++ b/services/wsprd/namespace/request_handler.go
@@ -1,119 +1,95 @@
package namespace
import (
- "encoding/json"
"time"
"v.io/core/veyron2"
"v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/verror"
-
- "v.io/wspr/veyron/services/wsprd/lib"
)
-// Function to format endpoints. Used by browspr to swap 'tcp' for 'ws'.
-var EpFormatter func([]string) ([]string, error) = nil
-
-// request struct represents a request to call a method on the runtime's namespace client
-type request struct {
- Method namespaceMethod
- Args json.RawMessage
+type Server struct {
+ ns naming.Namespace
}
-type namespaceMethod int
-
-// enumerates the methods available to be called on the runtime's namespace client
-const (
- methodGlob namespaceMethod = 0
- methodMount = 1
- methodUnmount = 2
- methodResolve = 3
- methodResolveToMt = 4
- methodFlushCacheEntry = 5
- methodDisableCache = 6
- methodRoots = 7
- methodSetRoots = 8
-)
-
-// globArgs defines the args for the glob method
-type globArgs struct {
- Pattern string
+func New(ctx *context.T) *Server {
+ return &Server{veyron2.GetNamespace(ctx)}
}
-// mountArgs defines the args for the mount method
-type mountArgs struct {
- Name string
- Server string
- Ttl time.Duration
- replaceMount bool
-}
-
-// unmountArgs defines the args for the unmount method
-type unmountArgs struct {
- Name string
- Server string
-}
-
-// resolveArgs defines the args for the resolve method
-type resolveArgs struct {
- Name string
-}
-
-// resolveToMtArgs defines the args for the resolveToMt method
-type resolveToMtArgs struct {
- Name string
-}
-
-// flushCacheEntryArgs defines the args for the flushCacheEntry method
-type flushCacheEntryArgs struct {
- Name string
-}
-
-// disableCacheArgs defines the args for the disableCache method
-type disableCacheArgs struct {
- Disable bool
-}
-
-// setRootsArgs defines the args for the setRoots method
-type setRootsArgs struct {
- Roots []string
-}
-
-// handleRequest uses the namespace client to respond to namespace specific requests such as glob
-func HandleRequest(ctx *context.T, data string, w lib.ClientWriter) {
- // Decode the request
- var req request
- if err := json.Unmarshal([]byte(data), &req); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
+func (s *Server) Glob(ctx *NamespaceGlobContextStub, pattern string) error {
+ // Call Glob on the namespace client instance
+ ch, err := s.ns.Glob(ctx.Context(), pattern)
+ if err != nil {
+ return err
}
- // Get the runtime's Namespace client
- var ns = veyron2.GetNamespace(ctx)
+ stream := ctx.SendStream()
- switch req.Method {
- case methodGlob:
- glob(ctx, ns, w, req.Args)
- case methodMount:
- mount(ctx, ns, w, req.Args)
- case methodUnmount:
- unmount(ctx, ns, w, req.Args)
- case methodResolve:
- resolve(ctx, ns, w, req.Args)
- case methodResolveToMt:
- resolveToMt(ctx, ns, w, req.Args)
- case methodFlushCacheEntry:
- flushCacheEntry(ctx, ns, w, req.Args)
- case methodDisableCache:
- disableCache(ctx, ns, w, req.Args)
- case methodRoots:
- roots(ctx, ns, w)
- case methodSetRoots:
- setRoots(ctx, ns, w, req.Args)
- default:
- w.Error(verror.New(verror.ErrNoExist, ctx, req.Method))
+ for mp := range ch {
+ var reply naming.VDLGlobReply
+ switch v := mp.(type) {
+ case *naming.GlobError:
+ reply = naming.VDLGlobReplyError{*v}
+ case *naming.MountEntry:
+ reply = naming.VDLGlobReplyEntry{convertToVDLEntry(*v)}
+ }
+ if err = stream.Send(reply); err != nil {
+ return err
+ }
}
+ return nil
+}
+
+func (s *Server) Mount(ctx ipc.ServerContext, name, server string, ttl time.Duration, replace bool) error {
+ rmOpt := naming.ReplaceMountOpt(replace)
+ err := s.ns.Mount(ctx.Context(), name, server, ttl, rmOpt)
+ if err != nil {
+ err = verror.Convert(verror.ErrInternal, ctx.Context(), err)
+ }
+ return err
+}
+
+func (s *Server) Unmount(ctx ipc.ServerContext, name, server string) error {
+ return s.ns.Unmount(ctx.Context(), name, server)
+}
+
+func (s *Server) Resolve(ctx ipc.ServerContext, name string) ([]string, error) {
+ me, err := s.ns.Resolve(ctx.Context(), name)
+ if err != nil {
+ return nil, verror.Convert(verror.ErrInternal, ctx.Context(), err)
+ }
+ return me.Names(), nil
+}
+
+func (s *Server) ResolveToMT(ctx ipc.ServerContext, name string) ([]string, error) {
+ me, err := s.ns.ResolveToMountTable(ctx.Context(), name)
+ if err != nil {
+ return nil, verror.Convert(verror.ErrInternal, ctx.Context(), err)
+ }
+ return me.Names(), nil
+}
+
+func (s *Server) FlushCacheEntry(ctx ipc.ServerContext, name string) (bool, error) {
+ return s.ns.FlushCacheEntry(name), nil
+}
+
+func (s *Server) DisableCache(ctx ipc.ServerContext, disable bool) error {
+ disableCacheCtl := naming.DisableCache(disable)
+ _ = s.ns.CacheCtl(disableCacheCtl)
+ return nil
+}
+
+func (s *Server) Roots(ctx ipc.ServerContext) ([]string, error) {
+ return s.ns.Roots(), nil
+}
+
+func (s *Server) SetRoots(ctx ipc.ServerContext, roots []string) error {
+ if err := s.ns.SetRoots(roots...); err != nil {
+ return verror.Convert(verror.ErrInternal, ctx.Context(), err)
+ }
+ return nil
}
func convertToVDLEntry(value naming.MountEntry) naming.VDLMountEntry {
@@ -130,190 +106,3 @@
}
return result
}
-
-func glob(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args globArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- // Call Glob on the namespace client instance
- ch, err := ns.Glob(ctx, args.Pattern)
-
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- for mp := range ch {
- switch v := mp.(type) {
- // send results that have error through the error stream.
- // TODO(aghassemi) we want mp to be part of the error's ParamsList, but there is no good way right now in verror to do this.
- case *naming.GlobError:
- err := verror.Convert(verror.ErrUnknown, ctx, v.Error)
- w.Error(err)
- case *naming.MountEntry:
- val, err := lib.VomEncode(convertToVDLEntry(*v))
- if err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseStream, val); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, *v))
- return
- }
- }
- }
-
- if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseStreamClose"))
- }
-
- if err := w.Send(lib.ResponseFinal, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func mount(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args mountArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- rmOpt := naming.ReplaceMountOpt(args.replaceMount)
- err := ns.Mount(ctx, args.Name, args.Server, args.Ttl, rmOpt)
-
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseFinal, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func unmount(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args unmountArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- err := ns.Unmount(ctx, args.Name, args.Server)
-
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseFinal, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func resolve(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args resolveArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- me, err := ns.Resolve(ctx, args.Name)
-
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseFinal, me.Names()); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func resolveToMt(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args resolveToMtArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- me, err := ns.ResolveToMountTable(ctx, args.Name)
-
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseFinal, me.Names()); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func flushCacheEntry(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args flushCacheEntryArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- flushed := ns.FlushCacheEntry(args.Name)
-
- if err := w.Send(lib.ResponseFinal, flushed); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func disableCache(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args disableCacheArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- disableCacheCtl := naming.DisableCache(args.Disable)
- _ = ns.CacheCtl(disableCacheCtl)
-
- if err := w.Send(lib.ResponseFinal, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func roots(ctx *context.T, ns naming.Namespace, w lib.ClientWriter) {
- roots := ns.Roots()
-
- if err := w.Send(lib.ResponseFinal, roots); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}
-
-func setRoots(ctx *context.T, ns naming.Namespace, w lib.ClientWriter, rawArgs json.RawMessage) {
- var args setRootsArgs
- if err := json.Unmarshal([]byte(rawArgs), &args); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- var formattedRoots []string
- var err error
- if EpFormatter != nil {
- formattedRoots, err = EpFormatter(args.Roots)
- if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- }
- } else {
- formattedRoots = args.Roots
- }
-
- if err := ns.SetRoots(formattedRoots...); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, ctx, err))
- return
- }
-
- if err := w.Send(lib.ResponseFinal, nil); err != nil {
- w.Error(verror.New(verror.ErrInternal, ctx, "ResponseFinal"))
- }
-}