veyron/services/wspr: Implementing Stop() functionality for servers.

Similar to Publish and Register, Stop is a function on JavaScript Server object
that allows the user to stop publishing the server. Also similar to Publish
and Register, it simply sends a Stop request to WSPR which on return calls
Stop() on the corresponding Veyron Server object.

Change-Id: I50e6e7105ec01425bbbb620ad95f666b3057a292
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 6ad3b17..fe84d87 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -88,7 +88,7 @@
 	websocketVeyronRequest websocketMessageType = 0
 
 	// Publishing this websocket under a veyron name
-	websocketPublish = 1
+	websocketPublishServer = 1
 
 	// A response from a service in javascript to a request
 	// from the proxy.
@@ -102,6 +102,9 @@
 
 	// A request to get signature of a remote server
 	websocketSignatureRequest = 5
+
+	// A request to stop a server
+	websocketStopServer = 6
 )
 
 type websocketMessage struct {
@@ -610,8 +613,10 @@
 			wsp.sendOnStream(msg.Id, msg.Data, ww)
 		case websocketStreamClose:
 			wsp.closeStream(msg.Id)
-		case websocketPublish:
+		case websocketPublishServer:
 			go wsp.handlePublishRequest(msg.Data, ww)
+		case websocketStopServer:
+			go wsp.handleStopRequest(msg.Data, ww)
 		case websocketServerResponse:
 			go wsp.handleServerResponse(msg.Id, msg.Data)
 		case websocketSignatureRequest:
@@ -637,6 +642,19 @@
 	return server, nil
 }
 
+func (wsp *websocketPipe) removeServer(serverId uint64) {
+	wsp.Lock()
+	server := wsp.servers[serverId]
+	if server == nil {
+		wsp.Unlock()
+		return
+	}
+	delete(wsp.servers, serverId)
+	wsp.Unlock()
+
+	server.Stop()
+}
+
 func (wsp *websocketPipe) publish(publishRequest publishRequest, w clientWriter) {
 	// Create a server for the websocket pipe, if it does not exist already
 	server, err := wsp.maybeCreateServer(publishRequest.ServerId)
@@ -684,6 +702,27 @@
 	wsp.publish(publishRequest, w)
 }
 
+// handleStopRequest takes a request to stop a server.
+func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) {
+
+	var serverId uint64
+	decoder := json.NewDecoder(bytes.NewBufferString(data))
+	if err := decoder.Decode(&serverId); err != nil {
+		w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+		return
+	}
+
+	wsp.removeServer(serverId)
+
+	// Send true to indicate stop has finished
+	result := response{Type: responseFinal, Message: true}
+	if err := vom.ObjToJSON(w, vom.ValueOf(result)); err != nil {
+		w.sendError(verror.Internalf("error marshalling results: %v", err))
+		return
+	}
+	w.FinishMessage()
+}
+
 // handleServerResponse handles the completion of outstanding calls to JavaScript services
 // by filling the corresponding channel with the result from JavaScript.
 func (wsp *websocketPipe) handleServerResponse(id int64, data string) {
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index c2e6102..4ea681a 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -373,35 +373,39 @@
 	})
 }
 
-func TestJavascriptPublish(t *testing.T) {
+type runningTest struct {
+	wspr             *WSPR
+	wsp              *websocketPipe
+	writer           *testWriter
+	mounttableServer ipc.Server
+	proxyServer      *proxy.Proxy
+}
+
+func publishServer() (*runningTest, error) {
 	mounttableServer, endpoint, err := startMountTableServer()
 
 	if err != nil {
-		t.Errorf("unable to start mounttable: %v", err)
-		return
+		return nil, fmt.Errorf("unable to start mounttable: %v", err)
 	}
 
-	defer mounttableServer.Stop()
-
 	proxyServer, err := startProxy()
 
 	if err != nil {
-		t.Errorf("unable to start proxy: %v", err)
-		return
+		return nil, fmt.Errorf("unable to start proxy: %v", err)
 	}
 
-	defer proxyServer.Shutdown()
-
 	proxyEndpoint := proxyServer.Endpoint().String()
 
 	wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String() + "/mt"})
 	wspr.setup()
 	wsp := websocketPipe{ctx: wspr}
-	wsp.setup()
-	defer wsp.cleanup()
 	writer := testWriter{
 		logger: wspr.logger,
 	}
+	wsp.writerCreator = func(int64) clientWriter {
+		return &writer
+	}
+	wsp.setup()
 	wsp.publish(publishRequest{
 		Name: "adder",
 		Services: map[string]JSONServiceSignature{
@@ -409,11 +413,26 @@
 		},
 	}, &writer)
 
-	if len(writer.stream) != 1 {
-		t.Errorf("expected only on response, got %d", len(writer.stream))
+	return &runningTest{
+		wspr, &wsp, &writer, mounttableServer, proxyServer,
+	}, nil
+}
+
+func TestJavascriptPublishServer(t *testing.T) {
+	rt, err := publishServer()
+	defer rt.mounttableServer.Stop()
+	defer rt.proxyServer.Shutdown()
+	defer rt.wsp.cleanup()
+
+	if err != nil {
+		t.Errorf("could not publish server %v", err)
 	}
 
-	resp := writer.stream[0]
+	if len(rt.writer.stream) != 1 {
+		t.Errorf("expected only on response, got %d", len(rt.writer.stream))
+	}
+
+	resp := rt.writer.stream[0]
 
 	if resp.Type != responseFinal {
 		t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
@@ -424,11 +443,39 @@
 		if _, err := r.NewEndpoint(msg); err == nil {
 			return
 		}
-
 	}
 	t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
 }
 
+func TestJavascriptStopServer(t *testing.T) {
+	rt, err := publishServer()
+	defer rt.mounttableServer.Stop()
+	defer rt.proxyServer.Shutdown()
+	defer rt.wsp.cleanup()
+
+	if err != nil {
+		t.Errorf("could not publish server %v", err)
+		return
+	}
+
+	// ensure there is only one server and then stop the server
+	if len(rt.wsp.servers) != 1 {
+		t.Errorf("expected only one server but got: %d", len(rt.wsp.servers))
+		return
+	}
+	for serverId := range rt.wsp.servers {
+		rt.wsp.removeServer(serverId)
+	}
+
+	// ensure there is no more servers now
+	if len(rt.wsp.servers) != 0 {
+		t.Errorf("expected no server after stopping the only one but got: %d", len(rt.wsp.servers))
+		return
+	}
+
+	return
+}
+
 type jsServerTestCase struct {
 	method string
 	inArgs []interface{}
@@ -458,49 +505,20 @@
 }
 
 func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
-	mounttableServer, endpoint, err := startMountTableServer()
+	rt, err := publishServer()
+	defer rt.mounttableServer.Stop()
+	defer rt.proxyServer.Shutdown()
+	defer rt.wsp.cleanup()
 
 	if err != nil {
-		t.Errorf("unable to start mounttable: %v", err)
-		return
+		t.Errorf("could not publish server %v", err)
 	}
 
-	defer mounttableServer.Stop()
-
-	proxyServer, err := startProxy()
-
-	if err != nil {
-		t.Errorf("unable to start proxy: %v", err)
-		return
+	if len(rt.writer.stream) != 1 {
+		t.Errorf("expected only on response, got %d", len(rt.writer.stream))
 	}
 
-	defer proxyServer.Shutdown()
-
-	proxyEndpoint := proxyServer.Endpoint().String()
-
-	wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String() + "/mt//"})
-	wspr.setup()
-	wsp := websocketPipe{ctx: wspr}
-	writer := testWriter{
-		logger: wspr.logger,
-	}
-	wsp.writerCreator = func(int64) clientWriter {
-		return &writer
-	}
-	wsp.setup()
-	defer wsp.cleanup()
-	wsp.publish(publishRequest{
-		Name: "adder",
-		Services: map[string]JSONServiceSignature{
-			"adder": adderServiceSignature,
-		},
-	}, &writer)
-
-	if len(writer.stream) != 1 {
-		t.Errorf("expected only on response, got %d", len(writer.stream))
-	}
-
-	resp := writer.stream[0]
+	resp := rt.writer.stream[0]
 
 	if resp.Type != responseFinal {
 		t.Errorf("unknown stream message Got: %v, expected: publish response", resp)
@@ -516,16 +534,16 @@
 		t.Errorf("invalid endpdoint returned from publish: %v", resp.Message)
 	}
 
-	writer.stream = nil
+	rt.writer.stream = nil
 
 	// Create a client using wspr's runtime so it points to the right mounttable.
-	client, err := wspr.rt.NewClient()
+	client, err := rt.wspr.rt.NewClient()
 
 	if err != nil {
 		t.Errorf("unable to create client: %v", err)
 	}
 
-	call, err := client.StartCall(wspr.rt.NewContext(), "/"+msg+"//adder", test.method, test.inArgs)
+	call, err := client.StartCall(rt.wspr.rt.NewContext(), "/"+msg+"//adder", test.method, test.inArgs)
 	if err != nil {
 		t.Errorf("failed to start call: %v", err)
 	}
@@ -547,7 +565,7 @@
 	}
 
 	// Wait until the rpc has started.
-	if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+	if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
 		t.Errorf("didn't recieve expected message: %v", err)
 	}
 	for _, msg := range test.clientStream {
@@ -558,14 +576,14 @@
 	}
 
 	// Wait until all the streaming messages have been acknowledged.
-	if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+	if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
 		t.Errorf("didn't recieve expected message: %v", err)
 	}
 
 	expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStreamClose})
 
 	expectedStream := test.serverStream
-	go sendServerStream(t, &wsp, &test, &writer)
+	go sendServerStream(t, rt.wsp, &test, rt.writer)
 	for {
 		var data interface{}
 		if err := call.Recv(&data); err != nil {
@@ -599,11 +617,11 @@
 	}
 
 	// Wait until the close streaming messages have been acknowledged.
-	if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+	if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
 		t.Errorf("didn't recieve expected message: %v", err)
 	}
 
-	checkResponses(&writer, expectedWebsocketMessage, nil, t)
+	checkResponses(rt.writer, expectedWebsocketMessage, nil, t)
 }
 
 func TestSimpleJSServer(t *testing.T) {