| // A simple WebSocket proxy (WSPR) that takes in a Veyron RPC message, encoded in JSON |
| // and stored in a WebSocket message, and sends it to the specified Veyron |
| // endpoint. |
| // |
| // Input arguments must be provided as a JSON message in the following format: |
| // |
| // { |
| // "Address" : String, //EndPoint Address |
| // "Name" : String, //Service Name |
| // "Method" : String, //Method Name |
| // "PrivateID" : "", //Identification |
| // "InArgs" : { "ArgName1" : ArgVal1, "ArgName2" : ArgVal2, ... }, |
| // "IsStreaming" : true/false |
| // } |
| // |
| package lib |
| |
| import ( |
| "bytes" |
| "crypto/tls" |
| "encoding/base64" |
| "encoding/binary" |
| "encoding/json" |
| "fmt" |
| "io" |
| "log" |
| "net/http" |
| _ "net/http/pprof" |
| "os" |
| "strings" |
| "sync" |
| "time" |
| |
| "veyron2" |
| "veyron2/ipc" |
| "veyron2/rt" |
| "veyron2/security" |
| "veyron2/verror" |
| "veyron2/vlog" |
| "veyron2/vom" |
| vom_wiretype "veyron2/vom/wiretype" |
| wiretype_build "veyron2/wiretype/build" |
| |
| "github.com/gorilla/websocket" |
| ) |
| |
| const ( |
| pingInterval = 50 * time.Second // how often the server pings the client. |
| pongTimeout = pingInterval + 10*time.Second // maximum wait for pong. |
| ) |
| |
| type wsprConfig struct { |
| MounttableRoot []string |
| } |
| |
| type WSPR struct { |
| tlsCert *tls.Certificate |
| clientCache *ClientCache |
| rt veyron2.Runtime |
| logger vlog.Logger |
| port int |
| veyronProxyEP string |
| } |
| |
| type responseType int |
| |
| const ( |
| responseFinal responseType = 0 |
| responseStream = 1 |
| responseError = 2 |
| responseServerRequest = 3 |
| responseStreamClose = 4 |
| ) |
| |
| // Wraps a response to the proxy client and adds a message type. |
| type response struct { |
| Type responseType |
| Message interface{} |
| } |
| |
| var logger vlog.Logger |
| |
| // The type of message sent by the JS client to the wspr. |
| type websocketMessageType int |
| |
| const ( |
| // Making a veyron client request, streaming or otherwise |
| websocketVeyronRequest websocketMessageType = 0 |
| |
| // Serving this websocket under an object name |
| websocketServe = 1 |
| |
| // A response from a service in javascript to a request |
| // from the proxy. |
| websocketServerResponse = 2 |
| |
| // Sending streaming data, either from a JS client or JS service. |
| websocketStreamingValue = 3 |
| |
| // A response that means the stream is closed by the client. |
| websocketStreamClose = 4 |
| |
| // A request to get signature of a remote server |
| websocketSignatureRequest = 5 |
| |
| // A request to stop a server |
| websocketStopServer = 6 |
| ) |
| |
| type websocketMessage struct { |
| Id int64 |
| // This contains the json encoded payload. |
| Data string |
| |
| // Whether it is an rpc request or a serve request. |
| Type websocketMessageType |
| } |
| |
| // Temporary holder of RPC so that we can store the unprocessed args. |
| type veyronTempRPC struct { |
| Name string |
| Method string |
| PrivateId string // base64(vom(security.PrivateID)) |
| InArgs []json.RawMessage |
| NumOutArgs int32 |
| IsStreaming bool |
| } |
| |
| type veyronRPC struct { |
| Name string |
| Method string |
| PrivateId string // base64(vom(security.PrivateID)) |
| InArgs []interface{} |
| NumOutArgs int32 |
| IsStreaming bool |
| } |
| |
| // A request javascript to serve undern a particular name |
| type serveRequest struct { |
| Name string |
| ServerId uint64 |
| Service JSONServiceSignature |
| } |
| |
| // The response from the javascript server to the proxy. |
| type serverRPCReply struct { |
| Results []interface{} |
| Err *verror.Standard |
| } |
| |
| // finishCall waits for the call to finish and write out the response to w. |
| func (wsp *websocketPipe) finishCall(w clientWriter, clientCall ipc.Call, msg *veyronRPC) { |
| if msg.IsStreaming { |
| for { |
| var item interface{} |
| if err := clientCall.Recv(&item); err != nil { |
| if err == io.EOF { |
| break |
| } |
| w.sendError(err) // Send streaming error as is |
| return |
| } |
| data := &response{Type: responseStream, Message: item} |
| if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil { |
| w.sendError(verror.Internalf("unable to marshal: %v", item)) |
| continue |
| } |
| if err := w.FinishMessage(); err != nil { |
| wsp.ctx.logger.Error("WSPR: error finishing message: ", err) |
| } |
| } |
| |
| if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil { |
| w.sendError(verror.Internalf("unable to marshal close stream message")) |
| } |
| if err := w.FinishMessage(); err != nil { |
| wsp.ctx.logger.Error("WSPR: error finishing message: ", err) |
| } |
| } |
| |
| results := make([]interface{}, msg.NumOutArgs) |
| // This array will have pointers to the values in result. |
| resultptrs := make([]interface{}, msg.NumOutArgs) |
| for ax := range results { |
| resultptrs[ax] = &results[ax] |
| } |
| if err := clientCall.Finish(resultptrs...); err != nil { |
| // return the call system error as is |
| w.sendError(err) |
| return |
| } |
| // for now we assume last out argument is always error |
| if len(results) < 1 { |
| w.sendError(verror.Internalf("client call did not return any results")) |
| return |
| } |
| |
| if err, ok := results[len(results)-1].(error); ok { |
| // return the call application error as is |
| w.sendError(err) |
| return |
| } |
| |
| data := response{Type: responseFinal, Message: results[0 : len(results)-1]} |
| if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil { |
| w.sendError(verror.Internalf("error marshalling results: %v", err)) |
| return |
| } |
| |
| if err := w.FinishMessage(); err != nil { |
| wsp.ctx.logger.Error("WSPR: error finishing message: ", err) |
| return |
| } |
| } |
| |
| func (ctx WSPR) newClient(privateId string) (ipc.Client, error) { |
| id := decodeIdentity(ctx.logger, privateId) |
| client := ctx.clientCache.Get(id) |
| var err error |
| if client == nil { |
| // TODO(bjornick): Use the identity to create the client. |
| client, err = ctx.rt.NewClient(veyron2.CallTimeout(ipc.NoTimeout)) |
| if err != nil { |
| return nil, fmt.Errorf("error creating client: %v", err) |
| } |
| ctx.clientCache.Put(id, client) |
| } |
| |
| return client, nil |
| } |
| |
| func (ctx WSPR) startVeyronRequest(w clientWriter, msg *veyronRPC) (ipc.Call, error) { |
| // Issue request to the endpoint. |
| client, err := ctx.newClient(msg.PrivateId) |
| if err != nil { |
| return nil, err |
| } |
| clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs) |
| |
| if err != nil { |
| return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs, err) |
| } |
| |
| return clientCall, nil |
| } |
| |
| func decodeIdentity(logger vlog.Logger, msg string) security.PrivateID { |
| if len(msg) == 0 { |
| return nil |
| } |
| // msg contains base64-encoded-vom-encoded identity.PrivateID. |
| // Pure JSON or pure VOM could not have been used. |
| // - JSON cannot be used because identity.PrivateID contains an |
| // ecdsa.PrivateKey (which encoding/json cannot decode). |
| // - Regular VOM cannot be used because it only has a binary, |
| // Go-specific implementation at this time. |
| // The "portable" encoding is base64-encoded VOM (see |
| // veyron/daemon/cmd/identity/responder/responder.go). |
| // When toddw@ has the text-based VOM encoding going, that can probably |
| // be used instead. |
| var id security.PrivateID |
| if err := vom.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(msg))).Decode(&id); err != nil { |
| logger.Error("Could not decode identity:", err) |
| return nil |
| } |
| return id |
| } |
| |
| func intToByteSlice(i int32) []byte { |
| rw := new(bytes.Buffer) |
| binary.Write(rw, binary.BigEndian, i) |
| buf := make([]byte, 4) |
| n, err := io.ReadFull(rw, buf) |
| if n != 4 || err != nil { |
| panic(fmt.Sprintf("Read less than 4 bytes: %d", n)) |
| } |
| return buf[:n] |
| } |
| |
| func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) { |
| w.Header().Set("Content-Type", "text/html") |
| w.Write([]byte(`<html> |
| <head> |
| <title>/debug</title> |
| </head> |
| <body> |
| <ul> |
| <li><a href="/debug/pprof">/debug/pprof</a></li> |
| <li><b>Client cache stats:</b> |
| `)) |
| if ctx.clientCache == nil { |
| w.Write([]byte("No ClientCache")) |
| } else { |
| w.Write([]byte(ctx.clientCache.Stats())) |
| } |
| w.Write([]byte("</li></ul></body></html>")) |
| } |
| |
| func readFromRequest(r *http.Request) (*bytes.Buffer, error) { |
| var buf bytes.Buffer |
| if readBytes, err := io.Copy(&buf, r.Body); err != nil { |
| return nil, fmt.Errorf("error copying message out of request: %v", err) |
| } else if wantBytes := r.ContentLength; readBytes != wantBytes { |
| return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes) |
| } |
| return &buf, nil |
| } |
| |
| func setAccessControl(w http.ResponseWriter) { |
| w.Header().Set("Access-Control-Allow-Origin", "*") |
| } |
| |
| type outstandingStream struct { |
| stream sender |
| inType vom.Type |
| } |
| |
| type websocketPipe struct { |
| // Protects outstandingStreams and outstandingServerRequests. |
| sync.Mutex |
| ws *websocket.Conn |
| ctx *WSPR |
| // Used to generate unique ids for requests initiated by the proxy. |
| // These ids will be even so they don't collide with the ids generated |
| // by the client. |
| lastGeneratedId int64 |
| |
| // Streams for the outstanding requests. |
| outstandingStreams map[int64]outstandingStream |
| |
| // Maps flowids to the server that owns them. |
| flowMap map[int64]*server |
| |
| // A manager that handles fetching and caching signature of remote services |
| signatureManager *signatureManager |
| |
| // We maintain multiple Veyron server per websocket pipe for serving JavaScript |
| // services. |
| servers map[uint64]*server |
| |
| // Creates a client writer for a given flow. This is a member so that tests can override |
| // the default implementation. |
| writerCreator func(id int64) clientWriter |
| } |
| |
| // Implements the serverHelper interface |
| func (wsp *websocketPipe) createNewFlow(server *server, stream sender) *flow { |
| wsp.Lock() |
| defer wsp.Unlock() |
| id := wsp.lastGeneratedId |
| wsp.lastGeneratedId += 2 |
| wsp.flowMap[id] = server |
| wsp.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}} |
| return &flow{id: id, writer: wsp.writerCreator(id)} |
| } |
| |
| func (wsp *websocketPipe) cleanupFlow(id int64) { |
| wsp.Lock() |
| defer wsp.Unlock() |
| delete(wsp.outstandingStreams, id) |
| delete(wsp.flowMap, id) |
| } |
| |
| func (wsp *websocketPipe) getLogger() vlog.Logger { |
| return wsp.ctx.logger |
| } |
| |
| func (wsp *websocketPipe) rt() veyron2.Runtime { |
| return wsp.ctx.rt |
| } |
| |
| // cleans up any outstanding rpcs. |
| func (wsp *websocketPipe) cleanup() { |
| wsp.Lock() |
| defer wsp.Unlock() |
| for _, stream := range wsp.outstandingStreams { |
| if call, ok := stream.stream.(ipc.Call); ok { |
| call.Cancel() |
| } |
| } |
| |
| for _, server := range wsp.servers { |
| server.Stop() |
| } |
| } |
| |
| func (wsp *websocketPipe) setup() { |
| wsp.signatureManager = newSignatureManager() |
| wsp.outstandingStreams = make(map[int64]outstandingStream) |
| wsp.flowMap = make(map[int64]*server) |
| wsp.servers = make(map[uint64]*server) |
| |
| if wsp.writerCreator == nil { |
| wsp.writerCreator = func(id int64) clientWriter { |
| return &websocketWriter{ws: wsp.ws, id: id, logger: wsp.ctx.logger} |
| } |
| } |
| } |
| |
| func (wsp *websocketPipe) start(w http.ResponseWriter, req *http.Request) { |
| ws, err := websocket.Upgrade(w, req, nil, 1024, 1024) |
| if _, ok := err.(websocket.HandshakeError); ok { |
| http.Error(w, "Not a websocket handshake", 400) |
| return |
| } else if err != nil { |
| http.Error(w, "Internal Error", 500) |
| wsp.ctx.logger.Errorf("websocket upgrade failed: %s", err) |
| return |
| } |
| |
| wsp.setup() |
| wsp.ws = ws |
| wsp.ws.SetPongHandler(wsp.pongHandler) |
| wsp.sendInitialMessage() |
| go wsp.readLoop() |
| go wsp.pingLoop() |
| } |
| |
| // Upon first connect, we send a message with the wsprConfig. |
| func (wsp *websocketPipe) sendInitialMessage() { |
| mounttableRoots := strings.Split(os.Getenv("NAMESPACE_ROOT"), ",") |
| if len(mounttableRoots) == 1 && mounttableRoots[0] == "" { |
| mounttableRoots = []string{} |
| } |
| msg := wsprConfig{ |
| MounttableRoot: mounttableRoots, |
| } |
| |
| wc, err := wsp.ws.NextWriter(websocket.TextMessage) |
| if err != nil { |
| wsp.ctx.logger.Errorf("failed to create websocket writer: %s", err) |
| return |
| } |
| if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil { |
| wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err) |
| return |
| } |
| wc.Close() |
| } |
| |
| func (wsp *websocketPipe) pingLoop() { |
| for { |
| time.Sleep(pingInterval) |
| wsp.ctx.logger.VI(2).Info("ws: ping") |
| if err := wsp.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { |
| wsp.ctx.logger.Error("ws: ping failed") |
| return |
| } |
| } |
| } |
| |
| func (wsp *websocketPipe) pongHandler(msg string) error { |
| wsp.ctx.logger.VI(2).Infof("ws: pong") |
| wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout)) |
| return nil |
| } |
| |
| func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w clientWriter) { |
| wsp.Lock() |
| defer wsp.Unlock() |
| stream := wsp.outstandingStreams[id].stream |
| if stream == nil { |
| w.sendError(fmt.Errorf("unknown stream")) |
| } |
| |
| stream.Send(msg, w) |
| |
| } |
| |
| // sendOnStream writes data on id's stream. Returns an error if the send failed. |
| func (wsp *websocketPipe) sendOnStream(id int64, data string, w clientWriter) { |
| wsp.Lock() |
| typ := wsp.outstandingStreams[id].inType |
| wsp.Unlock() |
| if typ == nil { |
| vlog.Errorf("no inType for stream %d (%q)", id, data) |
| return |
| } |
| payload, err := vom.JSONToObject(data, typ) |
| if err != nil { |
| vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err) |
| return |
| } |
| wsp.sendParsedMessageOnStream(id, payload, w) |
| } |
| |
| func (wsp *websocketPipe) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w clientWriter, signal chan ipc.Stream) { |
| // We have to make the start call synchronous so we can make sure that we populate |
| // the call map before we can handle a recieve call. |
| call, err := wsp.ctx.startVeyronRequest(w, veyronMsg) |
| if err != nil { |
| w.sendError(verror.Internalf("can't start Veyron Request: %v", err)) |
| return |
| } |
| |
| if signal != nil { |
| signal <- call |
| } |
| |
| wsp.finishCall(w, call, veyronMsg) |
| if signal != nil { |
| wsp.Lock() |
| delete(wsp.outstandingStreams, id) |
| wsp.Unlock() |
| } |
| } |
| |
| // handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed. |
| func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) { |
| veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data)) |
| if err != nil { |
| w.sendError(verror.Internalf("can't parse Veyron Request: %v", err)) |
| return |
| } |
| |
| wsp.Lock() |
| defer wsp.Unlock() |
| // If this rpc is streaming, we would expect that the client would try to send |
| // on this stream. Since the initial handshake is done asynchronously, we have |
| // to basically put a queueing stream in the map before we make the async call |
| // so that the future sends on the stream can see the queuing stream, even if |
| // the client call isn't actually ready yet. |
| var signal chan ipc.Stream |
| if veyronMsg.IsStreaming { |
| signal = make(chan ipc.Stream) |
| wsp.outstandingStreams[id] = outstandingStream{ |
| stream: startQueueingStream(signal), |
| inType: inStreamType, |
| } |
| } |
| go wsp.sendVeyronRequest(id, veyronMsg, w, signal) |
| } |
| |
| func (wsp *websocketPipe) closeStream(id int64) { |
| wsp.Lock() |
| defer wsp.Unlock() |
| stream := wsp.outstandingStreams[id].stream |
| if stream == nil { |
| wsp.ctx.logger.Errorf("close called on non-existent call: %v", id) |
| return |
| } |
| |
| var call queueingStream |
| var ok bool |
| if call, ok = stream.(queueingStream); !ok { |
| wsp.ctx.logger.Errorf("can't close server stream: %v", id) |
| return |
| } |
| |
| if err := call.Close(); err != nil { |
| wsp.ctx.logger.Errorf("client call close failed with: %v", err) |
| } |
| } |
| |
| func (wsp *websocketPipe) readLoop() { |
| wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout)) |
| for { |
| op, r, err := wsp.ws.NextReader() |
| if err == io.ErrUnexpectedEOF { // websocket disconnected |
| break |
| } |
| if err != nil { |
| wsp.ctx.logger.VI(1).Infof("websocket receive: %s", err) |
| break |
| } |
| |
| if op != websocket.TextMessage { |
| wsp.ctx.logger.Errorf("unexpected websocket op: %v", op) |
| } |
| |
| var msg websocketMessage |
| decoder := json.NewDecoder(r) |
| if err := decoder.Decode(&msg); err != nil { |
| errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err) |
| wsp.ctx.logger.Error(errMsg) |
| wsp.ws.WriteMessage(websocket.TextMessage, []byte(errMsg)) |
| continue |
| } |
| |
| ww := &websocketWriter{ws: wsp.ws, id: msg.Id, logger: wsp.ctx.logger} |
| |
| switch msg.Type { |
| case websocketVeyronRequest: |
| wsp.handleVeyronRequest(msg.Id, msg.Data, ww) |
| case websocketStreamingValue: |
| // This will asynchronous for a client rpc, but synchronous for a |
| // server rpc. This could be potentially bad if the server is sending |
| // back large packets. Making it asynchronous for the server, would make |
| // it difficult to guarantee that all stream messages make it to the client |
| // before the finish call. |
| // TODO(bjornick): Make the server send also asynchronous. |
| wsp.sendOnStream(msg.Id, msg.Data, ww) |
| case websocketStreamClose: |
| wsp.closeStream(msg.Id) |
| case websocketServe: |
| go wsp.handleServeRequest(msg.Data, ww) |
| case websocketStopServer: |
| go wsp.handleStopRequest(msg.Data, ww) |
| case websocketServerResponse: |
| go wsp.handleServerResponse(msg.Id, msg.Data) |
| case websocketSignatureRequest: |
| go wsp.handleSignatureRequest(msg.Data, ww) |
| default: |
| ww.sendError(verror.Unknownf("unknown message type: %v", msg.Type)) |
| } |
| } |
| wsp.cleanup() |
| } |
| |
| func (wsp *websocketPipe) maybeCreateServer(serverId uint64) (*server, error) { |
| wsp.Lock() |
| defer wsp.Unlock() |
| if server, ok := wsp.servers[serverId]; ok { |
| return server, nil |
| } |
| server, err := newServer(serverId, wsp.ctx.veyronProxyEP, wsp) |
| if err != nil { |
| return nil, err |
| } |
| wsp.servers[serverId] = server |
| return server, nil |
| } |
| |
| func (wsp *websocketPipe) removeServer(serverId uint64) { |
| wsp.Lock() |
| server := wsp.servers[serverId] |
| if server == nil { |
| wsp.Unlock() |
| return |
| } |
| delete(wsp.servers, serverId) |
| wsp.Unlock() |
| |
| server.Stop() |
| } |
| |
| func (wsp *websocketPipe) serve(serveRequest serveRequest, w clientWriter) { |
| // Create a server for the websocket pipe, if it does not exist already |
| server, err := wsp.maybeCreateServer(serveRequest.ServerId) |
| if err != nil { |
| w.sendError(verror.Internalf("error creating server: %v", err)) |
| } |
| |
| wsp.ctx.logger.VI(2).Infof("serving under name: %q", serveRequest.Name) |
| |
| endpoint, err := server.serve(serveRequest.Name, serveRequest.Service) |
| if err != nil { |
| w.sendError(verror.Internalf("error serving service: %v", err)) |
| return |
| } |
| // Send the endpoint back |
| endpointData := response{Type: responseFinal, Message: endpoint} |
| if err := vom.ObjToJSON(w, vom.ValueOf(endpointData)); err != nil { |
| w.sendError(verror.Internalf("error marshalling results: %v", err)) |
| return |
| } |
| |
| if err := w.FinishMessage(); err != nil { |
| wsp.ctx.logger.Error("WSPR: error finishing message: ", err) |
| return |
| } |
| } |
| |
| // handleServeRequest takes a request to serve a server, creates |
| // a server, registers the provided services and sends the endpoint back. |
| func (wsp *websocketPipe) handleServeRequest(data string, w *websocketWriter) { |
| // Decode the serve request which includes IDL, registered services and name |
| var serveRequest serveRequest |
| decoder := json.NewDecoder(bytes.NewBufferString(data)) |
| if err := decoder.Decode(&serveRequest); err != nil { |
| w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err)) |
| return |
| } |
| wsp.serve(serveRequest, w) |
| } |
| |
| // handleStopRequest takes a request to stop a server. |
| func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) { |
| |
| var serverId uint64 |
| decoder := json.NewDecoder(bytes.NewBufferString(data)) |
| if err := decoder.Decode(&serverId); err != nil { |
| w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err)) |
| return |
| } |
| |
| wsp.removeServer(serverId) |
| |
| // Send true to indicate stop has finished |
| result := response{Type: responseFinal, Message: true} |
| if err := vom.ObjToJSON(w, vom.ValueOf(result)); err != nil { |
| w.sendError(verror.Internalf("error marshalling results: %v", err)) |
| return |
| } |
| w.FinishMessage() |
| } |
| |
| // handleServerResponse handles the completion of outstanding calls to JavaScript services |
| // by filling the corresponding channel with the result from JavaScript. |
| func (wsp *websocketPipe) handleServerResponse(id int64, data string) { |
| wsp.Lock() |
| server := wsp.flowMap[id] |
| wsp.Unlock() |
| if server == nil { |
| wsp.ctx.logger.Errorf("unexpected result from JavaScript. No channel "+ |
| "for MessageId: %d exists. Ignoring the results.", id) |
| //Ignore unknown responses that don't belong to any channel |
| return |
| } |
| server.handleServerResponse(id, data) |
| } |
| |
| // parseVeyronRequest parses a json rpc request into a veyronRPC object. |
| func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) { |
| var tempMsg veyronTempRPC |
| decoder := json.NewDecoder(r) |
| if err := decoder.Decode(&tempMsg); err != nil { |
| return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err) |
| } |
| |
| client, err := wsp.ctx.newClient(tempMsg.PrivateId) |
| if err != nil { |
| return nil, nil, verror.Internalf("error creating client: %v", err) |
| } |
| |
| // Fetch and adapt signature from the SignatureManager |
| ctx := wsp.ctx.rt.TODOContext() |
| sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client) |
| if err != nil { |
| return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err) |
| } |
| |
| methName := uppercaseFirstCharacter(tempMsg.Method) |
| methSig, ok := sig.Methods[methName] |
| if !ok { |
| return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig) |
| } |
| |
| var msg veyronRPC |
| if len(methSig.InArgs) != len(tempMsg.InArgs) { |
| return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg) |
| } |
| msg.InArgs = make([]interface{}, len(tempMsg.InArgs)) |
| td := wiretype_build.TypeDefs(sig.TypeDefs) |
| |
| for i := 0; i < len(tempMsg.InArgs); i++ { |
| argTypeId := methSig.InArgs[i].Type |
| argType := vom_wiretype.Type{ |
| ID: argTypeId, |
| Defs: &td, |
| } |
| |
| val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType) |
| if err != nil { |
| return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err) |
| } |
| msg.InArgs[i] = val |
| } |
| |
| msg.Name = tempMsg.Name |
| msg.Method = tempMsg.Method |
| msg.PrivateId = tempMsg.PrivateId |
| msg.NumOutArgs = tempMsg.NumOutArgs |
| msg.IsStreaming = tempMsg.IsStreaming |
| |
| inStreamType := vom_wiretype.Type{ |
| ID: methSig.InStream, |
| Defs: &td, |
| } |
| |
| wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming) |
| return &msg, inStreamType, nil |
| } |
| |
| type signatureRequest struct { |
| Name string |
| PrivateId string |
| } |
| |
| func (wsp *websocketPipe) getSignature(name string, privateId string) (JSONServiceSignature, error) { |
| client, err := wsp.ctx.newClient(privateId) |
| if err != nil { |
| return nil, verror.Internalf("error creating client: %v", err) |
| } |
| |
| // Fetch and adapt signature from the SignatureManager |
| ctx := wsp.ctx.rt.TODOContext() |
| sig, err := wsp.signatureManager.signature(ctx, name, client) |
| if err != nil { |
| return nil, verror.Internalf("error getting service signature for %s: %v", name, err) |
| } |
| |
| return NewJSONServiceSignature(*sig), nil |
| } |
| |
| // handleSignatureRequest uses signature manager to get and cache signature of a remote server |
| func (wsp *websocketPipe) handleSignatureRequest(data string, w *websocketWriter) { |
| // Decode the request |
| var request signatureRequest |
| decoder := json.NewDecoder(bytes.NewBufferString(data)) |
| if err := decoder.Decode(&request); err != nil { |
| w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err)) |
| return |
| } |
| |
| wsp.ctx.logger.VI(2).Infof("requesting Signature for %q", request.Name) |
| wsp.ctx.logger.VI(2).Info("private id is", request.PrivateId) |
| jsSig, err := wsp.getSignature(request.Name, request.PrivateId) |
| if err != nil { |
| w.sendError(err) |
| return |
| } |
| |
| // Send the signature back |
| signatureData := response{Type: responseFinal, Message: jsSig} |
| if err := vom.ObjToJSON(w, vom.ValueOf(signatureData)); err != nil { |
| w.sendError(verror.Internalf("error marshalling results: %v", err)) |
| return |
| } |
| if err := w.FinishMessage(); err != nil { |
| w.logger.Error("WSPR: error finishing message: ", err) |
| return |
| } |
| } |
| |
| func (ctx *WSPR) setup() { |
| // Cache up to 20 identity.PrivateID->ipc.Client mappings |
| ctx.clientCache = NewClientCache(20) |
| } |
| |
| // Starts the proxy and listens for requests. This method is blocking. |
| func (ctx WSPR) Run() { |
| ctx.setup() |
| http.HandleFunc("/debug", ctx.handleDebug) |
| http.Handle("/favicon.ico", http.NotFoundHandler()) |
| http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { |
| pipe := &websocketPipe{ctx: &ctx} |
| pipe.start(w, r) |
| }) |
| ctx.logger.VI(1).Infof("Listening on port %d.", ctx.port) |
| httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.port), nil) |
| if httpErr != nil { |
| log.Fatalf("Failed to HTTP serve: %s", httpErr) |
| } |
| } |
| |
| func (ctx WSPR) Shutdown() { |
| ctx.rt.Cleanup() |
| } |
| |
| // Creates a new WebSocket Proxy object. |
| func NewWSPR(port int, veyronProxyEP string, opts ...veyron2.ROpt) *WSPR { |
| if veyronProxyEP == "" { |
| log.Fatalf("a veyron proxy must be set") |
| } |
| |
| newrt, err := rt.New(opts...) |
| if err != nil { |
| log.Fatalf("rt.New failed: %s", err) |
| } |
| |
| return &WSPR{port: port, veyronProxyEP: veyronProxyEP, rt: newrt, logger: newrt.Logger()} |
| } |