blob: 3349f85d8df7764ca66ec4ede403fc6bd514dccf [file] [log] [blame]
package wspr
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
_ "net/http/pprof"
"os"
"strings"
"time"
"veyron/services/wsprd/app"
"veyron/services/wsprd/lib"
"veyron2/security"
"veyron2/verror"
"veyron2/vlog"
"veyron2/vom"
"github.com/gorilla/websocket"
)
// websocketMessageType is the type of message sent by the JS client to the
// wspr. The value arrives in the Type field of the JSON-decoded
// websocketMessage, so the numbers below must not be changed.
type websocketMessageType int

// Every constant carries the websocketMessageType type explicitly. A bare
// "name = N" in a const block is an untyped integer constant, not a
// websocketMessageType.
const (
	// Making a veyron client request, streaming or otherwise.
	websocketVeyronRequest websocketMessageType = 0
	// Serving this websocket under an object name.
	websocketServe websocketMessageType = 1
	// A response from a service in javascript to a request
	// from the proxy.
	websocketServerResponse websocketMessageType = 2
	// Sending streaming data, either from a JS client or JS service.
	websocketStreamingValue websocketMessageType = 3
	// A response that means the stream is closed by the client.
	websocketStreamClose websocketMessageType = 4
	// A request to get signature of a remote server.
	websocketSignatureRequest websocketMessageType = 5
	// A request to stop a server.
	websocketStopServer websocketMessageType = 6
	// A request to associate an identity with an origin.
	websocketAssocIdentity websocketMessageType = 7
)
// websocketMessage is the envelope that readLoop JSON-decodes from every
// frame it reads off the websocket.
type websocketMessage struct {
// Id is handed to the writer creator and to the controller handlers so that
// replies can be matched to this message.
Id int64
// This contains the json encoded payload.
Data string
// Type selects which handler readLoop dispatches the message to
// (see the websocketMessageType constants).
Type websocketMessageType
}
// wsMessage is the struct that is put on the write queue.
type wsMessage struct {
// buf is the payload handed to ws.WriteMessage by the write loop.
buf []byte
// messageType is the websocket message type passed to ws.WriteMessage
// (this file uses websocket.TextMessage and websocket.PingMessage).
messageType int
}
// pipe ties one websocket connection to the controller that services the
// messages arriving on it.
type pipe struct {
// The struct that handles the translation of javascript request to veyron requests.
controller *app.Controller
// The websocket connection this pipe reads from and writes to.
ws *websocket.Conn
// Logger used for every diagnostic emitted by this pipe.
logger vlog.Logger
// The owning WSPR instance. It supplies the veyron proxy endpoint and the
// identity manager, and is told via CleanUpPipe when the pipe shuts down.
wspr *WSPR
// Creates the writer used to answer the message with the given id. When nil,
// setup installs a default that builds a websocketWriter.
writerCreator func(id int64) lib.ClientWriter
// There is a single write goroutine because ws.NewWriter() creates a new writer that
// writes to a shared buffer in the websocket, so it is not safe to have multiple go
// routines writing to different websocket writers.
writeQueue chan wsMessage
// This request is used to tell WSPR which pipe to remove when we shutdown.
req *http.Request
}
// newPipe builds a pipe for the given request and starts it right away.
//
// creator may be nil, in which case setup installs the default
// websocketWriter-based creator. The pipe is returned even when start could
// not upgrade the connection; in that case an HTTP error has already been
// written to w and no goroutines were started.
func newPipe(w http.ResponseWriter, req *http.Request, wspr *WSPR, creator func(id int64) lib.ClientWriter) *pipe {
	p := &pipe{
		logger:        wspr.rt.Logger(),
		writerCreator: creator,
		req:           req,
		wspr:          wspr,
	}
	p.start(w, req)
	return p
}
// cleanup cleans up any outstanding rpcs by cleaning up the controller, and
// then asks WSPR to clean up the pipe registered for p.req. readLoop calls it
// once its loop has exited.
//
// NOTE(review): nothing here stops writeLoop or pingLoop; confirm whether
// those goroutines are shut down elsewhere.
func (p *pipe) cleanup() {
p.logger.VI(0).Info("Cleaning up websocket")
p.controller.Cleanup()
p.wspr.CleanUpPipe(p.req)
}
// setup prepares the pipe for traffic: it installs the default writer creator
// when none was supplied, creates the write queue and starts the goroutine
// that drains it, and finally creates the controller.
func (p *pipe) setup() {
	if p.writerCreator == nil {
		// Default: replies are written back over this pipe's websocket.
		defaultCreator := func(id int64) lib.ClientWriter {
			return &websocketWriter{p: p, id: id, logger: p.logger}
		}
		p.writerCreator = defaultCreator
	}

	// The queue has to exist before the write loop starts reading from it.
	p.writeQueue = make(chan wsMessage, 50)
	go p.writeLoop()

	p.controller = app.NewController(p.writerCreator, p.wspr.veyronProxyEP)
	// TODO(bjornick): Pass in the identity linked to this origin.
	p.controller.UpdateIdentity(nil)
}
// writeLoop drains the write queue and writes each queued message to the
// websocket. It is the only goroutine that writes to the connection. A failed
// write is logged and the loop carries on with the next message; the loop
// ends only when the queue is closed.
func (p *pipe) writeLoop() {
	for queued := range p.writeQueue {
		if queued.messageType == websocket.PingMessage {
			p.logger.Infof("sending ping")
		}
		err := p.ws.WriteMessage(queued.messageType, queued.buf)
		if err != nil {
			p.logger.Errorf("failed to write bytes: %s", err)
		}
	}
	// Falling out of the range loop means the channel was closed.
	p.logger.Errorf("write queue was closed")
}
// start upgrades the HTTP request to a websocket connection and, on success,
// sets the pipe up, sends the initial config message and launches the read
// and ping goroutines.
//
// If the upgrade fails, an HTTP error is written to w (400 for a handshake
// error, 500 for anything else) and the pipe is left without a connection.
func (p *pipe) start(w http.ResponseWriter, req *http.Request) {
	conn, err := websocket.Upgrade(w, req, nil, 1024, 1024)
	switch err.(type) {
	case nil:
		// Upgrade succeeded; carry on below.
	case websocket.HandshakeError:
		http.Error(w, "Not a websocket handshake", http.StatusBadRequest)
		return
	default:
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		p.logger.Errorf("websocket upgrade failed: %s", err)
		return
	}

	p.ws = conn
	p.ws.SetPongHandler(p.pongHandler)

	p.setup()
	p.sendInitialMessage()

	go p.readLoop()
	go p.pingLoop()
}
// sendInitialMessage queues the first message sent on a new connection: the
// wsprConfig, JSON-encoded, carrying the mounttable roots taken from the
// comma-separated NAMESPACE_ROOT environment variable. An unset or empty
// variable yields an empty list. If the config cannot be encoded, the error
// is logged and nothing is queued.
func (p *pipe) sendInitialMessage() {
	roots := []string{}
	if env := os.Getenv("NAMESPACE_ROOT"); env != "" {
		roots = strings.Split(env, ",")
	}

	config := wsprConfig{MounttableRoot: roots}

	var encoded bytes.Buffer
	err := vom.ObjToJSON(&encoded, vom.ValueOf(config))
	if err != nil {
		p.logger.Errorf("failed to convert wspr config to json: %s", err)
		return
	}
	p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: encoded.Bytes()}
}
// pingLoop queues a websocket ping on the write queue every pingInterval.
//
// NOTE(review): the loop has no exit condition, so this goroutine keeps
// running (and keeps queueing pings) after the connection is gone. Confirm
// whether that is intended.
func (p *pipe) pingLoop() {
	// Every ping is the same empty-payload message, so build it once.
	ping := wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
	for {
		time.Sleep(pingInterval)
		p.logger.VI(2).Info("ws: ping")
		p.writeQueue <- ping
	}
}
// pongHandler is installed as the connection's pong handler. Each pong pushes
// the read deadline out to pongTimeout from now. The pong payload is ignored
// and the handler always returns nil.
func (p *pipe) pongHandler(msg string) error {
	p.logger.VI(2).Infof("ws: pong")
	nextDeadline := time.Now().Add(pongTimeout)
	p.ws.SetReadDeadline(nextDeadline)
	return nil
}
// readLoop reads messages from the websocket until a read fails, decodes each
// one as a JSON websocketMessage and dispatches it to the controller according
// to its Type. When the loop ends, the pipe is cleaned up.
//
// A message that cannot be decoded is reported back to the client as a text
// message containing the error, and the loop carries on with the next message.
func (p *pipe) readLoop() {
	p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
	for {
		frameType, frame, readErr := p.ws.NextReader()
		if readErr != nil {
			// An unexpected EOF means the websocket disconnected, which
			// is not worth logging; anything else is.
			if readErr != io.ErrUnexpectedEOF {
				p.logger.VI(1).Infof("websocket receive: %s", readErr)
			}
			break
		}

		// NOTE(review): a non-text frame is logged but still decoded below;
		// confirm whether it should be skipped instead.
		if frameType != websocket.TextMessage {
			p.logger.Errorf("unexpected websocket op: %v", frameType)
		}

		var msg websocketMessage
		if decodeErr := json.NewDecoder(frame).Decode(&msg); decodeErr != nil {
			report := fmt.Sprintf("can't unmarshall JSONMessage: %v", decodeErr)
			p.logger.Error(report)
			p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(report)}
			continue
		}

		id, payload := msg.Id, msg.Data
		reply := p.writerCreator(id)

		switch msg.Type {
		case websocketVeyronRequest:
			p.controller.HandleVeyronRequest(id, payload, reply)
		case websocketStreamingValue:
			// This will be asynchronous for a client rpc, but synchronous for a
			// server rpc. This could be potentially bad if the server is sending
			// back large packets. Making it asynchronous for the server would make
			// it difficult to guarantee that all stream messages make it to the
			// client before the finish call.
			// TODO(bjornick): Make the server send also asynchronous.
			p.controller.SendOnStream(id, payload, reply)
		case websocketStreamClose:
			p.controller.CloseStream(id)
		case websocketServe:
			go p.controller.HandleServeRequest(payload, reply)
		case websocketStopServer:
			go p.controller.HandleStopRequest(payload, reply)
		case websocketServerResponse:
			go p.controller.HandleServerResponse(id, payload)
		case websocketSignatureRequest:
			go p.controller.HandleSignatureRequest(payload, reply)
		case websocketAssocIdentity:
			p.handleAssocIdentity(payload, reply)
		default:
			reply.Error(verror.Unknownf("unknown message type: %v", msg.Type))
		}
	}
	p.cleanup()
}
// assocIdentityData is the JSON payload of a websocketAssocIdentity message.
type assocIdentityData struct {
// Account is the account name the identity is stored under.
Account string
Identity string // base64(vom(security.PrivateID))
// Origin is the origin that gets associated with Account.
Origin string
}
// handleAssocIdentity associates the identity with the origin.
//
// data is the JSON encoding of an assocIdentityData. The identity it carries
// is decoded and installed on the controller, stored in the identity manager
// under the given account, and the account is then associated with the origin
// (with no caveats). On success a final response is sent on w.
//
// Every failure is reported through w.Error and ends the request: a step that
// fails is never followed by later steps or by the final success response.
func (p *pipe) handleAssocIdentity(data string, w lib.ClientWriter) {
	// Decode the request.
	var parsedData assocIdentityData
	if err := json.NewDecoder(strings.NewReader(data)).Decode(&parsedData); err != nil {
		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
		return
	}

	// The identity is private key material, so it is deliberately kept out
	// of the log.
	p.logger.VI(2).Infof("associating name %v to origin %v",
		parsedData.Account,
		parsedData.Origin)

	id := decodeIdentity(p.logger, parsedData.Identity)
	// decodeIdentity returns nil both for an empty string and for a decode
	// failure. Only the latter is an error: a non-empty identity that cannot
	// be decoded must not be silently treated as "no identity".
	if id == nil && parsedData.Identity != "" {
		w.Error(verror.Internalf("can't decode identity for account %v", parsedData.Account))
		return
	}
	p.controller.UpdateIdentity(id)

	idManager := p.wspr.idManager
	if err := idManager.AddAccount(parsedData.Account, id); err != nil {
		w.Error(verror.Internalf("identity.AddAccount(%v, %v) failed: %v", parsedData.Account, id, err))
		return
	}

	caveats := []security.ServiceCaveat{}
	if err := idManager.AddOrigin(parsedData.Origin, parsedData.Account, caveats); err != nil {
		w.Error(verror.Internalf("identity.AddOrigin(%v, %v, %v) failed: %v", parsedData.Origin, parsedData.Account, caveats, err))
		return
	}

	if err := w.Send(lib.ResponseFinal, nil); err != nil {
		w.Error(verror.Internalf("error marshalling results: %v", err))
	}
}
// decodeIdentity decodes a base64-encoded, vom-encoded security.PrivateID.
//
// It returns nil when msg is empty, and also when msg cannot be decoded; in
// the latter case the failure is logged on logger.
func decodeIdentity(logger vlog.Logger, msg string) security.PrivateID {
	if len(msg) == 0 {
		return nil
	}
	// PrivateIds are sent as base64-encoded-vom-encoded identity.PrivateID.
	// Pure JSON or pure VOM could not have been used.
	// - JSON cannot be used because identity.PrivateID contains an
	//   ecdsa.PrivateKey (which encoding/json cannot decode).
	// - Regular VOM cannot be used because it only has a binary,
	//   Go-specific implementation at this time.
	// The "portable" encoding is base64-encoded VOM (see
	// veyron/daemon/cmd/identity/responder/responder.go).
	// When toddw@ has the text-based VOM encoding going, that can probably
	// be used instead.
	var id security.PrivateID
	base64Reader := base64.NewDecoder(base64.URLEncoding, strings.NewReader(msg))
	if err := vom.NewDecoder(base64Reader).Decode(&id); err != nil {
		// Errorf with an explicit verb: Print-style formatting adds no
		// space between a string operand and the error that follows it.
		logger.Errorf("Could not decode identity: %v", err)
		return nil
	}
	return id
}
// encodeIdentity returns the vom encoding of identity, base64-encoded with
// the URL-safe alphabet. This is the format decodeIdentity accepts.
//
// If the identity cannot be vom-encoded, the failure is logged on logger and
// the empty string is returned rather than the encoding of a partial buffer.
func encodeIdentity(logger vlog.Logger, identity security.PrivateID) string {
	var vomEncoded bytes.Buffer
	if err := vom.NewEncoder(&vomEncoded).Encode(identity); err != nil {
		logger.Errorf("Could not encode identity: %v", err)
		return ""
	}
	return base64.URLEncoding.EncodeToString(vomEncoded.Bytes())
}