// A simple WebSocket proxy (WSPR) that takes in a Veyron RPC message, encoded in JSON
// and stored in a WebSocket message, and sends it to the specified Veyron
// endpoint.
//
// Input arguments must be provided as a JSON message in the following format:
//
// {
//	 "Address" : String, //EndPoint Address
//   "Name" : String, //Service Name
//   "Method"   : String, //Method Name
//   "PrivateID" : "", //Identification
//   "InArgs"     : { "ArgName1" : ArgVal1, "ArgName2" : ArgVal2, ... },
//   "IsStreaming" : true/false
// }
//
package lib

import (
	"bytes"
	"crypto/tls"
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"strings"
	"sync"
	"time"

	"veyron2"
	"veyron2/ipc"
	"veyron2/rt"
	"veyron2/security"
	"veyron2/verror"
	"veyron2/vlog"
	"veyron2/vom"
	vom_wiretype "veyron2/vom/wiretype"
	wiretype_build "veyron2/wiretype/build"

	"github.com/gorilla/websocket"
)

const (
	pingInterval = 50 * time.Second              // how often the server pings the client.
	pongTimeout  = pingInterval + 10*time.Second // maximum wait for pong.
)

type wsprConfig struct {
	MounttableRoot []string
}

type WSPR struct {
	tlsCert       *tls.Certificate
	clientCache   *ClientCache
	rt            veyron2.Runtime
	logger        vlog.Logger
	port          int
	veyronProxyEP string
}

type responseType int

const (
	responseFinal         responseType = 0
	responseStream                     = 1
	responseError                      = 2
	responseServerRequest              = 3
	responseStreamClose                = 4
)

// Wraps a response to the proxy client and adds a message type.
type response struct {
	Type    responseType
	Message interface{}
}

var logger vlog.Logger

// The type of message sent by the JS client to the wspr.
type websocketMessageType int

const (
	// Making a veyron client request, streaming or otherwise
	websocketVeyronRequest websocketMessageType = 0

	// Serving this websocket under an object name
	websocketServe = 1

	// A response from a service in javascript to a request
	// from the proxy.
	websocketServerResponse = 2

	// Sending streaming data, either from a JS client or JS service.
	websocketStreamingValue = 3

	// A response that means the stream is closed by the client.
	websocketStreamClose = 4

	// A request to get signature of a remote server
	websocketSignatureRequest = 5

	// A request to stop a server
	websocketStopServer = 6
)

type websocketMessage struct {
	Id int64
	// This contains the json encoded payload.
	Data string

	// Whether it is an rpc request or a serve request.
	Type websocketMessageType
}

// Temporary holder of RPC so that we can store the unprocessed args.
type veyronTempRPC struct {
	Name        string
	Method      string
	PrivateId   string // base64(vom(security.PrivateID))
	InArgs      []json.RawMessage
	NumOutArgs  int32
	IsStreaming bool
}

type veyronRPC struct {
	Name        string
	Method      string
	PrivateId   string // base64(vom(security.PrivateID))
	InArgs      []interface{}
	NumOutArgs  int32
	IsStreaming bool
}

// A request javascript to serve undern a particular name
type serveRequest struct {
	Name     string
	ServerId uint64
	Service  JSONServiceSignature
}

// The response from the javascript server to the proxy.
type serverRPCReply struct {
	Results []interface{}
	Err     *verror.Standard
}

// finishCall waits for the call to finish and write out the response to w.
func (wsp *websocketPipe) finishCall(w clientWriter, clientCall ipc.Call, msg *veyronRPC) {
	if msg.IsStreaming {
		for {
			var item interface{}
			if err := clientCall.Recv(&item); err != nil {
				if err == io.EOF {
					break
				}
				w.sendError(err) // Send streaming error as is
				return
			}
			data := &response{Type: responseStream, Message: item}
			if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
				w.sendError(verror.Internalf("unable to marshal: %v", item))
				continue
			}
			if err := w.FinishMessage(); err != nil {
				wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
			}
		}

		if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil {
			w.sendError(verror.Internalf("unable to marshal close stream message"))
		}
		if err := w.FinishMessage(); err != nil {
			wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
		}
	}

	results := make([]interface{}, msg.NumOutArgs)
	// This array will have pointers to the values in result.
	resultptrs := make([]interface{}, msg.NumOutArgs)
	for ax := range results {
		resultptrs[ax] = &results[ax]
	}
	if err := clientCall.Finish(resultptrs...); err != nil {
		// return the call system error as is
		w.sendError(err)
		return
	}
	// for now we assume last out argument is always error
	if len(results) < 1 {
		w.sendError(verror.Internalf("client call did not return any results"))
		return
	}

	if err, ok := results[len(results)-1].(error); ok {
		// return the call application error as is
		w.sendError(err)
		return
	}

	data := response{Type: responseFinal, Message: results[0 : len(results)-1]}
	if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
		w.sendError(verror.Internalf("error marshalling results: %v", err))
		return
	}

	if err := w.FinishMessage(); err != nil {
		wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
		return
	}
}

