veyron/services/wspr: Implementing Stop() functionality for servers.
Similar to Publish and Register, Stop is a function on JavaScript Server object
that allows the user to stop publishing the server. Also similar to Publish
and Register, it simply sends a Stop request to WSPR which on return calls
Stop() on the corresponding Veyron Server object.
Change-Id: I50e6e7105ec01425bbbb620ad95f666b3057a292
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 6ad3b17..fe84d87 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -88,7 +88,7 @@
websocketVeyronRequest websocketMessageType = 0
// Publishing this websocket under a veyron name
- websocketPublish = 1
+ websocketPublishServer = 1
// A response from a service in javascript to a request
// from the proxy.
@@ -102,6 +102,9 @@
// A request to get signature of a remote server
websocketSignatureRequest = 5
+
+ // A request to stop a server
+ websocketStopServer = 6
)
type websocketMessage struct {
@@ -610,8 +613,10 @@
wsp.sendOnStream(msg.Id, msg.Data, ww)
case websocketStreamClose:
wsp.closeStream(msg.Id)
- case websocketPublish:
+ case websocketPublishServer:
go wsp.handlePublishRequest(msg.Data, ww)
+ case websocketStopServer:
+ go wsp.handleStopRequest(msg.Data, ww)
case websocketServerResponse:
go wsp.handleServerResponse(msg.Id, msg.Data)
case websocketSignatureRequest:
@@ -637,6 +642,19 @@
return server, nil
}
+func (wsp *websocketPipe) removeServer(serverId uint64) {
+ wsp.Lock()
+ server := wsp.servers[serverId]
+ if server == nil {
+ wsp.Unlock()
+ return
+ }
+ delete(wsp.servers, serverId)
+ wsp.Unlock()
+
+ server.Stop()
+}
+
func (wsp *websocketPipe) publish(publishRequest publishRequest, w clientWriter) {
// Create a server for the websocket pipe, if it does not exist already
server, err := wsp.maybeCreateServer(publishRequest.ServerId)
@@ -684,6 +702,27 @@
wsp.publish(publishRequest, w)
}
+// handleStopRequest takes a request to stop a server.
+func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) {
+
+ var serverId uint64
+ decoder := json.NewDecoder(bytes.NewBufferString(data))
+ if err := decoder.Decode(&serverId); err != nil {
+ w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ return
+ }
+
+ wsp.removeServer(serverId)
+
+ // Send true to indicate stop has finished
+ result := response{Type: responseFinal, Message: true}
+ if err := vom.ObjToJSON(w, vom.ValueOf(result)); err != nil {
+ w.sendError(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+ w.FinishMessage()
+}
+
// handleServerResponse handles the completion of outstanding calls to JavaScript services
// by filling the corresponding channel with the result from JavaScript.
func (wsp *websocketPipe) handleServerResponse(id int64, data string) {
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index c2e6102..4ea681a 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -373,35 +373,39 @@
})
}
-func TestJavascriptPublish(t *testing.T) {
+type runningTest struct {
+ wspr *WSPR
+ wsp *websocketPipe
+ writer *testWriter
+ mounttableServer ipc.Server
+ proxyServer *proxy.Proxy
+}
+
+func publishServer() (*runningTest, error) {
mounttableServer, endpoint, err := startMountTableServer()
if err != nil {
- t.Errorf("unable to start mounttable: %v", err)
- return
+ return nil, fmt.Errorf("unable to start mounttable: %v", err)
}
- defer mounttableServer.Stop()
-
proxyServer, err := startProxy()
if err != nil {
- t.Errorf("unable to start proxy: %v", err)
- return
+ return nil, fmt.Errorf("unable to start proxy: %v", err)
}
- defer proxyServer.Shutdown()
-
proxyEndpoint := proxyServer.Endpoint().String()
wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String() + "/mt"})
wspr.setup()
wsp := websocketPipe{ctx: wspr}
- wsp.setup()
- defer wsp.cleanup()
writer := testWriter{
logger: wspr.logger,
}
+ wsp.writerCreator = func(int64) clientWriter {
+ return &writer
+ }
+ wsp.setup()
wsp.publish(publishRequest{
Name: "adder",
Services: map[string]JSONServiceSignature{
@@ -409,11 +413,26 @@
},
}, &writer)
- if len(writer.stream) != 1 {
- t.Errorf("expected only on response, got %d", len(writer.stream))
+ return &runningTest{
+ wspr, &wsp, &writer, mounttableServer, proxyServer,
+ }, nil
+}
+
+func TestJavascriptPublishServer(t *testing.T) {
+ rt, err := publishServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.wsp.cleanup()
+
+ if err != nil {
+ t.Errorf("could not publish server %v", err)
}
- resp := writer.stream[0]
+ if len(rt.writer.stream) != 1 {
+ t.Errorf("expected only on response, got %d", len(rt.writer.stream))
+ }
+
+ resp := rt.writer.stream[0]
if resp.Type != responseFinal {
t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
@@ -424,11 +443,39 @@
if _, err := r.NewEndpoint(msg); err == nil {
return
}
-
}
t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
}
+func TestJavascriptStopServer(t *testing.T) {
+ rt, err := publishServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.wsp.cleanup()
+
+ if err != nil {
+ t.Errorf("could not publish server %v", err)
+ return
+ }
+
+ // ensure there is only one server and then stop the server
+ if len(rt.wsp.servers) != 1 {
+ t.Errorf("expected only one server but got: %d", len(rt.wsp.servers))
+ return
+ }
+ for serverId := range rt.wsp.servers {
+ rt.wsp.removeServer(serverId)
+ }
+
+ // ensure there is no more servers now
+ if len(rt.wsp.servers) != 0 {
+ t.Errorf("expected no server after stopping the only one but got: %d", len(rt.wsp.servers))
+ return
+ }
+
+ return
+}
+
type jsServerTestCase struct {
method string
inArgs []interface{}
@@ -458,49 +505,20 @@
}
func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
- mounttableServer, endpoint, err := startMountTableServer()
+ rt, err := publishServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.wsp.cleanup()
if err != nil {
- t.Errorf("unable to start mounttable: %v", err)
- return
+ t.Errorf("could not publish server %v", err)
}
- defer mounttableServer.Stop()
-
- proxyServer, err := startProxy()
-
- if err != nil {
- t.Errorf("unable to start proxy: %v", err)
- return
+ if len(rt.writer.stream) != 1 {
+ t.Errorf("expected only on response, got %d", len(rt.writer.stream))
}
- defer proxyServer.Shutdown()
-
- proxyEndpoint := proxyServer.Endpoint().String()
-
- wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String() + "/mt//"})
- wspr.setup()
- wsp := websocketPipe{ctx: wspr}
- writer := testWriter{
- logger: wspr.logger,
- }
- wsp.writerCreator = func(int64) clientWriter {
- return &writer
- }
- wsp.setup()
- defer wsp.cleanup()
- wsp.publish(publishRequest{
- Name: "adder",
- Services: map[string]JSONServiceSignature{
- "adder": adderServiceSignature,
- },
- }, &writer)
-
- if len(writer.stream) != 1 {
- t.Errorf("expected only on response, got %d", len(writer.stream))
- }
-
- resp := writer.stream[0]
+ resp := rt.writer.stream[0]
if resp.Type != responseFinal {
t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
@@ -516,16 +534,16 @@
t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
}
- writer.stream = nil
+ rt.writer.stream = nil
// Create a client using wspr's runtime so it points to the right mounttable.
- client, err := wspr.rt.NewClient()
+ client, err := rt.wspr.rt.NewClient()
if err != nil {
t.Errorf("unable to create client: %v", err)
}
- call, err := client.StartCall(wspr.rt.NewContext(), "/"+msg+"//adder", test.method, test.inArgs)
+ call, err := client.StartCall(rt.wspr.rt.NewContext(), "/"+msg+"//adder", test.method, test.inArgs)
if err != nil {
t.Errorf("failed to start call: %v", err)
}
@@ -547,7 +565,7 @@
}
// Wait until the rpc has started.
- if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+ if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
t.Errorf("didn't recieve expected message: %v", err)
}
for _, msg := range test.clientStream {
@@ -558,14 +576,14 @@
}
// Wait until all the streaming messages have been acknowledged.
- if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+ if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
t.Errorf("didn't recieve expected message: %v", err)
}
expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStreamClose})
expectedStream := test.serverStream
- go sendServerStream(t, &wsp, &test, &writer)
+ go sendServerStream(t, rt.wsp, &test, rt.writer)
for {
var data interface{}
if err := call.Recv(&data); err != nil {
@@ -599,11 +617,11 @@
}
// Wait until the close streaming messages have been acknowledged.
- if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+ if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
t.Errorf("didn't recieve expected message: %v", err)
}
- checkResponses(&writer, expectedWebsocketMessage, nil, t)
+ checkResponses(rt.writer, expectedWebsocketMessage, nil, t)
}
func TestSimpleJSServer(t *testing.T) {