veyron/services/wsprd: Refactored wspr so the logic to handle the
translation of javascript request to veyron and vice versa is separate
from the code that deals with websockets.  This will allow us to share
the code for an websocket based implementation and a nacl based
implementation.  This CL also changes the logic to allow only one
privateID per javascript app.

Change-Id: I7b6021bef62563f20a3bde752ae03f3de71c534f
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
new file mode 100644
index 0000000..c7ba3c1
--- /dev/null
+++ b/services/wsprd/app/app.go
@@ -0,0 +1,546 @@
+// The app package contains the struct that keeps per javascript app state and handles translating
+// javascript requests to veyron requests and vice versa.
+package app
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"sync"
+
+	"veyron/services/wsprd/ipc/client"
+	"veyron/services/wsprd/ipc/server"
+	"veyron/services/wsprd/ipc/stream"
+	"veyron/services/wsprd/lib"
+	"veyron/services/wsprd/signature"
+	"veyron2"
+	"veyron2/ipc"
+	"veyron2/rt"
+	"veyron2/security"
+	"veyron2/verror"
+	"veyron2/vlog"
+	"veyron2/vom"
+	vom_wiretype "veyron2/vom/wiretype"
+	wiretype_build "veyron2/wiretype/build"
+)
+
+// Temporary holder of RPC so that we can store the unprocessed args.
+type veyronTempRPC struct {
+	Name        string
+	Method      string
+	InArgs      []json.RawMessage
+	NumOutArgs  int32
+	IsStreaming bool
+}
+
+type veyronRPC struct {
+	Name        string
+	Method      string
+	InArgs      []interface{}
+	NumOutArgs  int32
+	IsStreaming bool
+}
+
+// A request javascript to serve undern a particular name
+type serveRequest struct {
+	Name     string
+	ServerId uint64
+	Service  signature.JSONServiceSignature
+}
+
+type outstandingStream struct {
+	stream stream.Sender
+	inType vom.Type
+}
+
+// Controller represents all the state of a Veyron Web App.  This is the struct
+// that is in charge performing all the veyron options.
+type Controller struct {
+	// Protects outstandingStreams and outstandingServerRequests.
+	sync.Mutex
+
+	logger vlog.Logger
+
+	// A set of options that will be used to construct a runtime.  This will
+	// eventually be removed, since we should construct the runtime in
+	// the constructor instead of at a later point.
+	opts []veyron2.ROpt
+
+	// The runtime to use to create new clients.
+	rt veyron2.Runtime
+
+	// Used to generate unique ids for requests initiated by the proxy.
+	// These ids will be even so they don't collide with the ids generated
+	// by the client.
+	lastGeneratedId int64
+
+	// Streams for the outstanding requests.
+	outstandingStreams map[int64]outstandingStream
+
+	// Maps flowids to the server that owns them.
+	flowMap map[int64]*server.Server
+
+	// A manager that Handles fetching and caching signature of remote services
+	signatureManager lib.SignatureManager
+
+	// We maintain multiple Veyron server per websocket pipe for serving JavaScript
+	// services.
+	servers map[uint64]*server.Server
+
+	// Creates a client writer for a given flow.  This is a member so that tests can override
+	// the default implementation.
+	writerCreator func(id int64) lib.ClientWriter
+
+	// There is only one client per Controller since there is only one identity per app.
+	client ipc.Client
+
+	veyronProxyEP string
+
+	// privateId associated with the app.
+	// TODO(bjornick): We probably don't need the identity anymore. Verify and then remove.
+	privateId security.PrivateID
+}
+
+// NewController creates a new Controller.  writerCreator will be used to create a new flow for rpcs to
+// javascript server. veyronProxyEP is an endpoint for the veyron proxy to serve through.  It can't be empty.
+// opts are any options that should be passed to the rt.New(), such as the mounttable root.
+func NewController(writerCreator func(id int64) lib.ClientWriter, veyronProxyEP string, opts ...veyron2.ROpt) *Controller {
+	controller := &Controller{writerCreator: writerCreator, veyronProxyEP: veyronProxyEP, opts: opts}
+	controller.setup()
+	return controller
+}
+
+// finishCall waits for the call to finish and write out the response to w.
+func (c *Controller) finishCall(w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
+	if msg.IsStreaming {
+		for {
+			var item interface{}
+			if err := clientCall.Recv(&item); err != nil {
+				if err == io.EOF {
+					break
+				}
+				w.Error(err) // Send streaming error as is
+				return
+			}
+			if err := w.Send(lib.ResponseStream, item); err != nil {
+				w.Error(verror.Internalf("unable to marshal: %v", item))
+			}
+		}
+
+		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+			w.Error(verror.Internalf("unable to marshal close stream message"))
+		}
+	}
+
+	results := make([]interface{}, msg.NumOutArgs)
+	// This array will have pointers to the values in result.
+	resultptrs := make([]interface{}, msg.NumOutArgs)
+	for ax := range results {
+		resultptrs[ax] = &results[ax]
+	}
+	if err := clientCall.Finish(resultptrs...); err != nil {
+		// return the call system error as is
+		w.Error(err)
+		return
+	}
+	// for now we assume last out argument is always error
+	if len(results) < 1 {
+		w.Error(verror.Internalf("client call did not return any results"))
+		return
+	}
+
+	if err, ok := results[len(results)-1].(error); ok {
+		// return the call Application error as is
+		w.Error(err)
+		return
+	}
+
+	if err := w.Send(lib.ResponseFinal, results[0:len(results)-1]); err != nil {
+		w.Error(verror.Internalf("error marshalling results: %v", err))
+	}
+}
+
+// UpdateIdentity updates the identity used by the Controller. This must be called before any veyron requests are
+// made. This is only temporary as in the future, we'd expect to set the identity at construction time.
+func (c *Controller) UpdateIdentity(identity security.PrivateID) error {
+	c.Lock()
+	defer c.Unlock()
+	args := c.opts
+	if identity != nil {
+		args = append(c.opts, veyron2.RuntimeID(identity))
+	}
+	r, err := rt.New(args...)
+	if err != nil {
+		return err
+	}
+	client, err := r.NewClient(veyron2.CallTimeout(ipc.NoTimeout))
+	if err != nil {
+		return err
+	}
+	c.rt = r
+	c.logger = c.rt.Logger()
+	c.client = client
+	return nil
+}
+
+func (c *Controller) startCall(w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
+	c.Lock()
+	defer c.Unlock()
+	if c.client == nil {
+		return nil, verror.BadArgf("no client created")
+	}
+	methodName := lib.UppercaseFirstCharacter(msg.Method)
+	clientCall, err := c.client.StartCall(c.rt.TODOContext(), msg.Name, methodName, msg.InArgs)
+	if err != nil {
+		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+	}
+
+	return clientCall, nil
+}
+
+// Implements the serverHelper interface
+
+// CreateNewFlow creats a new server flow that will be used to write out
+// streaming messages to Javascript.
+func (c *Controller) CreateNewFlow(s *server.Server, stream stream.Sender) *server.Flow {
+	c.Lock()
+	defer c.Unlock()
+	id := c.lastGeneratedId
+	c.lastGeneratedId += 2
+	c.flowMap[id] = s
+	c.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}}
+	return &server.Flow{ID: id, Writer: c.writerCreator(id)}
+}
+
+// CleanupFlow removes the bookkeping for a previously created flow.
+func (c *Controller) CleanupFlow(id int64) {
+	c.Lock()
+	defer c.Unlock()
+	delete(c.outstandingStreams, id)
+	delete(c.flowMap, id)
+}
+
+// GetLogger returns a Veyron logger to use.
+func (c *Controller) GetLogger() vlog.Logger {
+	return c.logger
+}
+
+// RT returns the runtime of the app.
+func (c *Controller) RT() veyron2.Runtime {
+	return c.rt
+}
+
+// Cleanup Cleans up any outstanding rpcs.
+func (c *Controller) Cleanup() {
+	c.logger.VI(0).Info("Cleaning up websocket")
+	c.Lock()
+	defer c.Unlock()
+	for _, stream := range c.outstandingStreams {
+		if call, ok := stream.stream.(ipc.Call); ok {
+			call.Cancel()
+		}
+	}
+
+	for _, server := range c.servers {
+		server.Stop()
+	}
+}
+
+func (c *Controller) setup() {
+	c.signatureManager = lib.NewSignatureManager()
+	c.outstandingStreams = make(map[int64]outstandingStream)
+	c.flowMap = make(map[int64]*server.Server)
+	c.servers = make(map[uint64]*server.Server)
+}
+
+func (c *Controller) sendParsedMessageOnStream(id int64, msg interface{}, w lib.ClientWriter) {
+	c.Lock()
+	defer c.Unlock()
+	stream := c.outstandingStreams[id].stream
+	if stream == nil {
+		w.Error(fmt.Errorf("unknown stream"))
+		return
+	}
+
+	stream.Send(msg, w)
+
+}
+
+// SendOnStream writes data on id's stream.  Returns an error if the send failed.
+func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
+	c.Lock()
+	typ := c.outstandingStreams[id].inType
+	c.Unlock()
+	if typ == nil {
+		vlog.Errorf("no inType for stream %d (%q)", id, data)
+		return
+	}
+	payload, err := vom.JSONToObject(data, typ)
+	if err != nil {
+		vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
+		return
+	}
+	c.sendParsedMessageOnStream(id, payload, w)
+}
+
+// SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
+// the call object after it has been constructed.
+func (c *Controller) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w lib.ClientWriter, signal chan ipc.Stream) {
+	// We have to make the start call synchronous so we can make sure that we populate
+	// the call map before we can Handle a recieve call.
+	call, err := c.startCall(w, veyronMsg)
+	if err != nil {
+		w.Error(verror.Internalf("can't start Veyron Request: %v", err))
+		return
+	}
+
+	if signal != nil {
+		signal <- call
+	}
+
+	c.finishCall(w, call, veyronMsg)
+	if signal != nil {
+		c.Lock()
+		delete(c.outstandingStreams, id)
+		c.Unlock()
+	}
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(id int64, data string, w lib.ClientWriter) {
+	veyronMsg, inStreamType, err := c.parseVeyronRequest(bytes.NewBufferString(data))
+	if err != nil {
+		w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
+		return
+	}
+
+	c.Lock()
+	defer c.Unlock()
+	// If this rpc is streaming, we would expect that the client would try to send
+	// on this stream.  Since the initial handshake is done asynchronously, we have
+	// to basically put a queueing stream in the map before we make the async call
+	// so that the future sends on the stream can see the queuing stream, even if
+	// the client call isn't actually ready yet.
+	var signal chan ipc.Stream
+	if veyronMsg.IsStreaming {
+		signal = make(chan ipc.Stream)
+		c.outstandingStreams[id] = outstandingStream{
+			stream: client.StartQueueingStream(signal),
+			inType: inStreamType,
+		}
+	}
+	go c.sendVeyronRequest(id, veyronMsg, w, signal)
+}
+
+// CloseStream closes the stream for a given id.
+func (c *Controller) CloseStream(id int64) {
+	c.Lock()
+	defer c.Unlock()
+	stream := c.outstandingStreams[id].stream
+	if stream == nil {
+		c.logger.Errorf("close called on non-existent call: %v", id)
+		return
+	}
+
+	var call client.QueueingStream
+	var ok bool
+	if call, ok = stream.(client.QueueingStream); !ok {
+		c.logger.Errorf("can't close server stream: %v", id)
+		return
+	}
+
+	if err := call.Close(); err != nil {
+		c.logger.Errorf("client call close failed with: %v", err)
+	}
+}
+
+func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {
+	c.Lock()
+	defer c.Unlock()
+	if server, ok := c.servers[serverId]; ok {
+		return server, nil
+	}
+	server, err := server.NewServer(serverId, c.veyronProxyEP, c)
+	if err != nil {
+		return nil, err
+	}
+	c.servers[serverId] = server
+	return server, nil
+}
+
+func (c *Controller) removeServer(serverId uint64) {
+	c.Lock()
+	server := c.servers[serverId]
+	if server == nil {
+		c.Unlock()
+		return
+	}
+	delete(c.servers, serverId)
+	c.Unlock()
+
+	server.Stop()
+}
+
+func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
+	// Create a server for the websocket pipe, if it does not exist already
+	server, err := c.maybeCreateServer(serveRequest.ServerId)
+	if err != nil {
+		w.Error(verror.Internalf("error creating server: %v", err))
+	}
+
+	c.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)
+
+	endpoint, err := server.Serve(serveRequest.Name, serveRequest.Service)
+	if err != nil {
+		w.Error(verror.Internalf("error serving service: %v", err))
+		return
+	}
+	// Send the endpoint back
+	if err := w.Send(lib.ResponseFinal, endpoint); err != nil {
+		w.Error(verror.Internalf("error marshalling results: %v", err))
+		return
+	}
+}
+
+// HandleServeRequest takes a request to serve a server, creates
+// a server, registers the provided services and sends the endpoint back.
+func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
+	// Decode the serve request which includes IDL, registered services and name
+	var serveRequest serveRequest
+	decoder := json.NewDecoder(bytes.NewBufferString(data))
+	if err := decoder.Decode(&serveRequest); err != nil {
+		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+		return
+	}
+	c.serve(serveRequest, w)
+}
+
+// HandleStopRequest takes a request to stop a server.
+func (c *Controller) HandleStopRequest(data string, w lib.ClientWriter) {
+
+	var serverId uint64
+	decoder := json.NewDecoder(bytes.NewBufferString(data))
+	if err := decoder.Decode(&serverId); err != nil {
+		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+		return
+	}
+
+	c.removeServer(serverId)
+
+	// Send true to indicate stop has finished
+	if err := w.Send(lib.ResponseFinal, true); err != nil {
+		w.Error(verror.Internalf("error marshalling results: %v", err))
+		return
+	}
+}
+
+// HandleServerResponse handles the completion of outstanding calls to JavaScript services
+// by filling the corresponding channel with the result from JavaScript.
+func (c *Controller) HandleServerResponse(id int64, data string) {
+	c.Lock()
+	server := c.flowMap[id]
+	c.Unlock()
+	if server == nil {
+		c.logger.Errorf("unexpected result from JavaScript. No channel "+
+			"for MessageId: %d exists. Ignoring the results.", id)
+		//Ignore unknown responses that don't belong to any channel
+		return
+	}
+	server.HandleServerResponse(id, data)
+}
+
+// parseVeyronRequest parses a json rpc request into a veyronRPC object.
+func (c *Controller) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) {
+	var tempMsg veyronTempRPC
+	decoder := json.NewDecoder(r)
+	if err := decoder.Decode(&tempMsg); err != nil {
+		return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+	}
+
+	// Fetch and adapt signature from the SignatureManager
+	ctx := c.rt.TODOContext()
+	sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client)
+	if err != nil {
+		return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
+	}
+
+	methName := lib.UppercaseFirstCharacter(tempMsg.Method)
+	methSig, ok := sig.Methods[methName]
+	if !ok {
+		return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
+	}
+
+	var msg veyronRPC
+	if len(methSig.InArgs) != len(tempMsg.InArgs) {
+		return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
+	}
+	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
+	td := wiretype_build.TypeDefs(sig.TypeDefs)
+
+	for i := 0; i < len(tempMsg.InArgs); i++ {
+		argTypeId := methSig.InArgs[i].Type
+		argType := vom_wiretype.Type{
+			ID:   argTypeId,
+			Defs: &td,
+		}
+
+		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
+		if err != nil {
+			return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
+		}
+		msg.InArgs[i] = val
+	}
+
+	msg.Name = tempMsg.Name
+	msg.Method = tempMsg.Method
+	msg.NumOutArgs = tempMsg.NumOutArgs
+	msg.IsStreaming = tempMsg.IsStreaming
+
+	inStreamType := vom_wiretype.Type{
+		ID:   methSig.InStream,
+		Defs: &td,
+	}
+
+	c.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+	return &msg, inStreamType, nil
+}
+
+type signatureRequest struct {
+	Name string
+}
+
+func (c *Controller) getSignature(name string) (signature.JSONServiceSignature, error) {
+	// Fetch and adapt signature from the SignatureManager
+	ctx := c.rt.TODOContext()
+	sig, err := c.signatureManager.Signature(ctx, name, c.client)
+	if err != nil {
+		return nil, verror.Internalf("error getting service signature for %s: %v", name, err)
+	}
+
+	return signature.NewJSONServiceSignature(*sig), nil
+}
+
+// HandleSignatureRequest uses signature manager to get and cache signature of a remote server
+func (c *Controller) HandleSignatureRequest(data string, w lib.ClientWriter) {
+	// Decode the request
+	var request signatureRequest
+	decoder := json.NewDecoder(bytes.NewBufferString(data))
+	if err := decoder.Decode(&request); err != nil {
+		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+		return
+	}
+
+	c.logger.VI(2).Infof("requesting Signature for %q", request.Name)
+	jsSig, err := c.getSignature(request.Name)
+	if err != nil {
+		w.Error(err)
+		return
+	}
+
+	// Send the signature back
+	if err := w.Send(lib.ResponseFinal, jsSig); err != nil {
+		w.Error(verror.Internalf("error marshalling results: %v", err))
+		return
+	}
+}
diff --git a/services/wsprd/wspr/wspr_test.go b/services/wsprd/app/app_test.go
similarity index 83%
rename from services/wsprd/wspr/wspr_test.go
rename to services/wsprd/app/app_test.go
index d508beb..de01da9 100644
--- a/services/wsprd/wspr/wspr_test.go
+++ b/services/wsprd/app/app_test.go
@@ -1,4 +1,4 @@
-package wspr
+package app
 
 import (
 	"bytes"
@@ -15,7 +15,6 @@
 	"veyron2/ipc"
 	"veyron2/naming"
 	"veyron2/rt"
-	"veyron2/security"
 	"veyron2/vdl/vdlutil"
 	"veyron2/verror"
 	"veyron2/vlog"
@@ -133,6 +132,11 @@
 	return startAnyServer(true, mt)
 }
 