func (ctx WSPR) newClient(privateId string) (ipc.Client, error) {
	id := decodeIdentity(ctx.logger, privateId)
	client := ctx.clientCache.Get(id)
	var err error
	if client == nil {
		// TODO(bjornick): Use the identity to create the client.
		client, err = ctx.rt.NewClient()
		if err != nil {
			return nil, fmt.Errorf("error creating client: %v", err)
		}
		ctx.clientCache.Put(id, client)
	}

	return client, nil
}

func (ctx WSPR) startVeyronRequest(w clientWriter, msg *veyronRPC) (ipc.Call, error) {
	// Issue request to the endpoint.
	client, err := ctx.newClient(msg.PrivateId)
	if err != nil {
		return nil, err
	}
	clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs)

	if err != nil {
		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs, err)
	}

	return clientCall, nil
}

func decodeIdentity(logger vlog.Logger, msg string) security.PrivateID {
	if len(msg) == 0 {
		return nil
	}
	// msg contains base64-encoded-vom-encoded identity.PrivateID.
	// Pure JSON or pure VOM could not have been used.
	// - JSON cannot be used because identity.PrivateID contains an
	//   ecdsa.PrivateKey (which encoding/json cannot decode).
	// - Regular VOM cannot be used because it only has a binary,
	//   Go-specific implementation at this time.
	// The "portable" encoding is base64-encoded VOM (see
	// veyron/daemon/cmd/identity/responder/responder.go).
	// When toddw@ has the text-based VOM encoding going, that can probably
	// be used instead.
	var id security.PrivateID
	if err := vom.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(msg))).Decode(&id); err != nil {
		logger.Error("Could not decode identity:", err)
		return nil
	}
	return id
}

func intToByteSlice(i int32) []byte {
	rw := new(bytes.Buffer)
	binary.Write(rw, binary.BigEndian, i)
	buf := make([]byte, 4)
	n, err := io.ReadFull(rw, buf)
	if n != 4 || err != nil {
		panic(fmt.Sprintf("Read less than 4 bytes: %d", n))
	}
	return buf[:n]
}

func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/html")
	w.Write([]byte(`<html>
<head>
<title>/debug</title>
</head>
<body>
<ul>
<li><a href="/debug/pprof">/debug/pprof</a></li>
<li><b>Client cache stats:</b>
`))
	if ctx.clientCache == nil {
		w.Write([]byte("No ClientCache"))
	} else {
		w.Write([]byte(ctx.clientCache.Stats()))
	}
	w.Write([]byte("</li></ul></body></html>"))
}

func readFromRequest(r *http.Request) (*bytes.Buffer, error) {
	var buf bytes.Buffer
	if readBytes, err := io.Copy(&buf, r.Body); err != nil {
		return nil, fmt.Errorf("error copying message out of request: %v", err)
	} else if wantBytes := r.ContentLength; readBytes != wantBytes {
		return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes)
	}
	return &buf, nil
}

func setAccessControl(w http.ResponseWriter) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
}

type outstandingStream struct {
	stream sender
	inType vom.Type
}

