veyron/services/wsprd: Fixed a race in wspr because writing to different
writers returned by NextWriter in different goroutines is not safe.
They all share an internal buffer.
Change-Id: I08d56bbc3aefa552797e4383bbb663c9befafa72
diff --git a/services/wsprd/wspr/writer.go b/services/wsprd/wspr/writer.go
index 4c85ea0..ebf799e 100644
--- a/services/wsprd/wspr/writer.go
+++ b/services/wsprd/wspr/writer.go
@@ -23,7 +23,7 @@
// Implements clientWriter interface for sending messages over websockets.
type websocketWriter struct {
- ws *websocket.Conn
+ wsp *websocketPipe
logger vlog.Logger
id int64
}
@@ -35,16 +35,14 @@
return err
}
- wc, err := w.ws.NextWriter(websocket.TextMessage)
- if err != nil {
- w.logger.Error("Failed to get a writer from the websocket", err)
- return err
- }
- if err := vom.ObjToJSON(wc, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
+ var buf2 bytes.Buffer
+
+ if err := vom.ObjToJSON(&buf2, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
w.logger.Error("Failed to write the message", err)
return err
}
- wc.Close()
+
+ w.wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf2.Bytes()}
return nil
}
diff --git a/services/wsprd/wspr/wspr.go b/services/wsprd/wspr/wspr.go
index 81e31e9..2a04c7b 100644
--- a/services/wsprd/wspr/wspr.go
+++ b/services/wsprd/wspr/wspr.go
@@ -284,6 +284,11 @@
inType vom.Type
}
+type wsMessage struct {
+ buf []byte
+ messageType int
+}
+
type websocketPipe struct {
// Protects outstandingStreams and outstandingServerRequests.
sync.Mutex
@@ -310,6 +315,8 @@
// Creates a client writer for a given flow. This is a member so that tests can override
// the default implementation.
writerCreator func(id int64) lib.ClientWriter
+
+ writeQueue chan wsMessage
}
// Implements the serverHelper interface
@@ -340,6 +347,7 @@
// cleans up any outstanding rpcs.
func (wsp *websocketPipe) cleanup() {
+ wsp.ctx.logger.VI(0).Info("Cleaning up websocket")
wsp.Lock()
defer wsp.Unlock()
for _, stream := range wsp.outstandingStreams {
@@ -358,10 +366,29 @@
wsp.outstandingStreams = make(map[int64]outstandingStream)
wsp.flowMap = make(map[int64]*server.Server)
wsp.servers = make(map[uint64]*server.Server)
+ wsp.writeQueue = make(chan wsMessage, 50)
+ go wsp.writeLoop()
if wsp.writerCreator == nil {
wsp.writerCreator = func(id int64) lib.ClientWriter {
- return &websocketWriter{ws: wsp.ws, id: id, logger: wsp.ctx.logger}
+ return &websocketWriter{wsp: wsp, id: id, logger: wsp.ctx.logger}
+ }
+ }
+}
+
+func (wsp *websocketPipe) writeLoop() {
+ for {
+ msg, ok := <-wsp.writeQueue
+ if !ok {
+ wsp.ctx.logger.Errorf("write queue was closed")
+ return
+ }
+
+ if msg.messageType == websocket.PingMessage {
+ wsp.ctx.logger.Infof("sending ping")
+ }
+ if err := wsp.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
+ wsp.ctx.logger.Errorf("failed to write bytes: %s", err)
}
}
}
@@ -395,26 +422,19 @@
MounttableRoot: mounttableRoots,
}
- wc, err := wsp.ws.NextWriter(websocket.TextMessage)
- if err != nil {
- wsp.ctx.logger.Errorf("failed to create websocket writer: %s", err)
- return
- }
- if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
+ var buf bytes.Buffer
+ if err := vom.ObjToJSON(&buf, vom.ValueOf(msg)); err != nil {
wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err)
return
}
- wc.Close()
+ wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf.Bytes()}
}
func (wsp *websocketPipe) pingLoop() {
for {
time.Sleep(pingInterval)
wsp.ctx.logger.VI(2).Info("ws: ping")
- if err := wsp.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- wsp.ctx.logger.Error("ws: ping failed")
- return
- }
+ wsp.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
}
}
@@ -476,7 +496,7 @@
}
// handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w lib.ClientWriter) {
veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
if err != nil {
w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
@@ -543,11 +563,11 @@
if err := decoder.Decode(&msg); err != nil {
errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
wsp.ctx.logger.Error(errMsg)
- wsp.ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
+ wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
continue
}
- ww := &websocketWriter{ws: wsp.ws, id: msg.Id, logger: wsp.ctx.logger}
+ ww := wsp.writerCreator(msg.Id)
switch msg.Type {
case websocketVeyronRequest:
@@ -627,7 +647,7 @@
// handleServeRequest takes a request to serve a server, creates
// a server, registers the provided services and sends the endpoint back.
-func (wsp *websocketPipe) handleServeRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleServeRequest(data string, w lib.ClientWriter) {
// Decode the serve request which includes IDL, registered services and name
var serveRequest serveRequest
decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -639,7 +659,7 @@
}
// handleStopRequest takes a request to stop a server.
-func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleStopRequest(data string, w lib.ClientWriter) {
var serverId uint64
decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -756,7 +776,7 @@
}
// handleSignatureRequest uses signature manager to get and cache signature of a remote server
-func (wsp *websocketPipe) handleSignatureRequest(data string, w *websocketWriter) {
+func (wsp *websocketPipe) handleSignatureRequest(data string, w lib.ClientWriter) {
// Decode the request
var request signatureRequest
decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -791,6 +811,7 @@
http.HandleFunc("/debug", ctx.handleDebug)
http.Handle("/favicon.ico", http.NotFoundHandler())
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
+ ctx.logger.VI(0).Info("Creating a new websocket")
pipe := &websocketPipe{ctx: &ctx}
pipe.start(w, r)
})