+type response struct {
+	Type    lib.ResponseType
+	Message interface{}
+}
+
 type testWriter struct {
 	sync.Mutex
 	stream []response
@@ -236,10 +240,9 @@
 		return
 	}
 	defer s.Stop()
-	wspr := NewWSPR(0, "mockVeyronProxyEP")
-	wsp := websocketPipe{ctx: wspr}
-	wsp.setup()
-	jsSig, err := wsp.getSignature("/" + endpoint.String())
+	controller := NewController(nil, "mockVeyronProxyEP")
+	controller.UpdateIdentity(nil)
+	jsSig, err := controller.getSignature("/" + endpoint.String())
 	if err != nil {
 		t.Errorf("Failed to get signature: %v", err)
 	}
@@ -249,45 +252,6 @@
 	}
 }
 
-func TestEncodeDecodeIdentity(t *testing.T) {
-	identity := security.FakePrivateID("/fake/private/id")
-	resultIdentity := decodeIdentity(r.Logger(), encodeIdentity(r.Logger(), identity))
-	if identity != resultIdentity {
-		t.Errorf("expected decodeIdentity(encodeIdentity(identity)) to be %v, got %v", identity, resultIdentity)
-	}
-}
-
-func TestHandleAssocIdentity(t *testing.T) {
-	wspr := NewWSPR(0, "mockVeyronProxyEP")
-	wsp := websocketPipe{ctx: wspr}
-	wsp.setup()
-
-	privateId := security.FakePrivateID("/fake/private/id")
-	identityData := assocIdentityData{
-		Account:  "test@example.org",
-		Identity: encodeIdentity(wspr.logger, privateId),
-		Origin:   "my.webapp.com",
-	}
-	jsonIdentityDataBytes, err := json.Marshal(identityData)
-	if err != nil {
-		t.Errorf("json.Marshal(%v) failed: %v", identityData, err)
-	}
-	jsonIdentityData := string(jsonIdentityDataBytes)
-	writer := testWriter{
-		logger: wspr.logger,
-	}
-	wsp.handleAssocIdentity(jsonIdentityData, lib.ClientWriter(&writer))
-	// Check that the pipe has the privateId
-	if wsp.privateId != privateId {
-		t.Errorf("wsp.privateId was not set. got: %v, expected: %v", wsp.privateId, identityData.Identity)
-	}
-	// Check that wspr idManager has the origin
-	_, err = wspr.idManager.Identity(identityData.Origin)
-	if err != nil {
-		t.Errorf("wspr.idManager.Identity(%v) failed: %v", identityData.Origin, err)
-	}
-}
-
 type goServerTestCase struct {
 	method             string
 	inArgs             []interface{}
@@ -307,25 +271,24 @@
 	}
 	defer s.Stop()
 