type websocketPipe struct {
	// Protects outstandingStreams and outstandingServerRequests.
	sync.Mutex
	ws  *websocket.Conn
	ctx *WSPR
	// Used to generate unique ids for requests initiated by the proxy.
	// These ids will be even so they don't collide with the ids generated
	// by the client.
	lastGeneratedId int64

	// Streams for the outstanding requests.
	outstandingStreams map[int64]outstandingStream

	// Maps flowids to the server that owns them.
	flowMap map[int64]*server

	// A manager that handles fetching and caching signature of remote services
	signatureManager *signatureManager

	// We maintain multiple Veyron server per websocket pipe for serving JavaScript
	// services.
	servers map[uint64]*server

	// Creates a client writer for a given flow.  This is a member so that tests can override
	// the default implementation.
	writerCreator func(id int64) clientWriter
}

// Implements the serverHelper interface
func (wsp *websocketPipe) createNewFlow(server *server, stream sender) *flow {
	wsp.Lock()
	defer wsp.Unlock()
	id := wsp.lastGeneratedId
	wsp.lastGeneratedId += 2
	wsp.flowMap[id] = server
	wsp.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}}
	return &flow{id: id, writer: wsp.writerCreator(id)}
}

func (wsp *websocketPipe) cleanupFlow(id int64) {
	wsp.Lock()
	defer wsp.Unlock()
	delete(wsp.outstandingStreams, id)
	delete(wsp.flowMap, id)
}

func (wsp *websocketPipe) getLogger() vlog.Logger {
	return wsp.ctx.logger
}

func (wsp *websocketPipe) rt() veyron2.Runtime {
	return wsp.ctx.rt
}

// cleans up any outstanding rpcs.
func (wsp *websocketPipe) cleanup() {
	wsp.Lock()
	defer wsp.Unlock()
	for _, stream := range wsp.outstandingStreams {
		if call, ok := stream.stream.(ipc.Call); ok {
			call.Cancel()
		}
	}

	for _, server := range wsp.servers {
		server.Stop()
	}
}

func (wsp *websocketPipe) setup() {
	wsp.signatureManager = newSignatureManager()
	wsp.outstandingStreams = make(map[int64]outstandingStream)
	wsp.flowMap = make(map[int64]*server)
	wsp.servers = make(map[uint64]*server)

	if wsp.writerCreator == nil {
		wsp.writerCreator = func(id int64) clientWriter {
			return &websocketWriter{ws: wsp.ws, id: id, logger: wsp.ctx.logger}
		}
	}
}

func (wsp *websocketPipe) start(w http.ResponseWriter, req *http.Request) {
	ws, err := websocket.Upgrade(w, req, nil, 1024, 1024)
	if _, ok := err.(websocket.HandshakeError); ok {
		http.Error(w, "Not a websocket handshake", 400)
		return
	} else if err != nil {
		http.Error(w, "Internal Error", 500)
		wsp.ctx.logger.Errorf("websocket upgrade failed: %s", err)
		return
	}

	wsp.setup()
	wsp.ws = ws
	wsp.ws.SetPongHandler(wsp.pongHandler)
	wsp.sendInitialMessage()
	go wsp.readLoop()
	go wsp.pingLoop()
}

// Upon first connect, we send a message with the wsprConfig.
func (wsp *websocketPipe) sendInitialMessage() {
	mounttableRoots := strings.Split(os.Getenv("NAMESPACE_ROOT"), ",")
	if len(mounttableRoots) == 1 && mounttableRoots[0] == "" {
		mounttableRoots = []string{}
	}
	msg := wsprConfig{
		MounttableRoot: mounttableRoots,
	}

	wc, err := wsp.ws.NextWriter(websocket.TextMessage)
	if err != nil {
		wsp.ctx.logger.Errorf("failed to create websocket writer: %s", err)
		return
	}
	if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
		wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err)
		return
	}
	wc.Close()
}

func (wsp *websocketPipe) pingLoop() {
	for {
		time.Sleep(pingInterval)
		wsp.ctx.logger.VI(2).Info("ws: ping")
		if err := wsp.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
			wsp.ctx.logger.Error("ws: ping failed")
			return
		}
	}
}

func (wsp *websocketPipe) pongHandler(msg string) error {
	wsp.ctx.logger.VI(2).Infof("ws: pong")
	wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout))
	return nil
}

func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w clientWriter) {
	wsp.Lock()
	defer wsp.Unlock()
	stream := wsp.outstandingStreams[id].stream
	if stream == nil {
		w.sendError(fmt.Errorf("unknown stream"))
	}

	stream.Send(msg, w)

}

