veyron/services/wsprd: moving wspr to a separate repository
Change-Id: I09bd1418547f38ec70fcfa2e7943f8d6e78c5fb5
diff --git a/services/wsprd/wspr/wspr.go b/services/wsprd/wspr/wspr.go
new file mode 100644
index 0000000..dd9cbf5
--- /dev/null
+++ b/services/wsprd/wspr/wspr.go
@@ -0,0 +1,285 @@
+// A simple WebSocket proxy (WSPR) that takes in a Veyron RPC message, encoded in JSON
+// and stored in a WebSocket message, and sends it to the specified Veyron
+// endpoint.
+//
+// Input arguments must be provided as a JSON message in the following format:
+//
+// {
+// "Address" : String, //EndPoint Address
+// "Name" : String, //Service Name
+// "Method" : String, //Method Name
+// "InArgs" : { "ArgName1" : ArgVal1, "ArgName2" : ArgVal2, ... },
+// "IsStreaming" : true/false
+// }
+//
+package wspr
+
+import (
+ "bytes"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ _ "net/http/pprof"
+ "sync"
+ "time"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vlog"
+
+ veyron_identity "veyron.io/veyron/veyron/services/identity"
+ "veyron.io/wspr/veyron/services/wsprd/identity"
+)
+
+const (
+ pingInterval = 50 * time.Second // how often the server pings the client.
+ pongTimeout = pingInterval + 10*time.Second // maximum wait for pong.
+)
+
+type wsprConfig struct {
+ MounttableRoot []string
+}
+
+type WSPR struct {
+ mu sync.Mutex
+ tlsCert *tls.Certificate
+ rt veyron2.Runtime
+ httpPort int // HTTP port for WSPR to serve on. Port rather than address to discourage serving in a way that isn't local.
+ logger vlog.Logger
+ listenSpec ipc.ListenSpec
+ identdEP string
+ idManager *identity.IDManager
+ blesserService veyron_identity.OAuthBlesser
+ pipes map[*http.Request]*pipe
+}
+
+var logger vlog.Logger
+
+func readFromRequest(r *http.Request) (*bytes.Buffer, error) {
+ var buf bytes.Buffer
+ if readBytes, err := io.Copy(&buf, r.Body); err != nil {
+ return nil, fmt.Errorf("error copying message out of request: %v", err)
+ } else if wantBytes := r.ContentLength; readBytes != wantBytes {
+ return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes)
+ }
+ return &buf, nil
+}
+
+func setAccessControl(w http.ResponseWriter) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+}
+
+// Starts the proxy and listens for requests. This method is blocking.
+func (ctx WSPR) Run() {
+ // Bind to the OAuth Blesser service
+ blesserService, err := veyron_identity.BindOAuthBlesser(ctx.identdEP)
+ if err != nil {
+ log.Fatalf("Failed to bind to identity service at %v: %v", ctx.identdEP, err)
+ }
+ ctx.blesserService = blesserService
+
+ // HTTP routes
+ http.HandleFunc("/debug", ctx.handleDebug)
+ http.HandleFunc("/create-account", ctx.handleCreateAccount)
+ http.HandleFunc("/assoc-account", ctx.handleAssocAccount)
+ http.HandleFunc("/ws", ctx.handleWS)
+ // Everything else is a 404.
+ // Note: the pattern "/" matches all paths not matched by other
+ // registered patterns, not just the URL with Path == "/".'
+ // (http://golang.org/pkg/net/http/#ServeMux)
+ http.Handle("/", http.NotFoundHandler())
+ ctx.logger.VI(1).Infof("Listening at port %d.", ctx.httpPort)
+ httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.httpPort), nil)
+ if httpErr != nil {
+ log.Fatalf("Failed to HTTP serve: %s", httpErr)
+ }
+}
+
+func (ctx WSPR) Shutdown() {
+ ctx.rt.Cleanup()
+}
+
+func (ctx WSPR) CleanUpPipe(req *http.Request) {
+ ctx.mu.Lock()
+ defer ctx.mu.Unlock()
+ delete(ctx.pipes, req)
+}
+
+// Creates a new WebSocket Proxy object.
+func NewWSPR(httpPort int, listenSpec ipc.ListenSpec, identdEP string, opts ...veyron2.ROpt) *WSPR {
+ if listenSpec.Proxy == "" {
+ log.Fatalf("a veyron proxy must be set")
+ }
+ if identdEP == "" {
+ log.Fatalf("an identd server must be set")
+ }
+
+ newrt, err := rt.New(opts...)
+ if err != nil {
+ log.Fatalf("rt.New failed: %s", err)
+ }
+
+ // TODO(nlacasse, bjornick) use a serializer that can actually persist.
+ idManager, err := identity.NewIDManager(newrt, &identity.InMemorySerializer{})
+ if err != nil {
+ log.Fatalf("identity.NewIDManager failed: %s", err)
+ }
+
+ return &WSPR{
+ httpPort: httpPort,
+ listenSpec: listenSpec,
+ identdEP: identdEP,
+ rt: newrt,
+ logger: newrt.Logger(),
+ idManager: idManager,
+ pipes: map[*http.Request]*pipe{},
+ }
+}
+
+// HTTP Handlers
+
+func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ fmt.Fprintf(w, "")
+ return
+ }
+ w.Header().Set("Content-Type", "text/html")
+ w.Write([]byte(`<html>
+<head>
+<title>/debug</title>
+</head>
+<body>
+<ul>
+<li><a href="/debug/pprof">/debug/pprof</a></li>
+</li></ul></body></html>
+`))
+}
+
+func (ctx WSPR) handleWS(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+ ctx.logger.VI(0).Info("Creating a new websocket")
+ p := newPipe(w, r, &ctx, nil)
+
+ if p == nil {
+ return
+ }
+ ctx.mu.Lock()
+ defer ctx.mu.Unlock()
+ ctx.pipes[r] = p
+}
+
+// Structs for marshalling input/output to create-account route.
+type createAccountInput struct {
+ AccessToken string `json:"access_token"`
+}
+
+type createAccountOutput struct {
+ Names []string `json:"names"`
+}
+
+// Handler for creating an account in the identity manager.
+// A valid OAuth2 access token must be supplied in the request body. That
+// access token is exchanged for a blessing from the identd server. A new
+// privateID is then derived from WSPR's privateID and the blessing. That
+// privateID is stored in the identity manager. The name of the new privateID
+// is returned to the client.
+func (ctx WSPR) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse request body.
+ var data createAccountInput
+ if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
+ msg := fmt.Sprintf("Error parsing body: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ }
+
+ // Get a blessing for the access token from identity server.
+ rctx, cancel := ctx.rt.NewContext().WithTimeout(time.Minute)
+ defer cancel()
+ blessingAny, err := ctx.blesserService.BlessUsingAccessToken(rctx, data.AccessToken)
+ if err != nil {
+ msg := fmt.Sprintf("Error getting blessing for access token: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+ blessing := blessingAny.(security.PublicID)
+
+ // Derive a new identity from the runtime's identity and the blessing.
+ identity, err := ctx.rt.Identity().Derive(blessing)
+ if err != nil {
+ msg := fmt.Sprintf("Error deriving identity: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+
+ for _, name := range blessing.Names() {
+ // Store identity in identity manager.
+ if err := ctx.idManager.AddAccount(name, identity); err != nil {
+ msg := fmt.Sprintf("Error storing identity: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+ }
+
+ // Return the names to the client.
+ out := createAccountOutput{
+ Names: blessing.Names(),
+ }
+ outJson, err := json.Marshal(out)
+ if err != nil {
+ msg := fmt.Sprintf("Error mashalling names: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusInternalServerError)
+ return
+ }
+
+ // Success.
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintf(w, string(outJson))
+}
+
+// Struct for marshalling input to assoc-account route.
+type assocAccountInput struct {
+ Name string `json:"name"`
+ Origin string `json:"origin"`
+}
+
+// Handler for associating an existing privateID with an origin.
+func (ctx WSPR) handleAssocAccount(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse request body.
+ var data assocAccountInput
+ if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
+ http.Error(w, fmt.Sprintf("Error parsing body: %v", err), http.StatusBadRequest)
+ }
+
+ // Store the origin.
+ // TODO(nlacasse, bjornick): determine what the caveats should be.
+ if err := ctx.idManager.AddOrigin(data.Origin, data.Name, nil); err != nil {
+ http.Error(w, fmt.Sprintf("Error associating account: %v", err), http.StatusBadRequest)
+ return
+ }
+
+ // Success.
+ fmt.Fprintf(w, "")
+}