-	wspr := NewWSPR(0, "mockVeyronProxyEP")
-	wsp := websocketPipe{ctx: wspr}
-	wsp.setup()
-	writer := testWriter{
-		logger: wspr.logger,
-	}
+	controller := NewController(nil, "mockVeyronProxyEP")
+	controller.UpdateIdentity(nil)
 
+	writer := testWriter{
+		logger: controller.logger,
+	}
 	var signal chan ipc.Stream
 	if len(test.streamingInputs) > 0 {
 		signal = make(chan ipc.Stream, 1)
-		wsp.outstandingStreams[0] = outstandingStream{
+		controller.outstandingStreams[0] = outstandingStream{
 			stream: client.StartQueueingStream(signal),
 			inType: test.streamingInputType,
 		}
 		go func() {
 			for _, value := range test.streamingInputs {
-				wsp.sendOnStream(0, value, &writer)
+				controller.SendOnStream(0, value, &writer)
 			}
-			wsp.closeStream(0)
+			controller.CloseStream(0)
 		}()
 	}
 
@@ -336,7 +299,7 @@
 		NumOutArgs:  test.numOutArgs,
 		IsStreaming: signal != nil,
 	}
-	wsp.sendVeyronRequest(0, &request, &writer, signal)
+	controller.sendVeyronRequest(0, &request, &writer, signal)
 
 	checkResponses(&writer, test.expectedStream, test.expectedError, t)
 }