// sendOnStream writes data on id's stream.  Returns an error if the send failed.
func (wsp *websocketPipe) sendOnStream(id int64, data string, w clientWriter) {
	wsp.Lock()
	typ := wsp.outstandingStreams[id].inType
	wsp.Unlock()
	if typ == nil {
		vlog.Errorf("no inType for stream %d (%q)", id, data)
		return
	}
	payload, err := vom.JSONToObject(data, typ)
	if err != nil {
		vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
		return
	}
	wsp.sendParsedMessageOnStream(id, payload, w)
}

func (wsp *websocketPipe) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w clientWriter, signal chan ipc.Stream) {
	// We have to make the start call synchronous so we can make sure that we populate
	// the call map before we can handle a recieve call.
	call, err := wsp.ctx.startVeyronRequest(w, veyronMsg)
	if err != nil {
		w.sendError(verror.Internalf("can't start Veyron Request: %v", err))
		return
	}

	if signal != nil {
		signal <- call
	}

	wsp.finishCall(w, call, veyronMsg)
	if signal != nil {
		wsp.Lock()
		delete(wsp.outstandingStreams, id)
		wsp.Unlock()
	}
}

// handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
	veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
	if err != nil {
		w.sendError(verror.Internalf("can't parse Veyron Request: %v", err))
		return
	}

	wsp.Lock()
	defer wsp.Unlock()
	// If this rpc is streaming, we would expect that the client would try to send
	// on this stream.  Since the initial handshake is done asynchronously, we have
	// to basically put a queueing stream in the map before we make the async call
	// so that the future sends on the stream can see the queuing stream, even if
	// the client call isn't actually ready yet.
	var signal chan ipc.Stream
	if veyronMsg.IsStreaming {
		signal = make(chan ipc.Stream)
		wsp.outstandingStreams[id] = outstandingStream{
			stream: startQueueingStream(signal),
			inType: inStreamType,
		}
	}
	go wsp.sendVeyronRequest(id, veyronMsg, w, signal)
}

func (wsp *websocketPipe) closeStream(id int64) {
	wsp.Lock()
	defer wsp.Unlock()
	stream := wsp.outstandingStreams[id].stream
	if stream == nil {
		wsp.ctx.logger.Errorf("close called on non-existent call: %v", id)
		return
	}

	var call queueingStream
	var ok bool
	if call, ok = stream.(queueingStream); !ok {
		wsp.ctx.logger.Errorf("can't close server stream: %v", id)
		return
	}

	if err := call.Close(); err != nil {
		wsp.ctx.logger.Errorf("client call close failed with: %v", err)
	}
}

func (wsp *websocketPipe) readLoop() {
	wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout))
	for {
		op, r, err := wsp.ws.NextReader()
		if err == io.ErrUnexpectedEOF { // websocket disconnected
			break
		}
		if err != nil {
			wsp.ctx.logger.VI(1).Infof("websocket receive: %s", err)
			break
		}

		if op != websocket.TextMessage {
			wsp.ctx.logger.Errorf("unexpected websocket op: %v", op)
		}

		var msg websocketMessage
		decoder := json.NewDecoder(r)
		if err := decoder.Decode(&msg); err != nil {
			errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
			wsp.ctx.logger.Error(errMsg)
			wsp.ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
			continue
		}

		ww := &websocketWriter{ws: wsp.ws, id: msg.Id, logger: wsp.ctx.logger}

		switch msg.Type {
		case websocketVeyronRequest:
			wsp.handleVeyronRequest(msg.Id, msg.Data, ww)
		case websocketStreamingValue:
			// This will asynchronous for a client rpc, but synchronous for a
			// server rpc.  This could be potentially bad if the server is sending
			// back large packets.  Making it asynchronous for the server, would make
			// it difficult to guarantee that all stream messages make it to the client
			// before the finish call.
			// TODO(bjornick): Make the server send also asynchronous.
			wsp.sendOnStream(msg.Id, msg.Data, ww)
		case websocketStreamClose:
			wsp.closeStream(msg.Id)
		case websocketServe:
			go wsp.handleServeRequest(msg.Data, ww)
		case websocketStopServer:
			go wsp.handleStopRequest(msg.Data, ww)
		case websocketServerResponse:
			go wsp.handleServerResponse(msg.Id, msg.Data)
		case websocketSignatureRequest:
			go wsp.handleSignatureRequest(msg.Data, ww)
		default:
			ww.sendError(verror.Unknownf("unknown message type: %v", msg.Type))
		}
	}
	wsp.cleanup()
}

func (wsp *websocketPipe) maybeCreateServer(serverId uint64) (*server, error) {
	wsp.Lock()
	defer wsp.Unlock()
	if server, ok := wsp.servers[serverId]; ok {
		return server, nil
	}
	server, err := newServer(serverId, wsp.ctx.veyronProxyEP, wsp)
	if err != nil {
		return nil, err
	}
	wsp.servers[serverId] = server
	return server, nil
}

func (wsp *websocketPipe) removeServer(serverId uint64) {
	wsp.Lock()
	server := wsp.servers[serverId]
	if server == nil {
		wsp.Unlock()
		return
	}
	delete(wsp.servers, serverId)
	wsp.Unlock()

	server.Stop()
}

func (wsp *websocketPipe) serve(serveRequest serveRequest, w clientWriter) {
	// Create a server for the websocket pipe, if it does not exist already
	server, err := wsp.maybeCreateServer(serveRequest.ServerId)
	if err != nil {
		w.sendError(verror.Internalf("error creating server: %v", err))
	}

	wsp.ctx.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)

	endpoint, err := server.serve(serveRequest.Name, serveRequest.Service)
	if err != nil {
		w.sendError(verror.Internalf("error serving service: %v", err))
		return
	}
	// Send the endpoint back
	endpointData := response{Type: responseFinal, Message: endpoint}
	if err := vom.ObjToJSON(w, vom.ValueOf(endpointData)); err != nil {
		w.sendError(verror.Internalf("error marshalling results: %v", err))
		return
	}

	if err := w.FinishMessage(); err != nil {
		wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
		return
	}
}

// handleServeRequest takes a request to serve a server, creates
// a server, registers the provided services and sends the endpoint back.
func (wsp *websocketPipe) handleServeRequest(data string, w *websocketWriter) {
	// Decode the serve request which includes IDL, registered services and name
	var serveRequest serveRequest
	decoder := json.NewDecoder(bytes.NewBufferString(data))
	if err := decoder.Decode(&serveRequest); err != nil {
		w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
		return
	}
	wsp.serve(serveRequest, w)
}

// handleStopRequest takes a request to stop a server.
func (wsp *websocketPipe) handleStopRequest(data string, w *websocketWriter) {

	var serverId uint64
	decoder := json.NewDecoder(bytes.NewBufferString(data))
	if err := decoder.Decode(&serverId); err != nil {
		w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
		return
	}

	wsp.removeServer(serverId)

	// Send true to indicate stop has finished
	result := response{Type: responseFinal, Message: true}
	if err := vom.ObjToJSON(w, vom.ValueOf(result)); err != nil {
		w.sendError(verror.Internalf("error marshalling results: %v", err))
		return
	}
	w.FinishMessage()
}

// handleServerResponse handles the completion of outstanding calls to JavaScript services
// by filling the corresponding channel with the result from JavaScript.
func (wsp *websocketPipe) handleServerResponse(id int64, data string) {
	wsp.Lock()
	server := wsp.flowMap[id]
	wsp.Unlock()
	if server == nil {
		wsp.ctx.logger.Errorf("unexpected result from JavaScript. No channel "+
			"for MessageId: %d exists. Ignoring the results.", id)
		//Ignore unknown responses that don't belong to any channel
		return
	}
	server.handleServerResponse(id, data)
}

// parseVeyronRequest parses a json rpc request into a veyronRPC object.
func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) {
	var tempMsg veyronTempRPC
	decoder := json.NewDecoder(r)
	if err := decoder.Decode(&tempMsg); err != nil {
		return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
	}

	client, err := wsp.ctx.newClient(tempMsg.PrivateId)
	if err != nil {
		return nil, nil, verror.Internalf("error creating client: %v", err)
	}

	// Fetch and adapt signature from the SignatureManager
	ctx := wsp.ctx.rt.TODOContext()
	sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client)
	if err != nil {
		return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
	}

	methName := uppercaseFirstCharacter(tempMsg.Method)
	methSig, ok := sig.Methods[methName]
	if !ok {
		return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
	}

	var msg veyronRPC
	if len(methSig.InArgs) != len(tempMsg.InArgs) {
		return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
	}
	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
	td := wiretype_build.TypeDefs(sig.TypeDefs)

	for i := 0; i < len(tempMsg.InArgs); i++ {
		argTypeId := methSig.InArgs[i].Type
		argType := vom_wiretype.Type{
			ID:   argTypeId,
			Defs: &td,
		}

		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
		if err != nil {
			return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
		}
		msg.InArgs[i] = val
	}

	msg.Name = tempMsg.Name
	msg.Method = tempMsg.Method
	msg.PrivateId = tempMsg.PrivateId
	msg.NumOutArgs = tempMsg.NumOutArgs
	msg.IsStreaming = tempMsg.IsStreaming

	inStreamType := vom_wiretype.Type{
		ID:   methSig.InStream,
		Defs: &td,
	}

	wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming)
	return &msg, inStreamType, nil
}

type signatureRequest struct {
	Name      string
	PrivateId string
}

func (wsp *websocketPipe) getSignature(name string, privateId string) (JSONServiceSignature, error) {
	client, err := wsp.ctx.newClient(privateId)
	if err != nil {
		return nil, verror.Internalf("error creating client: %v", err)
	}

	// Fetch and adapt signature from the SignatureManager
	ctx := wsp.ctx.rt.TODOContext()
	sig, err := wsp.signatureManager.signature(ctx, name, client)
	if err != nil {
		return nil, verror.Internalf("error getting service signature for %s: %v", name, err)
	}

	return NewJSONServiceSignature(*sig), nil
}

// handleSignatureRequest uses signature manager to get and cache signature of a remote server
func (wsp *websocketPipe) handleSignatureRequest(data string, w *websocketWriter) {
	// Decode the request
	var request signatureRequest
	decoder := json.NewDecoder(bytes.NewBufferString(data))
	if err := decoder.Decode(&request); err != nil {
		w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
		return
	}

	wsp.ctx.logger.VI(2).Infof("requesting Signature for %q", request.Name)
	wsp.ctx.logger.VI(2).Info("private id is", request.PrivateId)
	jsSig, err := wsp.getSignature(request.Name, request.PrivateId)
	if err != nil {
		w.sendError(err)
		return
	}

	// Send the signature back
	signatureData := response{Type: responseFinal, Message: jsSig}
	if err := vom.ObjToJSON(w, vom.ValueOf(signatureData)); err != nil {
		w.sendError(verror.Internalf("error marshalling results: %v", err))
		return
	}
	if err := w.FinishMessage(); err != nil {
		w.logger.Error("WSPR: error finishing message: ", err)
		return
	}
}

func (ctx *WSPR) setup() {
	// Cache up to 20 identity.PrivateID->ipc.Client mappings
	ctx.clientCache = NewClientCache(20)
}

// Starts the proxy and listens for requests. This method is blocking.
func (ctx WSPR) Run() {
	ctx.setup()
	http.HandleFunc("/debug", ctx.handleDebug)
	http.Handle("/favicon.ico", http.NotFoundHandler())
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		pipe := &websocketPipe{ctx: &ctx}
		pipe.start(w, r)
	})
	ctx.logger.VI(1).Infof("Listening on port %d.", ctx.port)
	httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.port), nil)
	if httpErr != nil {
		log.Fatalf("Failed to HTTP serve: %s", httpErr)
	}
}

func (ctx WSPR) Shutdown() {
	ctx.rt.Cleanup()
}

// Creates a new WebSocket Proxy object.
func NewWSPR(port int, veyronProxyEP string, opts ...veyron2.ROpt) *WSPR {
	if veyronProxyEP == "" {
		log.Fatalf("a veyron proxy must be set")
	}

	newrt, err := rt.New(opts...)
	if err != nil {
		log.Fatalf("rt.New failed: %s", err)
	}

	return &WSPR{port: port, veyronProxyEP: veyronProxyEP, rt: newrt, logger: newrt.Logger()}
}
