blob: 727807d6b964b68a02bacb8c0ea9f471d39e42e6 [file] [log] [blame]
// A simple WebSocket proxy (WSPR) that takes in a Veyron RPC message, encoded in JSON
// and stored in a WebSocket message, and sends it to the specified Veyron
// endpoint.
//
// Input arguments must be provided as a JSON message in the following format:
//
// {
// "Address" : String, //EndPoint Address
// "Name" : String, //Service Name
// "Method" : String, //Method Name
// "InArgs" : { "ArgName1" : ArgVal1, "ArgName2" : ArgVal2, ... },
// "IsStreaming" : true/false
// }
//
package wspr
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
_ "net/http/pprof"
"sync"
"time"
veyron_identity "veyron.io/veyron/veyron/services/identity"
"veyron.io/veyron/veyron/services/wsprd/identity"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vlog"
)
const (
pingInterval = 50 * time.Second // how often the server pings the client.
pongTimeout = pingInterval + 10*time.Second // maximum wait for pong.
)
type wsprConfig struct {
MounttableRoot []string
}
type WSPR struct {
mu sync.Mutex
tlsCert *tls.Certificate
rt veyron2.Runtime
logger vlog.Logger
port int
identdEP string
veyronProxyEP string
idManager *identity.IDManager
blesserService veyron_identity.OAuthBlesser
pipes map[*http.Request]*pipe
}
var logger vlog.Logger
func readFromRequest(r *http.Request) (*bytes.Buffer, error) {
var buf bytes.Buffer
if readBytes, err := io.Copy(&buf, r.Body); err != nil {
return nil, fmt.Errorf("error copying message out of request: %v", err)
} else if wantBytes := r.ContentLength; readBytes != wantBytes {
return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes)
}
return &buf, nil
}
func setAccessControl(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
}
// Starts the proxy and listens for requests. This method is blocking.
func (ctx WSPR) Run() {
// Bind to the OAuth Blesser service
blesserService, err := veyron_identity.BindOAuthBlesser(ctx.identdEP)
if err != nil {
log.Fatalf("Failed to bind to identity service at %v: %v", ctx.identdEP, err)
}
ctx.blesserService = blesserService
// HTTP routes
http.HandleFunc("/debug", ctx.handleDebug)
http.HandleFunc("/create-account", ctx.handleCreateAccount)
http.HandleFunc("/assoc-account", ctx.handleAssocAccount)
http.HandleFunc("/ws", ctx.handleWS)
// Everything else is a 404.
// Note: the pattern "/" matches all paths not matched by other
// registered patterns, not just the URL with Path == "/".'
// (http://golang.org/pkg/net/http/#ServeMux)
http.Handle("/", http.NotFoundHandler())
ctx.logger.VI(1).Infof("Listening on port %d.", ctx.port)
httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.port), nil)
if httpErr != nil {
log.Fatalf("Failed to HTTP serve: %s", httpErr)
}
}
func (ctx WSPR) Shutdown() {
ctx.rt.Cleanup()
}
func (ctx WSPR) CleanUpPipe(req *http.Request) {
ctx.mu.Lock()
defer ctx.mu.Unlock()
delete(ctx.pipes, req)
}
// Creates a new WebSocket Proxy object.
func NewWSPR(port int, veyronProxyEP, identdEP string, opts ...veyron2.ROpt) *WSPR {
if veyronProxyEP == "" {
log.Fatalf("a veyron proxy must be set")
}
if identdEP == "" {
log.Fatalf("an identd server must be set")
}
newrt, err := rt.New(opts...)
if err != nil {
log.Fatalf("rt.New failed: %s", err)
}
// TODO(nlacasse, bjornick) use a serializer that can actually persist.
idManager, err := identity.NewIDManager(newrt, &identity.InMemorySerializer{})
if err != nil {
log.Fatalf("identity.NewIDManager failed: %s", err)
}
return &WSPR{port: port,
veyronProxyEP: veyronProxyEP,
identdEP: identdEP,
rt: newrt,
logger: newrt.Logger(),
idManager: idManager,
pipes: map[*http.Request]*pipe{},
}
}
// HTTP Handlers
func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintf(w, "")
return
}
w.Header().Set("Content-Type", "text/html")
w.Write([]byte(`<html>
<head>
<title>/debug</title>
</head>
<body>
<ul>
<li><a href="/debug/pprof">/debug/pprof</a></li>
</li></ul></body></html>
`))
}
func (ctx WSPR) handleWS(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
return
}
ctx.logger.VI(0).Info("Creating a new websocket")
p := newPipe(w, r, &ctx, nil)
if p == nil {
return
}
ctx.mu.Lock()
defer ctx.mu.Unlock()
ctx.pipes[r] = p
}
// Structs for marshalling input/output to create-account route.
type createAccountInput struct {
AccessToken string `json:"access_token"`
}
type createAccountOutput struct {
Names []string `json:"names"`
}
// Handler for creating an account in the identity manager.
// A valid OAuth2 access token must be supplied in the request body. That
// access token is exchanged for a blessing from the identd server. A new
// privateID is then derived from WSPR's privateID and the blessing. That
// privateID is stored in the identity manager. The name of the new privateID
// is returned to the client.
func (ctx WSPR) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
return
}
// Parse request body.
var data createAccountInput
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
msg := fmt.Sprintf("Error parsing body: %v", err)
ctx.logger.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
}
// Get a blessing for the access token from identity server.
rctx, cancel := ctx.rt.NewContext().WithTimeout(time.Minute)
defer cancel()
blessingAny, err := ctx.blesserService.BlessUsingAccessToken(rctx, data.AccessToken)
if err != nil {
msg := fmt.Sprintf("Error getting blessing for access token: %v", err)
ctx.logger.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
blessing := blessingAny.(security.PublicID)
// Derive a new identity from the runtime's identity and the blessing.
identity, err := ctx.rt.Identity().Derive(blessing)
if err != nil {
msg := fmt.Sprintf("Error deriving identity: %v", err)
ctx.logger.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
for _, name := range blessing.Names() {
// Store identity in identity manager.
if err := ctx.idManager.AddAccount(name, identity); err != nil {
msg := fmt.Sprintf("Error storing identity: %v", err)
ctx.logger.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
}
// Return the names to the client.
out := createAccountOutput{
Names: blessing.Names(),
}
outJson, err := json.Marshal(out)
if err != nil {
msg := fmt.Sprintf("Error mashalling names: %v", err)
ctx.logger.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
// Success.
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, string(outJson))
}
// Struct for marshalling input to assoc-account route.
type assocAccountInput struct {
Name string `json:"name"`
Origin string `json:"origin"`
}
// Handler for associating an existing privateID with an origin.
func (ctx WSPR) handleAssocAccount(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
return
}
// Parse request body.
var data assocAccountInput
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
http.Error(w, fmt.Sprintf("Error parsing body: %v", err), http.StatusBadRequest)
}
// Store the origin.
// TODO(nlacasse, bjornick): determine what the caveats should be.
if err := ctx.idManager.AddOrigin(data.Origin, data.Name, nil); err != nil {
http.Error(w, fmt.Sprintf("Error associating account: %v", err), http.StatusBadRequest)
return
}
// Success.
fmt.Fprintf(w, "")
}