@@ -401,8 +364,7 @@
 }
 
 type runningTest struct {
-	wspr             *WSPR
-	wsp              *websocketPipe
+	controller       *Controller
 	writer           *testWriter
 	mounttableServer ipc.Server
 	proxyServer      *proxy.Proxy
@@ -423,22 +385,22 @@
 
 	proxyEndpoint := proxyServer.Endpoint().String()
 
-	wspr := NewWSPR(0, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String()})
-	wsp := websocketPipe{ctx: wspr}
-	writer := testWriter{
-		logger: wspr.logger,
-	}
-	wsp.writerCreator = func(int64) lib.ClientWriter {
+	writer := testWriter{}
+
+	writerCreator := func(int64) lib.ClientWriter {
 		return &writer
 	}
-	wsp.setup()
-	wsp.serve(serveRequest{
+	controller := NewController(writerCreator, "/"+proxyEndpoint, veyron2.NamespaceRoots{"/" + endpoint.String()})
+	controller.UpdateIdentity(nil)
+	writer.logger = controller.logger
+
+	controller.serve(serveRequest{
 		Name:    "adder",
 		Service: adderServiceSignature,
 	}, &writer)
 
 	return &runningTest{
-		wspr, &wsp, &writer, mounttableServer, proxyServer,
+		controller, &writer, mounttableServer, proxyServer,
 	}, nil
 }
 
@@ -446,7 +408,7 @@
 	rt, err := serveServer()
 	defer rt.mounttableServer.Stop()
 	defer rt.proxyServer.Shutdown()
-	defer rt.wsp.cleanup()
+	defer rt.controller.Cleanup()
 	if err != nil {
 		t.Fatalf("could not serve server %v", err)
 	}
@@ -475,7 +437,7 @@
 	rt, err := serveServer()
 	defer rt.mounttableServer.Stop()
 	defer rt.proxyServer.Shutdown()
-	defer rt.wsp.cleanup()
+	defer rt.controller.Cleanup()
 
 	if err != nil {
 		t.Errorf("could not serve server %v", err)
@@ -483,17 +445,17 @@
 	}
 
 	// ensure there is only one server and then stop the server
-	if len(rt.wsp.servers) != 1 {
-		t.Errorf("expected only one server but got: %d", len(rt.wsp.servers))
+	if len(rt.controller.servers) != 1 {
+		t.Errorf("expected only one server but got: %d", len(rt.controller.servers))
 		return
 	}
-	for serverId := range rt.wsp.servers {
-		rt.wsp.removeServer(serverId)
+	for serverId := range rt.controller.servers {
+		rt.controller.removeServer(serverId)
 	}
 
 	// ensure there is no more servers now
-	if len(rt.wsp.servers) != 0 {
-		t.Errorf("expected no server after stopping the only one but got: %d", len(rt.wsp.servers))
+	if len(rt.controller.servers) != 0 {
+		t.Errorf("expected no server after stopping the only one but got: %d", len(rt.controller.servers))
 		return
 	}
 
@@ -511,9 +473,9 @@
 	err           *verror.Standard
 }
 
-func sendServerStream(t *testing.T, wsp *websocketPipe, test *jsServerTestCase, w lib.ClientWriter) {
+func sendServerStream(t *testing.T, controller *Controller, test *jsServerTestCase, w lib.ClientWriter) {
 	for _, msg := range test.serverStream {
-		wsp.sendParsedMessageOnStream(0, msg, w)
+		controller.sendParsedMessageOnStream(0, msg, w)
 	}
 
 	serverReply := map[string]interface{}{
@@ -525,14 +487,14 @@
 	if err != nil {
 		t.Fatalf("Failed to serialize the reply: %v", err)
 	}
-	wsp.handleServerResponse(0, string(bytes))
+	controller.HandleServerResponse(0, string(bytes))
 }
 
 func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
 	rt, err := serveServer()
 	defer rt.mounttableServer.Stop()
 	defer rt.proxyServer.Shutdown()
-	defer rt.wsp.cleanup()
+	defer rt.controller.Cleanup()
 
 	if err != nil {
 		t.Errorf("could not serve server %v", err)
@@ -561,14 +523,14 @@
 
 	rt.writer.stream = nil
 
-	// Create a client using wspr's runtime so it points to the right mounttable.
-	client, err := rt.wspr.rt.NewClient()
+	// Create a client using app's runtime so it points to the right mounttable.
+	client, err := rt.controller.rt.NewClient()
 
 	if err != nil {
 		t.Errorf("unable to create client: %v", err)
 	}
 
-	call, err := client.StartCall(rt.wspr.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs)
+	call, err := client.StartCall(rt.controller.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs)
 	if err != nil {
 		t.Errorf("failed to start call: %v", err)
 	}
@@ -607,7 +569,7 @@
 	expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStreamClose})
 
 	expectedStream := test.serverStream
-	go sendServerStream(t, rt.wsp, &test, rt.writer)
+	go sendServerStream(t, rt.controller, &test, rt.writer)
 	for {
 		var data interface{}
 		if err := call.Recv(&data); err != nil {
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
index b8689be..3349f85 100644
--- a/services/wsprd/wspr/pipe.go
+++ b/services/wsprd/wspr/pipe.go
@@ -10,22 +10,14 @@
 	_ "net/http/pprof"
 	"os"
 	"strings"
-	"sync"
 	"time"
 
-	"veyron/services/wsprd/ipc/client"
-	"veyron/services/wsprd/ipc/server"
-	"veyron/services/wsprd/ipc/stream"
+	"veyron/services/wsprd/app"
 	"veyron/services/wsprd/lib"
-	"veyron/services/wsprd/signature"
-	"veyron2"
-	"veyron2/ipc"
 	"veyron2/security"
 	"veyron2/verror"
 	"veyron2/vlog"
 	"veyron2/vom"
-	vom_wiretype "veyron2/vom/wiretype"
-	wiretype_build "veyron2/wiretype/build"
 
 	"github.com/gorilla/websocket"
 )
@@ -69,207 +61,100 @@
 	Type websocketMessageType
 }
 
-// Temporary holder of RPC so that we can store the unprocessed args.
-type veyronTempRPC struct {
-	Name        string
-	Method      string
-	InArgs      []json.RawMessage
-	NumOutArgs  int32
-	IsStreaming bool
+// wsMessage is the struct that is put on the write queue.
+type wsMessage struct {
+	buf         []byte
+	messageType int
 }
 
-type veyronRPC struct {
-	Name        string
-	Method      string
-	InArgs      []interface{}
-	NumOutArgs  int32
-	IsStreaming bool
-}
+type pipe struct {
+	// The struct that handles the translation of javascript request to veyron requests.
+	controller *app.Controller
 
-// A request javascript to serve undern a particular name
-type serveRequest struct {
-	Name     string
-	ServerId uint64
-	Service  signature.JSONServiceSignature
-}
-type websocketPipe struct {
-	// Protects outstandingStreams and outstandingServerRequests.
-	sync.Mutex
-	ws  *websocket.Conn
-	ctx *WSPR
-	// Used to generate unique ids for requests initiated by the proxy.
-	// These ids will be even so they don't collide with the ids generated
-	// by the client.
-	lastGeneratedId int64
+	ws *websocket.Conn
 
-	// Streams for the outstanding requests.
-	outstandingStreams map[int64]outstandingStream
+	logger vlog.Logger
 
-	// Maps flowids to the server that owns them.
-	flowMap map[int64]*server.Server
+	wspr *WSPR
 
-	// A manager that handles fetching and caching signature of remote services
-	signatureManager lib.SignatureManager
-
-	// We maintain multiple Veyron server per websocket pipe for serving JavaScript
-	// services.
-	servers map[uint64]*server.Server
-
-	// Creates a client writer for a given flow.  This is a member so that tests can override
-	// the default implementation.
 	writerCreator func(id int64) lib.ClientWriter
 
+	// There is a single write goroutine because ws.NewWriter() creates a new writer that
+	// writes to a shared buffer in the websocket, so it is not safe to have multiple go
+	// routines writing to different websocket writers.
 	writeQueue chan wsMessage
 
-	// privateId associated with the pipe
-	privateId security.PrivateID
+	// This request is used to tell WSPR which pipe to remove when we shutdown.
+	req *http.Request
 }
 
-// finishCall waits for the call to finish and write out the response to w.
-func (wsp *websocketPipe) finishCall(w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
-	if msg.IsStreaming {
-		for {
-			var item interface{}
-			if err := clientCall.Recv(&item); err != nil {
-				if err == io.EOF {
-					break
-				}
-				w.Error(err) // Send streaming error as is
-				return
-			}
-			if err := w.Send(lib.ResponseStream, item); err != nil {
-				w.Error(verror.Internalf("unable to marshal: %v", item))
-			}
-		}
-
-		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
-			w.Error(verror.Internalf("unable to marshal close stream message"))
-		}
-	}
-
-	results := make([]interface{}, msg.NumOutArgs)
-	// This array will have pointers to the values in result.
-	resultptrs := make([]interface{}, msg.NumOutArgs)
-	for ax := range results {
-		resultptrs[ax] = &results[ax]
-	}
-	if err := clientCall.Finish(resultptrs...); err != nil {
-		// return the call system error as is
-		w.Error(err)
-		return
-	}
-	// for now we assume last out argument is always error
-	if len(results) < 1 {
-		w.Error(verror.Internalf("client call did not return any results"))
-		return
-	}
-
-	if err, ok := results[len(results)-1].(error); ok {
-		// return the call application error as is
-		w.Error(err)
-		return
-	}
-
-	if err := w.Send(lib.ResponseFinal, results[0:len(results)-1]); err != nil {
-		w.Error(verror.Internalf("error marshalling results: %v", err))
-	}
-}
-
-// Implements the serverHelper interface
-func (wsp *websocketPipe) CreateNewFlow(s *server.Server, stream stream.Sender) *server.Flow {
-	wsp.Lock()
-	defer wsp.Unlock()
-	id := wsp.lastGeneratedId
-	wsp.lastGeneratedId += 2
-	wsp.flowMap[id] = s
-	wsp.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}}
-	return &server.Flow{ID: id, Writer: wsp.writerCreator(id)}
-}
-
-func (wsp *websocketPipe) CleanupFlow(id int64) {
-	wsp.Lock()
-	defer wsp.Unlock()
-	delete(wsp.outstandingStreams, id)
-	delete(wsp.flowMap, id)
-}
-
-func (wsp *websocketPipe) GetLogger() vlog.Logger {
-	return wsp.ctx.logger
-}
-
-func (wsp *websocketPipe) RT() veyron2.Runtime {
-	return wsp.ctx.rt
+func newPipe(w http.ResponseWriter, req *http.Request, wspr *WSPR, creator func(id int64) lib.ClientWriter) *pipe {
+	pipe := &pipe{logger: wspr.rt.Logger(), writerCreator: creator, req: req, wspr: wspr}
+	pipe.start(w, req)
+	return pipe
 }
 
 // cleans up any outstanding rpcs.
-func (wsp *websocketPipe) cleanup() {
-	wsp.ctx.logger.VI(0).Info("Cleaning up websocket")
-	wsp.Lock()
-	defer wsp.Unlock()
-	for _, stream := range wsp.outstandingStreams {
-		if call, ok := stream.stream.(ipc.Call); ok {
-			call.Cancel()
+func (p *pipe) cleanup() {
+	p.logger.VI(0).Info("Cleaning up websocket")
+	p.controller.Cleanup()
+	p.wspr.CleanUpPipe(p.req)
+}
+
+func (p *pipe) setup() {
+	if p.writerCreator == nil {
+		p.writerCreator = func(id int64) lib.ClientWriter {
+			return &websocketWriter{p: p, id: id, logger: p.logger}
 		}
 	}
 
-	for _, server := range wsp.servers {
-		server.Stop()
-	}
+	p.writeQueue = make(chan wsMessage, 50)
+	go p.writeLoop()
+
+	p.controller = app.NewController(p.writerCreator, p.wspr.veyronProxyEP)
+	// TODO(bjornick):  Pass in the identity linked to this origin.
+	p.controller.UpdateIdentity(nil)
 }
 
-func (wsp *websocketPipe) setup() {
-	wsp.signatureManager = lib.NewSignatureManager()
-	wsp.outstandingStreams = make(map[int64]outstandingStream)
-	wsp.flowMap = make(map[int64]*server.Server)
-	wsp.servers = make(map[uint64]*server.Server)
-	wsp.writeQueue = make(chan wsMessage, 50)
-	go wsp.writeLoop()
-
-	if wsp.writerCreator == nil {
-		wsp.writerCreator = func(id int64) lib.ClientWriter {
-			return &websocketWriter{wsp: wsp, id: id, logger: wsp.ctx.logger}
-		}
-	}
-}
-
-func (wsp *websocketPipe) writeLoop() {
+func (p *pipe) writeLoop() {
 	for {
-		msg, ok := <-wsp.writeQueue
+		msg, ok := <-p.writeQueue
 		if !ok {
-			wsp.ctx.logger.Errorf("write queue was closed")
+			p.logger.Errorf("write queue was closed")
 			return
 		}
 
 		if msg.messageType == websocket.PingMessage {
-			wsp.ctx.logger.Infof("sending ping")
+			p.logger.Infof("sending ping")
 		}
-		if err := wsp.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
-			wsp.ctx.logger.Errorf("failed to write bytes: %s", err)
+		if err := p.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
+			p.logger.Errorf("failed to write bytes: %s", err)
 		}
 	}
 }
 
-func (wsp *websocketPipe) start(w http.ResponseWriter, req *http.Request) {
+func (p *pipe) start(w http.ResponseWriter, req *http.Request) {
 	ws, err := websocket.Upgrade(w, req, nil, 1024, 1024)
 	if _, ok := err.(websocket.HandshakeError); ok {
 		http.Error(w, "Not a websocket handshake", 400)
 		return
 	} else if err != nil {
 		http.Error(w, "Internal Error", 500)
-		wsp.ctx.logger.Errorf("websocket upgrade failed: %s", err)
+		p.logger.Errorf("websocket upgrade failed: %s", err)
 		return
 	}
 
-	wsp.setup()
-	wsp.ws = ws
-	wsp.ws.SetPongHandler(wsp.pongHandler)
-	wsp.sendInitialMessage()
-	go wsp.readLoop()
-	go wsp.pingLoop()
+	p.ws = ws
+	p.ws.SetPongHandler(p.pongHandler)
+	p.setup()
+	p.sendInitialMessage()
+
+	go p.readLoop()
+	go p.pingLoop()
 }
 
 // Upon first connect, we send a message with the wsprConfig.
-func (wsp *websocketPipe) sendInitialMessage() {
+func (p *pipe) sendInitialMessage() {
 	mounttableRoots := strings.Split(os.Getenv("NAMESPACE_ROOT"), ",")
 	if len(mounttableRoots) == 1 && mounttableRoots[0] == "" {
 		mounttableRoots = []string{}
@@ -280,154 +165,56 @@
 
 	var buf bytes.Buffer
 	if err := vom.ObjToJSON(&buf, vom.ValueOf(msg)); err != nil {
-		wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err)
+		p.logger.Errorf("failed to convert wspr config to json: %s", err)
 		return
 	}
-	wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf.Bytes()}
+	p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf.Bytes()}
 }
 
-func (wsp *websocketPipe) pingLoop() {
+func (p *pipe) pingLoop() {
 	for {
 		time.Sleep(pingInterval)
-		wsp.ctx.logger.VI(2).Info("ws: ping")
-		wsp.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
+		p.logger.VI(2).Info("ws: ping")
+		p.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
 	}
 }
 
-func (wsp *websocketPipe) pongHandler(msg string) error {
-	wsp.ctx.logger.VI(2).Infof("ws: pong")
-	wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout))
+func (p *pipe) pongHandler(msg string) error {
+	p.logger.VI(2).Infof("ws: pong")
+	p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
 	return nil
 }
 
-func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w lib.ClientWriter) {
-	wsp.Lock()
-	defer wsp.Unlock()
-	stream := wsp.outstandingStreams[id].stream
-	if stream == nil {
-		w.Error(fmt.Errorf("unknown stream"))
-		return
-	}
-
-	stream.Send(msg, w)
-
-}
-
-// sendOnStream writes data on id's stream.  Returns an error if the send failed.
-func (wsp *websocketPipe) sendOnStream(id int64, data string, w lib.ClientWriter) {
-	wsp.Lock()
-	typ := wsp.outstandingStreams[id].inType
-	wsp.Unlock()
-	if typ == nil {
-		vlog.Errorf("no inType for stream %d (%q)", id, data)
-		return
-	}
-	payload, err := vom.JSONToObject(data, typ)
-	if err != nil {
-		vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
-		return
-	}
-	wsp.sendParsedMessageOnStream(id, payload, w)
-}
-
-func (wsp *websocketPipe) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w lib.ClientWriter, signal chan ipc.Stream) {
-	// We have to make the start call synchronous so we can make sure that we populate
-	// the call map before we can handle a recieve call.
-	call, err := wsp.ctx.startVeyronRequest(w, veyronMsg)
-	if err != nil {
-		w.Error(verror.Internalf("can't start Veyron Request: %v", err))
-		return
-	}
-
-	if signal != nil {
-		signal <- call
-	}
-
-	wsp.finishCall(w, call, veyronMsg)
-	if signal != nil {
-		wsp.Lock()
-		delete(wsp.outstandingStreams, id)
-		wsp.Unlock()
-	}
-}
-
-// handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w lib.ClientWriter) {
-	veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
-	if err != nil {
-		w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
-		return
-	}
-
-	wsp.Lock()
-	defer wsp.Unlock()
-	// If this rpc is streaming, we would expect that the client would try to send
-	// on this stream.  Since the initial handshake is done asynchronously, we have
-	// to basically put a queueing stream in the map before we make the async call
-	// so that the future sends on the stream can see the queuing stream, even if
-	// the client call isn't actually ready yet.
-	var signal chan ipc.Stream
-	if veyronMsg.IsStreaming {
-		signal = make(chan ipc.Stream)
-		wsp.outstandingStreams[id] = outstandingStream{
-			stream: client.StartQueueingStream(signal),
-			inType: inStreamType,
-		}
-	}
-	go wsp.sendVeyronRequest(id, veyronMsg, w, signal)
-}
-
-func (wsp *websocketPipe) closeStream(id int64) {
-	wsp.Lock()
-	defer wsp.Unlock()
-	stream := wsp.outstandingStreams[id].stream
-	if stream == nil {
-		wsp.ctx.logger.Errorf("close called on non-existent call: %v", id)
-		return
-	}
-
-	var call client.QueueingStream
-	var ok bool
-	if call, ok = stream.(client.QueueingStream); !ok {
-		wsp.ctx.logger.Errorf("can't close server stream: %v", id)
-		return
-	}
-
-	if err := call.Close(); err != nil {
-		wsp.ctx.logger.Errorf("client call close failed with: %v", err)
-	}
-}
-
-func (wsp *websocketPipe) readLoop() {
-	wsp.ws.SetReadDeadline(time.Now().Add(pongTimeout))
+func (p *pipe) readLoop() {
+	p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
 	for {
-		op, r, err := wsp.ws.NextReader()
+		op, r, err := p.ws.NextReader()
 		if err == io.ErrUnexpectedEOF { // websocket disconnected
 			break
 		}
 		if err != nil {
-			wsp.ctx.logger.VI(1).Infof("websocket receive: %s", err)
+			p.logger.VI(1).Infof("websocket receive: %s", err)
 			break
 		}
 
 		if op != websocket.TextMessage {
-			wsp.ctx.logger.Errorf("unexpected websocket op: %v", op)
+			p.logger.Errorf("unexpected websocket op: %v", op)
 		}
 
 		var msg websocketMessage
 		decoder := json.NewDecoder(r)
 		if err := decoder.Decode(&msg); err != nil {
 			errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
-			wsp.ctx.logger.Error(errMsg)
-			wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
+			p.logger.Error(errMsg)
+			p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
 			continue
 		}
 
-		ww := wsp.writerCreator(msg.Id)
+		ww := p.writerCreator(msg.Id)
 
 		switch msg.Type {
 		case websocketVeyronRequest:
-			wsp.handleVeyronRequest(msg.Id, msg.Data, ww)
+			p.controller.HandleVeyronRequest(msg.Id, msg.Data, ww)
 		case websocketStreamingValue:
 			// This will asynchronous for a client rpc, but synchronous for a
 			// server rpc.  This could be potentially bad if the server is sending
@@ -435,224 +222,24 @@
 			// it difficult to guarantee that all stream messages make it to the client
 			// before the finish call.
 			// TODO(bjornick): Make the server send also asynchronous.
-			wsp.sendOnStream(msg.Id, msg.Data, ww)
+			p.controller.SendOnStream(msg.Id, msg.Data, ww)
 		case websocketStreamClose:
-			wsp.closeStream(msg.Id)
+			p.controller.CloseStream(msg.Id)
 		case websocketServe:
-			go wsp.handleServeRequest(msg.Data, ww)
+			go p.controller.HandleServeRequest(msg.Data, ww)
 		case websocketStopServer:
-			go wsp.handleStopRequest(msg.Data, ww)
+			go p.controller.HandleStopRequest(msg.Data, ww)
 		case websocketServerResponse:
-			go wsp.handleServerResponse(msg.Id, msg.Data)
+			go p.controller.HandleServerResponse(msg.Id, msg.Data)
 		case websocketSignatureRequest:
-			go wsp.handleSignatureRequest(msg.Data, ww)
+			go p.controller.HandleSignatureRequest(msg.Data, ww)
 		case websocketAssocIdentity:
-			wsp.handleAssocIdentity(msg.Data, ww)
+			p.handleAssocIdentity(msg.Data, ww)
 		default:
 			ww.Error(verror.Unknownf("unknown message type: %v", msg.Type))
 		}
 	}
-	wsp.cleanup()
-}
-
-func (wsp *websocketPipe) maybeCreateServer(serverId uint64) (*server.Server, error) {
-	wsp.Lock()
-	defer wsp.Unlock()
-	if server, ok := wsp.servers[serverId]; ok {
-		return server, nil
-	}
-	server, err := server.NewServer(serverId, wsp.ctx.veyronProxyEP, wsp)
-	if err != nil {
-		return nil, err
-	}
-	wsp.servers[serverId] = server
-	return server, nil
-}
-
-func (wsp *websocketPipe) removeServer(serverId uint64) {
-	wsp.Lock()
-	server := wsp.servers[serverId]
-	if server == nil {
-		wsp.Unlock()
-		return
-	}
-	delete(wsp.servers, serverId)
-	wsp.Unlock()
-
-	server.Stop()
-}
-
-func (wsp *websocketPipe) serve(serveRequest serveRequest, w lib.ClientWriter) {
-	// Create a server for the websocket pipe, if it does not exist already
-	server, err := wsp.maybeCreateServer(serveRequest.ServerId)
-	if err != nil {
-		w.Error(verror.Internalf("error creating server: %v", err))
-	}
-
-	wsp.ctx.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)
-
-	endpoint, err := server.Serve(serveRequest.Name, serveRequest.Service)
-	if err != nil {
-		w.Error(verror.Internalf("error serving service: %v", err))
-		return
-	}
-	// Send the endpoint back
-	if err := w.Send(lib.ResponseFinal, endpoint); err != nil {
-		w.Error(verror.Internalf("error marshalling results: %v", err))
-		return
-	}
-}
-
-// handleServeRequest takes a request to serve a server, creates
-// a server, registers the provided services and sends the endpoint back.
-func (wsp *websocketPipe) handleServeRequest(data string, w lib.ClientWriter) {
-	// Decode the serve request which includes IDL, registered services and name
-	var serveRequest serveRequest
-	decoder := json.NewDecoder(bytes.NewBufferString(data))
-	if err := decoder.Decode(&serveRequest); err != nil {
-		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
-		return
-	}
-	wsp.serve(serveRequest, w)
-}
-
-// handleStopRequest takes a request to stop a server.
-func (wsp *websocketPipe) handleStopRequest(data string, w lib.ClientWriter) {
-
-	var serverId uint64
-	decoder := json.NewDecoder(bytes.NewBufferString(data))
-	if err := decoder.Decode(&serverId); err != nil {
-		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
-		return
-	}
-
-	wsp.removeServer(serverId)
-
-	// Send true to indicate stop has finished
-	if err := w.Send(lib.ResponseFinal, true); err != nil {
-		w.Error(verror.Internalf("error marshalling results: %v", err))
-		return
-	}
-}
-
-// handleServerResponse handles the completion of outstanding calls to JavaScript services
-// by filling the corresponding channel with the result from JavaScript.
-func (wsp *websocketPipe) handleServerResponse(id int64, data string) {
-	wsp.Lock()
-	server := wsp.flowMap[id]
-	wsp.Unlock()
-	if server == nil {
-		wsp.ctx.logger.Errorf("unexpected result from JavaScript. No channel "+
-			"for MessageId: %d exists. Ignoring the results.", id)
-		//Ignore unknown responses that don't belong to any channel
-		return
-	}
-	server.HandleServerResponse(id, data)
-}
-
-// parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) {
-	var tempMsg veyronTempRPC
-	decoder := json.NewDecoder(r)
-	if err := decoder.Decode(&tempMsg); err != nil {
-		return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
-	}
-
-	client, err := wsp.ctx.newClient()
-	if err != nil {
-		return nil, nil, verror.Internalf("error creating client: %v", err)
-	}
-
-	// Fetch and adapt signature from the SignatureManager
-	ctx := wsp.ctx.rt.TODOContext()
-	sig, err := wsp.signatureManager.Signature(ctx, tempMsg.Name, client)
-	if err != nil {
-		return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
-	}
-
-	methName := lib.UppercaseFirstCharacter(tempMsg.Method)
-	methSig, ok := sig.Methods[methName]
-	if !ok {
-		return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
-	}
-
-	var msg veyronRPC
-	if len(methSig.InArgs) != len(tempMsg.InArgs) {
-		return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
-	}
-	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
-	td := wiretype_build.TypeDefs(sig.TypeDefs)
-
-	for i := 0; i < len(tempMsg.InArgs); i++ {
-		argTypeId := methSig.InArgs[i].Type
-		argType := vom_wiretype.Type{
-			ID:   argTypeId,
-			Defs: &td,
-		}
-
-		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
-		if err != nil {
-			return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
-		}
-		msg.InArgs[i] = val
-	}
-
-	msg.Name = tempMsg.Name
-	msg.Method = tempMsg.Method
-	msg.NumOutArgs = tempMsg.NumOutArgs
-	msg.IsStreaming = tempMsg.IsStreaming
-
-	inStreamType := vom_wiretype.Type{
-		ID:   methSig.InStream,
-		Defs: &td,
-	}
-
-	wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
-	return &msg, inStreamType, nil
-}
-
-type signatureRequest struct {
-	Name string
-}
-
-func (wsp *websocketPipe) getSignature(name string) (signature.JSONServiceSignature, error) {
-	client, err := wsp.ctx.newClient()
-	if err != nil {
-		return nil, verror.Internalf("error creating client: %v", err)
-	}
-
-	// Fetch and adapt signature from the SignatureManager
-	ctx := wsp.ctx.rt.TODOContext()
-	sig, err := wsp.signatureManager.Signature(ctx, name, client)
-	if err != nil {
-		return nil, verror.Internalf("error getting service signature for %s: %v", name, err)
-	}
-
-	return signature.NewJSONServiceSignature(*sig), nil
-}
-
-// handleSignatureRequest uses signature manager to get and cache signature of a remote server
-func (wsp *websocketPipe) handleSignatureRequest(data string, w lib.ClientWriter) {
-	// Decode the request
-	var request signatureRequest
-	decoder := json.NewDecoder(bytes.NewBufferString(data))
-	if err := decoder.Decode(&request); err != nil {
-		w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
-		return
-	}
-
-	wsp.ctx.logger.VI(2).Infof("requesting Signature for %q", request.Name)
-	jsSig, err := wsp.getSignature(request.Name)
-	if err != nil {
-		w.Error(err)
-		return
-	}
-
-	// Send the signature back
-	if err := w.Send(lib.ResponseFinal, jsSig); err != nil {
-		w.Error(verror.Internalf("error marshalling results: %v", err))
-		return
-	}
+	p.cleanup()
 }
 
 type assocIdentityData struct {
@@ -662,7 +249,7 @@
 }
 
 // handleAssocIdentityRequest associates the identity with the origin
-func (wsp *websocketPipe) handleAssocIdentity(data string, w lib.ClientWriter) {
+func (p *pipe) handleAssocIdentity(data string, w lib.ClientWriter) {
 	// Decode the request
 	var parsedData assocIdentityData
 	decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -671,17 +258,18 @@
 		return
 	}
 
-	wsp.ctx.logger.VI(2).Info("associating name %v and private id %v to origin %v",
+	p.logger.VI(2).Info("associating name %v and private id %v to origin %v",
 		parsedData.Account,
 		parsedData.Identity,
 		parsedData.Origin)
 
-	idManager := wsp.ctx.idManager
+	idManager := p.wspr.idManager
 
-	wsp.privateId = decodeIdentity(wsp.ctx.logger, parsedData.Identity)
+	id := decodeIdentity(p.logger, parsedData.Identity)
+	p.controller.UpdateIdentity(id)
 
-	if err := idManager.AddAccount(parsedData.Account, wsp.privateId); err != nil {
-		w.Error(verror.Internalf("identity.AddAccount(%v, %v) failed: %v", parsedData.Account, wsp.privateId, err))
+	if err := idManager.AddAccount(parsedData.Account, id); err != nil {
+		w.Error(verror.Internalf("identity.AddAccount(%v, %v) failed: %v", parsedData.Account, id, err))
 	}
 
 	if err := idManager.AddOrigin(parsedData.Origin, parsedData.Account, []security.ServiceCaveat{}); err != nil {
diff --git a/services/wsprd/wspr/pipe_test.go b/services/wsprd/wspr/pipe_test.go
new file mode 100644
index 0000000..8f2e89c
--- /dev/null
+++ b/services/wsprd/wspr/pipe_test.go
@@ -0,0 +1,58 @@
+package wspr
+
+import (
+	"encoding/json"
+	"testing"
+	"veyron/services/wsprd/lib"
+	"veyron2"
+	"veyron2/rt"
+	"veyron2/security"
+)
+
+var r veyron2.Runtime
+
+func init() {
+	r = rt.Init()
+}
+
+type testWriter struct{}
+
+func (*testWriter) Send(lib.ResponseType, interface{}) error { return nil }
+func (*testWriter) Error(error)                              {}
+
+func TestHandleAssocIdentity(t *testing.T) {
+	wspr := NewWSPR(0, "mockVeyronProxyEP")
+	p := pipe{wspr: wspr, logger: wspr.rt.Logger()}
+	p.setup()
+
+	privateId := security.FakePrivateID("/fake/private/id")
+	identityData := assocIdentityData{
+		Account:  "test@example.org",
+		Identity: encodeIdentity(wspr.logger, privateId),
+		Origin:   "my.webapp.com",
+	}
+	jsonIdentityDataBytes, err := json.Marshal(identityData)
+	if err != nil {
+		t.Errorf("json.Marshal(%v) failed: %v", identityData, err)
+	}
+	jsonIdentityData := string(jsonIdentityDataBytes)
+	writer := testWriter{}
+	p.handleAssocIdentity(jsonIdentityData, lib.ClientWriter(&writer))
+	// Check that the pipe has the privateId
+	if p.app.RT().Identity() != privateId {
+		t.Errorf("p.privateId was not set. got: %v, expected: %v", p.app.RT().Identity(), identityData.Identity)
+	}
+	// Check that wspr idManager has the origin
+	_, err = wspr.idManager.Identity(identityData.Origin)
+	if err != nil {
+		t.Errorf("wspr.idManager.Identity(%v) failed: %v", identityData.Origin, err)
+	}
+}
+
+func TestEncodeDecodeIdentity(t *testing.T) {
+	identity := security.FakePrivateID("/fake/private/id")
+	resultIdentity := decodeIdentity(r.Logger(), encodeIdentity(r.Logger(), identity))
+	if identity != resultIdentity {
+		t.Errorf("expected decodeIdentity(encodeIdentity(identity)) to be %v, got %v", identity, resultIdentity)
+	}
+}
diff --git a/services/wsprd/wspr/writer.go b/services/wsprd/wspr/writer.go
index ebf799e..c7ca1b8 100644
--- a/services/wsprd/wspr/writer.go
+++ b/services/wsprd/wspr/writer.go
@@ -23,7 +23,7 @@
 
 // Implements clientWriter interface for sending messages over websockets.
 type websocketWriter struct {
-	wsp    *websocketPipe
+	p      *pipe
 	logger vlog.Logger
 	id     int64
 }
@@ -42,7 +42,7 @@
 		return err
 	}
 
-	w.wsp.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf2.Bytes()}
+	w.p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf2.Bytes()}
 
 	return nil
 }
diff --git a/services/wsprd/wspr/wspr.go b/services/wsprd/wspr/wspr.go
index af24916..0780d1c 100644
--- a/services/wsprd/wspr/wspr.go
+++ b/services/wsprd/wspr/wspr.go
@@ -17,22 +17,18 @@
 import (
 	"bytes"
 	"crypto/tls"
-	"encoding/binary"
 	"fmt"
 	"io"
 	"log"
 	"net/http"
 	_ "net/http/pprof"
+	"sync"
 	"time"
 
 	"veyron/services/wsprd/identity"
-	"veyron/services/wsprd/ipc/stream"
-	"veyron/services/wsprd/lib"
 	"veyron2"
-	"veyron2/ipc"
 	"veyron2/rt"
 	"veyron2/vlog"
-	"veyron2/vom"
 )
 
 const (
@@ -45,47 +41,18 @@
 }
 
 type WSPR struct {
+	mu            sync.Mutex
 	tlsCert       *tls.Certificate
 	rt            veyron2.Runtime
 	logger        vlog.Logger
 	port          int
 	veyronProxyEP string
 	idManager     *identity.IDManager
+	pipes         map[*http.Request]*pipe
 }
 
 var logger vlog.Logger
 
-func (ctx WSPR) newClient() (ipc.Client, error) {
-	return ctx.rt.NewClient(veyron2.CallTimeout(ipc.NoTimeout))
-}
-
-func (ctx WSPR) startVeyronRequest(w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
-	// Issue request to the endpoint.
-	client, err := ctx.newClient()
-	if err != nil {
-		return nil, err
-	}
-	methodName := lib.UppercaseFirstCharacter(msg.Method)
-	clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, methodName, msg.InArgs)
-
-	if err != nil {
-		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
-	}
-
-	return clientCall, nil
-}
-
-func intToByteSlice(i int32) []byte {
-	rw := new(bytes.Buffer)
-	binary.Write(rw, binary.BigEndian, i)
-	buf := make([]byte, 4)
-	n, err := io.ReadFull(rw, buf)
-	if n != 4 || err != nil {
-		panic(fmt.Sprintf("Read less than 4 bytes: %d", n))
-	}
-	return buf[:n]
-}
-
 func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/html")
 	w.Write([]byte(`<html>
@@ -113,24 +80,15 @@
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 }
 
-type outstandingStream struct {
-	stream stream.Sender
-	inType vom.Type
-}
-
-type wsMessage struct {
-	buf         []byte
-	messageType int
-}
-
 // Starts the proxy and listens for requests. This method is blocking.
 func (ctx WSPR) Run() {
 	http.HandleFunc("/debug", ctx.handleDebug)
 	http.Handle("/favicon.ico", http.NotFoundHandler())
 	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
 		ctx.logger.VI(0).Info("Creating a new websocket")
-		pipe := &websocketPipe{ctx: &ctx}
-		pipe.start(w, r)
+		ctx.mu.Lock()
+		defer ctx.mu.Unlock()
+		ctx.pipes[r] = newPipe(w, r, &ctx, nil)
 	})
 	ctx.logger.VI(1).Infof("Listening on port %d.", ctx.port)
 	httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.port), nil)
@@ -143,6 +101,12 @@
 	ctx.rt.Cleanup()
 }
 
+func (ctx WSPR) CleanUpPipe(req *http.Request) {
+	ctx.mu.Lock()
+	defer ctx.mu.Unlock()
+	delete(ctx.pipes, req)
+}
+
 // Creates a new WebSocket Proxy object.
 func NewWSPR(port int, veyronProxyEP string, opts ...veyron2.ROpt) *WSPR {
 	if veyronProxyEP == "" {