wspr: Treat WSPR as an RPC service, part 2.
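Transition the remaining server management calls (Stop, AddName and
RemoveName) to the Controller interface, so they are dispatched as
internal "controller" RPCs instead of through dedicated message types.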
MultiPart: 2/2
Change-Id: I0ed71e7a225fcd4c5869f3412c1c44954e09de7d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 05f30ac..e2d784c 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -55,16 +55,6 @@
retryTimeout = flag.Int("retry-timeout", 2, "Duration in seconds to retry starting an RPC call. 0 means never retry.")
}
-type serveRequest struct {
- Name string
- ServerId uint32
-}
-
-type addRemoveNameRequest struct {
- Name string
- ServerId uint32
-}
-
type outstandingRequest struct {
stream *outstandingStream
cancel context.CancelFunc
@@ -442,7 +432,7 @@
// If this message is for an internal service, do a short-circuit dispatch here.
if msg.Name == "controller" {
- c.handleInternalCall(ctx, &msg, decoder, w, span)
+ go c.handleInternalCall(ctx, &msg, decoder, w, span)
return
}
@@ -506,33 +496,6 @@
return server, nil
}
-func (c *Controller) removeServer(serverId uint32) {
- c.Lock()
- server := c.servers[serverId]
- if server == nil {
- c.Unlock()
- return
- }
- delete(c.servers, serverId)
- c.Unlock()
-
- server.Stop()
-}
-
-// HandleServeRequest takes a request to serve a server, creates a server,
-// registers the provided services and sends true if everything succeeded.
-func (c *Controller) Serve(ctx ipc.ServerContext, name string, serverId uint32) error {
- server, err := c.maybeCreateServer(serverId)
- if err != nil {
- return verror.Convert(verror.ErrInternal, nil, err)
- }
- vlog.VI(2).Infof("serving under name: %q", name)
- if err := server.Serve(name); err != nil {
- return verror.Convert(verror.ErrInternal, nil, err)
- }
- return nil
-}
-
// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
// run by the Javascript server.
func (c *Controller) HandleLookupResponse(id int32, data string) {
@@ -563,77 +526,62 @@
server.HandleAuthResponse(id, data)
}
-// HandleStopRequest takes a request to stop a server.
-func (c *Controller) HandleStopRequest(data string, w lib.ClientWriter) {
- var serverId uint32
- if err := json.Unmarshal([]byte(data), &serverId); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+// Serve instructs WSPR to start listening for calls on behalf
+// of a javascript server.
+func (c *Controller) Serve(_ ipc.ServerContext, name string, serverId uint32) error {
+ server, err := c.maybeCreateServer(serverId)
+ if err != nil {
+ return verror.Convert(verror.ErrInternal, nil, err)
}
-
- c.removeServer(serverId)
-
- // Send true to indicate stop has finished
- if err := w.Send(lib.ResponseFinal, true); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+ vlog.VI(2).Infof("serving under name: %q", name)
+ if err := server.Serve(name); err != nil {
+ return verror.Convert(verror.ErrInternal, nil, err)
}
+ return nil
}
-// HandleAddNameRequest takes a request to add a new name to a server
-func (c *Controller) HandleAddNameRequest(data string, w lib.ClientWriter) {
- var request addRemoveNameRequest
- if err := json.Unmarshal([]byte(data), &request); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+// Stop instructs WSPR to stop listening for calls for the
+// given javascript server.
+func (c *Controller) Stop(_ ipc.ServerContext, serverId uint32) error {
+ c.Lock()
+ server := c.servers[serverId]
+ if server == nil {
+ c.Unlock()
+ return nil
}
+ delete(c.servers, serverId)
+ c.Unlock()
+ server.Stop()
+ return nil
+}
+
+// AddName adds a published name to an existing server.
+func (c *Controller) AddName(_ ipc.ServerContext, serverId uint32, name string) error {
// Create a server for the pipe, if it does not exist already
- server, err := c.maybeCreateServer(request.ServerId)
+ server, err := c.maybeCreateServer(serverId)
if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+ return verror.Convert(verror.ErrInternal, nil, err)
}
-
// Add name
- if err := server.AddName(request.Name); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+ if err := server.AddName(name); err != nil {
+ return verror.Convert(verror.ErrInternal, nil, err)
}
-
- // Send true to indicate request has finished without error
- if err := w.Send(lib.ResponseFinal, true); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
- }
+ return nil
}
-// HandleRemoveNameRequest takes a request to remove a name from a server
-func (c *Controller) HandleRemoveNameRequest(data string, w lib.ClientWriter) {
- var request addRemoveNameRequest
- if err := json.Unmarshal([]byte(data), &request); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
- }
-
+// RemoveName removes a published name from an existing server.
+func (c *Controller) RemoveName(_ ipc.ServerContext, serverId uint32, name string) error {
// Create a server for the pipe, if it does not exist already
- server, err := c.maybeCreateServer(request.ServerId)
+ server, err := c.maybeCreateServer(serverId)
if err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
+ return verror.Convert(verror.ErrInternal, nil, err)
}
-
// Remove name
- server.RemoveName(request.Name)
-
+ server.RemoveName(name)
// Remove name from signature cache as well
- c.signatureManager.FlushCacheEntry(request.Name)
-
- // Send true to indicate request has finished without error
- if err := w.Send(lib.ResponseFinal, true); err != nil {
- w.Error(verror.Convert(verror.ErrInternal, nil, err))
- return
- }
+ c.signatureManager.FlushCacheEntry(name)
+ return nil
}
// HandleServerResponse handles the completion of outstanding calls to JavaScript services
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 23d7ee4..b69c802 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -388,9 +388,8 @@
t.Fatalf("could not serve server %v", err)
}
- if len(rt.writer.Stream) != 1 {
- t.Errorf("expected only one response, got %d", len(rt.writer.Stream))
- return
+ if err = rt.writer.WaitForMessage(1); err != nil {
+ t.Fatalf("error waiting for response: %v", err)
}
resp := rt.writer.Stream[0]
@@ -415,13 +414,17 @@
return
}
+ if err = rt.writer.WaitForMessage(1); err != nil {
+ t.Fatalf("error waiting for response: %v", err)
+ }
+
// ensure there is only one server and then stop the server
if len(rt.controller.servers) != 1 {
t.Errorf("expected only one server but got: %d", len(rt.controller.servers))
return
}
for serverId := range rt.controller.servers {
- rt.controller.removeServer(serverId)
+ rt.controller.Stop(nil, serverId)
}
// ensure there is no more servers now
@@ -471,9 +474,8 @@
t.Errorf("could not serve server %v", err)
}
- if len(rt.writer.Stream) != 1 {
- t.Errorf("expected only on response, got %d", len(rt.writer.Stream))
- return
+ if err := rt.writer.WaitForMessage(1); err != nil {
+ t.Fatalf("error waiting for message: %v", err)
}
resp := rt.writer.Stream[0]
diff --git a/services/wsprd/app/controller.vdl b/services/wsprd/app/controller.vdl
index 8dcb760..b79ade9 100644
--- a/services/wsprd/app/controller.vdl
+++ b/services/wsprd/app/controller.vdl
@@ -1,5 +1,14 @@
package app
type Controller interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
Serve(name string, serverId uint32) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(serverId uint32) error
+ // AddName adds a published name to an existing server.
+ AddName(serverId uint32, name string) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(serverId uint32, name string) error
}
diff --git a/services/wsprd/app/controller.vdl.go b/services/wsprd/app/controller.vdl.go
index 65cce72..d75b029 100644
--- a/services/wsprd/app/controller.vdl.go
+++ b/services/wsprd/app/controller.vdl.go
@@ -13,7 +13,16 @@
// ControllerClientMethods is the client interface
// containing Controller methods.
type ControllerClientMethods interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
Serve(ctx *context.T, name string, serverId uint32, opts ...ipc.CallOpt) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(ctx *context.T, serverId uint32, opts ...ipc.CallOpt) error
+ // AddName adds a published name to an existing server.
+ AddName(ctx *context.T, serverId uint32, name string, opts ...ipc.CallOpt) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(ctx *context.T, serverId uint32, name string, opts ...ipc.CallOpt) error
}
// ControllerClientStub adds universal methods to ControllerClientMethods.
@@ -54,10 +63,46 @@
return
}
+func (c implControllerClientStub) Stop(ctx *context.T, i0 uint32, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Stop", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) AddName(ctx *context.T, i0 uint32, i1 string, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "AddName", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) RemoveName(ctx *context.T, i0 uint32, i1 string, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "RemoveName", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
// ControllerServerMethods is the interface a server writer
// implements for Controller.
type ControllerServerMethods interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
Serve(ctx ipc.ServerContext, name string, serverId uint32) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(ctx ipc.ServerContext, serverId uint32) error
+ // AddName adds a published name to an existing server.
+ AddName(ctx ipc.ServerContext, serverId uint32, name string) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(ctx ipc.ServerContext, serverId uint32, name string) error
}
// ControllerServerStubMethods is the server interface containing
@@ -99,6 +144,18 @@
return s.impl.Serve(ctx, i0, i1)
}
+func (s implControllerServerStub) Stop(ctx ipc.ServerContext, i0 uint32) error {
+ return s.impl.Stop(ctx, i0)
+}
+
+func (s implControllerServerStub) AddName(ctx ipc.ServerContext, i0 uint32, i1 string) error {
+ return s.impl.AddName(ctx, i0, i1)
+}
+
+func (s implControllerServerStub) RemoveName(ctx ipc.ServerContext, i0 uint32, i1 string) error {
+ return s.impl.RemoveName(ctx, i0, i1)
+}
+
func (s implControllerServerStub) Globber() *ipc.GlobState {
return s.gs
}
@@ -117,10 +174,34 @@
Methods: []ipc.MethodDesc{
{
Name: "Serve",
+ Doc: "// Serve instructs WSPR to start listening for calls on behalf\n// of a javascript server.",
InArgs: []ipc.ArgDesc{
{"name", ``}, // string
{"serverId", ``}, // uint32
},
},
+ {
+ Name: "Stop",
+ Doc: "// Stop instructs WSPR to stop listening for calls for the\n// given javascript server.",
+ InArgs: []ipc.ArgDesc{
+ {"serverId", ``}, // uint32
+ },
+ },
+ {
+ Name: "AddName",
+ Doc: "// AddName adds a published name to an existing server.",
+ InArgs: []ipc.ArgDesc{
+ {"serverId", ``}, // uint32
+ {"name", ``}, // string
+ },
+ },
+ {
+ Name: "RemoveName",
+ Doc: "// RemoveName removes a published name from an existing server.",
+ InArgs: []ipc.ArgDesc{
+ {"serverId", ``}, // uint32
+ {"name", ``}, // string
+ },
+ },
},
}
diff --git a/services/wsprd/app/messaging.go b/services/wsprd/app/messaging.go
index 71f4c7e..f605426 100644
--- a/services/wsprd/app/messaging.go
+++ b/services/wsprd/app/messaging.go
@@ -108,12 +108,6 @@
c.SendOnStream(msg.Id, msg.Data, w)
case StreamCloseMessage:
c.CloseStream(msg.Id)
- case StopServerMessage:
- go c.HandleStopRequest(msg.Data, w)
- case AddName:
- go c.HandleAddNameRequest(msg.Data, w)
- case RemoveName:
- go c.HandleRemoveNameRequest(msg.Data, w)
case ServerResponseMessage:
go c.HandleServerResponse(msg.Id, msg.Data)
case SignatureRequestMessage: