veyron/services/wsprd: Fixed a race in wspr because writing to different
writers returned by NextWriter in different goroutines is not safe.
They all share an internal buffer.

Change-Id: I08d56bbc3aefa552797e4383bbb663c9befafa72
diff --git a/services/wsprd/wspr/writer.go b/services/wsprd/wspr/writer.go
index 4c85ea0..ebf799e 100644
--- a/services/wsprd/wspr/writer.go
+++ b/services/wsprd/wspr/writer.go
@@ -23,7 +23,7 @@
 
 // Implements clientWriter interface for sending messages over websockets.
 type websocketWriter struct {
-	ws     *websocket.Conn
+	wsp    *websocketPipe
 	logger vlog.Logger
 	id     int64
 }
@@ -35,16 +35,14 @@
 		return err
 	}
 
-	wc, err := w.ws.NextWriter(websocket.TextMessage)
-	if err != nil {
-		w.logger.Error("Failed to get a writer from the websocket", err)
-		return err
-	}
-	if err := vom.ObjToJSON(wc, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
+	var buf2 bytes.Buffer
+
+	if err := vom.ObjToJSON(&buf2, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
 		w.logger.Error("Failed to write the message", err)
 		return err
 	}
-	wc.Close()
+
+	w.wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf2.Bytes()}
 
 	return nil
 }
diff --git a/services/wsprd/wspr/wspr.go b/services/wsprd/wspr/wspr.go
index 81e31e9..2a04c7b 100644
--- a/services/wsprd/wspr/wspr.go
+++ b/services/wsprd/wspr/wspr.go
@@ -284,6 +284,11 @@
 	inType vom.Type
 }
 
+type wsMessage struct {
+	buf         []byte
+	messageType int
+}
+
 type websocketPipe struct {
 	// Protects outstandingStreams and outstandingServerRequests.
 	sync.Mutex
@@ -310,6 +315,8 @@
 	// Creates a client writer for a given flow.  This is a member so that tests can override
 	// the default implementation.
 	writerCreator func(id int64) lib.ClientWriter
+
+	writeQueue chan wsMessage
 }
 
 // Implements the serverHelper interface
@@ -340,6 +347,7 @@
 
 // cleans up any outstanding rpcs.
 func (wsp *websocketPipe) cleanup() {
+	wsp.ctx.logger.VI(0).Info("Cleaning up websocket")
 	wsp.Lock()
 	defer wsp.Unlock()
 	for _, stream := range wsp.outstandingStreams {
@@ -358,10 +366,29 @@
 	wsp.outstandingStreams = make(map[int64]outstandingStream)
 	wsp.flowMap = make(map[int64]*server.Server)
 	wsp.servers = make(map[uint64]*server.Server)
+	wsp.writeQueue = make(chan wsMessage, 50)
+	go wsp.writeLoop()
 
 	if wsp.writerCreator == nil {
 		wsp.writerCreator = func(id int64) lib.ClientWriter {
-			return &websocketWriter{ws: wsp.ws, id: id, logger: wsp.ctx.logger}
+			return &websocketWriter{wsp: wsp, id: id, logger: wsp.ctx.logger}
+		}
+	}
+}
+
+func (wsp *websocketPipe) writeLoop() {
+	for {
+		msg, ok := <-wsp.writeQueue
+		if !ok {
+			wsp.ctx.logger.Errorf("write queue was closed")
+			return
+		}
+
+		if msg.messageType == websocket.PingMessage {
+			wsp.ctx.logger.Infof("sending ping")
+		}
+		if err := wsp.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
+			wsp.ctx.logger.Errorf("failed to write bytes: %s", err)
 		}
 	}
 }
@@ -395,26 +422,19 @@
 		MounttableRoot: mounttableRoots,
 	}
 
-	wc, err := wsp.ws.NextWriter(websocket.TextMessage)
-	if err != nil {
-		wsp.ctx.logger.Errorf("failed to create websocket writer: %s", err)
-		return
-	}
-	if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
+	var buf bytes.Buffer
+	if err := vom.ObjToJSON(&buf, vom.ValueOf(msg)); err != nil {
 		wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err)
 		return
 	}
-	wc.Close()
+	wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf.Bytes()}
 }
 
 func (wsp *websocketPipe) pingLoop() {
 	for {
 		time.Sleep(pingInterval)
 		wsp.ctx.logger.VI(2).Info("ws: ping")
-		if err := wsp.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
-			wsp.ctx.logger.Error("ws: ping failed")
-			return
-		}
+		wsp.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
 	}
 }
 
@@ -476,7 +496,7 @@
 }
 
 // handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w lib.ClientWriter) {
 	veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
 	if err != nil {
 		w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
@@ -543,11 +563,11 @@
 		if err := decoder.Decode(&msg); err != nil {
 			errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
 			wsp.ctx.logger.Error(errMsg)
-			wsp.ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
+			wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
 			continue
 		}
 
-		ww := &websocketWriter{ws: wsp.ws, id: msg.Id, logger: wsp.ctx.logger}
+		ww := wsp.writerCreator(msg.Id)
 
 		switch msg.Type {
 		case websocketVeyronRequest:
@@ -627,7 +647,7 @@
 
 // handleServeRequest takes a request to serve a server, creates
 // a server, registers the provided services and sends the endpoint back.
-func (wsp *websocketPipe) handleServeRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleServeRequest(data string, w lib.ClientWriter) {
 	// Decode the serve request which includes IDL, registered services and name
 	var serveRequest serveRequest
 	decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -639,7 +659,7 @@
 }
 
 // handleStopRequest takes a request to stop a server.
-func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleStopRequest(data string, w lib.ClientWriter) {
 
 	var serverId uint64
 	decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -756,7 +776,7 @@
 }
 
 // handleSignatureRequest uses signature manager to get and cache signature of a remote server
-func (wsp *websocketPipe) handleSignatureRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleSignatureRequest(data string, w lib.ClientWriter) {
 	// Decode the request
 	var request signatureRequest
 	decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -791,6 +811,7 @@
 	http.HandleFunc("/debug", ctx.handleDebug)
 	http.Handle("/favicon.ico", http.NotFoundHandler())
 	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
+		ctx.logger.VI(0).Info("Creating a new websocket")
 		pipe := &websocketPipe{ctx: &ctx}
 		pipe.start(w, r)
 	})