blob: 90c98a3e61d941c24947ef2497804ebe28170ddd [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wsprlib
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/x/ref/services/wspr/internal/app"
"v.io/x/ref/services/wspr/internal/lib"
"github.com/gorilla/websocket"
)
// wsMessage is the struct that is put on the write queue.
type wsMessage struct {
buf []byte
messageType int
}
type pipe struct {
// The struct that handles the translation of javascript request to vanadium requests.
controller *app.Controller
ws *websocket.Conn
wspr *WSPR
// Creates a client writer for a given flow. This is a member so that tests can override
// the default implementation.
writerCreator func(id int32) lib.ClientWriter
// There is a single write goroutine because ws.NewWriter() creates a new writer that
// writes to a shared buffer in the websocket, so it is not safe to have multiple go
// routines writing to different websocket writers.
writeQueue chan wsMessage
// This request is used to tell WSPR which pipe to remove when we shutdown.
req *http.Request
}
func newPipe(w http.ResponseWriter, req *http.Request, wspr *WSPR, creator func(id int32) lib.ClientWriter) *pipe {
pipe := &pipe{wspr: wspr, req: req}
if creator == nil {
creator = func(id int32) lib.ClientWriter {
return &websocketWriter{p: pipe, id: id, ctx: wspr.ctx}
}
}
pipe.writerCreator = creator
origin := req.Header.Get("Origin")
if origin == "" {
wspr.ctx.Errorf("Could not read origin from the request")
http.Error(w, "Could not read origin from the request", http.StatusBadRequest)
return nil
}
p, err := wspr.principalManager.Principal(origin)
if err != nil {
p = v23.GetPrincipal(wspr.ctx)
wspr.ctx.Errorf("no principal associated with origin %s: %v", origin, err)
// TODO(bjornick): Send an error to the client when all of the principal stuff is set up.
}
pipe.controller, err = app.NewController(wspr.ctx, creator, wspr.listenSpec, wspr.namespaceRoots, p)
if err != nil {
wspr.ctx.Errorf("Could not create controller: %v", err)
http.Error(w, fmt.Sprintf("Failed to create controller: %v", err), http.StatusInternalServerError)
return nil
}
pipe.start(w, req)
return pipe
}
// cleans up any outstanding rpcs.
func (p *pipe) cleanup(ctx *context.T) {
ctx.VI(0).Info("Cleaning up websocket")
p.controller.Cleanup(ctx)
p.ws.Close()
p.wspr.CleanUpPipe(p.req)
}
func (p *pipe) setup() {
p.writeQueue = make(chan wsMessage, 50)
go p.writeLoop()
}
func (p *pipe) writeLoop() {
for {
msg, ok := <-p.writeQueue
if !ok {
p.wspr.ctx.Errorf("write queue was closed")
return
}
if msg.messageType == websocket.PingMessage {
p.wspr.ctx.Infof("sending ping")
}
if err := p.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
p.wspr.ctx.Errorf("failed to write bytes: %s", err)
}
}
}
func (p *pipe) start(w http.ResponseWriter, req *http.Request) {
ws, err := websocket.Upgrade(w, req, nil, 1024, 1024)
if _, ok := err.(websocket.HandshakeError); ok {
http.Error(w, "Not a websocket handshake", 400)
return
} else if err != nil {
http.Error(w, "Internal Error", 500)
p.wspr.ctx.Errorf("websocket upgrade failed: %s", err)
return
}
p.ws = ws
p.ws.SetPongHandler(p.pongHandler)
p.setup()
go p.readLoop()
go p.pingLoop()
}
func (p *pipe) pingLoop() {
for {
time.Sleep(pingInterval)
p.wspr.ctx.VI(2).Info("ws: ping")
p.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
}
}
func (p *pipe) pongHandler(msg string) error {
p.wspr.ctx.VI(2).Infof("ws: pong")
p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
return nil
}
func (p *pipe) readLoop() {
p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
for {
op, r, err := p.ws.NextReader()
if err == io.ErrUnexpectedEOF { // websocket disconnected
break
}
if err != nil {
p.wspr.ctx.VI(1).Infof("websocket receive: %s", err)
break
}
if op != websocket.TextMessage {
p.wspr.ctx.Errorf("unexpected websocket op: %v", op)
}
var msg app.Message
decoder := json.NewDecoder(r)
if err := decoder.Decode(&msg); err != nil {
errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
p.wspr.ctx.Error(errMsg)
p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
continue
}
ww := p.writerCreator(msg.Id)
p.controller.HandleIncomingMessage(msg, ww)
}
p.cleanup(p.wspr.ctx)
}