veyron/services/wsprd: Remove the signature call from the main websocket
goroutine.

The call was in the main go routine because we need to know the type of
the stream before we could put in in the outstandingStream map.  The
stream itself could be added later, but the type could, so the fix was
to change the asynchronous stream code to allow delaying the
initializing of both the stream and the type of the stream.

This change also fixes a long standing issue that sending on the server
stream blocked the main pipe thread.

Change-Id: I2e1dd8312d698da7470f888275337edd3bcbed19
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 06a226f..5037b62 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -12,9 +12,7 @@
 	"time"
 
 	"veyron.io/veyron/veyron/services/wsprd/identity"
-	"veyron.io/veyron/veyron/services/wsprd/ipc/client"
 	"veyron.io/veyron/veyron/services/wsprd/ipc/server"
-	"veyron.io/veyron/veyron/services/wsprd/ipc/stream"
 	"veyron.io/veyron/veyron/services/wsprd/lib"
 	"veyron.io/veyron/veyron/services/wsprd/signature"
 	"veyron.io/veyron/veyron2"
@@ -65,11 +63,6 @@
 	Service  signature.JSONServiceSignature
 }
 
-type outstandingStream struct {
-	stream stream.Sender
-	inType vom.Type
-}
-
 type jsonCaveatValidator struct {
 	Type string `json:"_type"`
 	Data json.RawMessage
@@ -106,7 +99,7 @@
 	lastGeneratedId int64
 
 	// Streams for the outstanding requests.
-	outstandingStreams map[int64]outstandingStream
+	outstandingStreams map[int64]*outstandingStream
 
 	// Maps flowids to the server that owns them.
 	flowMap map[int64]*server.Server
@@ -227,22 +220,29 @@
 
 // CreateNewFlow creats a new server flow that will be used to write out
 // streaming messages to Javascript.
-func (c *Controller) CreateNewFlow(s *server.Server, stream stream.Sender) *server.Flow {
+func (c *Controller) CreateNewFlow(s *server.Server, stream ipc.Stream) *server.Flow {
 	c.Lock()
 	defer c.Unlock()
 	id := c.lastGeneratedId
 	c.lastGeneratedId += 2
 	c.flowMap[id] = s
-	c.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}}
+	os := newStream()
+	os.init(stream, vom_wiretype.Type{ID: 1})
+	c.outstandingStreams[id] = os
 	return &server.Flow{ID: id, Writer: c.writerCreator(id)}
 }
 
 // CleanupFlow removes the bookkeping for a previously created flow.
 func (c *Controller) CleanupFlow(id int64) {
 	c.Lock()
-	defer c.Unlock()
+	stream := c.outstandingStreams[id]
 	delete(c.outstandingStreams, id)
 	delete(c.flowMap, id)
+	c.Unlock()
+	if stream != nil {
+		stream.end()
+		stream.waitUntilDone()
+	}
 }
 
 // GetLogger returns a Veyron logger to use.
@@ -269,12 +269,7 @@
 	c.Lock()
 	defer c.Unlock()
 	for _, stream := range c.outstandingStreams {
-		_ = stream
-		// TODO(bjornick): this is impossible type assertion and
-		// will panic at run time.
-		// if call, ok := stream.stream.(ipc.Call); ok {
-		// 	call.Cancel()
-		// }
+		stream.end()
 	}
 
 	for _, server := range c.servers {
@@ -284,58 +279,90 @@
 
 func (c *Controller) setup() {
 	c.signatureManager = lib.NewSignatureManager()
-	c.outstandingStreams = make(map[int64]outstandingStream)
+	c.outstandingStreams = make(map[int64]*outstandingStream)
 	c.flowMap = make(map[int64]*server.Server)
 	c.servers = make(map[uint64]*server.Server)
 }
 
-func (c *Controller) sendParsedMessageOnStream(id int64, msg interface{}, w lib.ClientWriter) {
-	c.Lock()
-	defer c.Unlock()
-	stream := c.outstandingStreams[id].stream
-	if stream == nil {
-		w.Error(fmt.Errorf("unknown stream"))
-		return
-	}
-
-	stream.Send(msg, w)
-
-}
-
-// SendOnStream writes data on id's stream.  Returns an error if the send failed.
+// SendOnStream writes data on id's stream.  The actual network write will be
+// done asynchronously.  If there is an error, it will be sent to w.
 func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
 	c.Lock()
-	typ := c.outstandingStreams[id].inType
+	stream := c.outstandingStreams[id]
 	c.Unlock()
-	if typ == nil {
-		vlog.Errorf("no inType for stream %d (%q)", id, data)
+
+	if stream == nil {
+		vlog.Errorf("unknown stream: %d", id)
 		return
 	}
-	payload, err := vom.JSONToObject(data, typ)
-	if err != nil {
-		vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
-		return
-	}
-	c.sendParsedMessageOnStream(id, payload, w)
+	stream.send(data, w)
 }
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
 // the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx context.T, id int64, veyronMsg *veyronRPC, w lib.ClientWriter, signal chan ipc.Stream) {
+func (c *Controller) sendVeyronRequest(ctx context.T, id int64, tempMsg *veyronTempRPC, w lib.ClientWriter, stream *outstandingStream) {
+	// Fetch and adapt signature from the SignatureManager
+	retryTimeoutOpt := veyron2.RetryTimeoutOpt(time.Duration(*retryTimeout) * time.Second)
+	sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
+	if err != nil {
+		w.Error(verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err))
+		return
+	}
+
+	methName := lib.UppercaseFirstCharacter(tempMsg.Method)
+	methSig, ok := sig.Methods[methName]
+	if !ok {
+		w.Error(fmt.Errorf("method not found in signature: %v (full sig: %v)", methName, sig))
+		return
+	}
+
+	var msg veyronRPC
+	if len(methSig.InArgs) != len(tempMsg.InArgs) {
+		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, tempMsg))
+		return
+	}
+	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
+	td := wiretype_build.TypeDefs(sig.TypeDefs)
+
+	for i := 0; i < len(tempMsg.InArgs); i++ {
+		argTypeId := methSig.InArgs[i].Type
+		argType := vom_wiretype.Type{
+			ID:   argTypeId,
+			Defs: &td,
+		}
+
+		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
+		if err != nil {
+			w.Error(fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err))
+			return
+		}
+		msg.InArgs[i] = val
+	}
+
+	msg.Name = tempMsg.Name
+	msg.Method = tempMsg.Method
+	msg.NumOutArgs = tempMsg.NumOutArgs
+	msg.IsStreaming = tempMsg.IsStreaming
+
+	inStreamType := vom_wiretype.Type{
+		ID:   methSig.InStream,
+		Defs: &td,
+	}
+
 	// We have to make the start call synchronous so we can make sure that we populate
 	// the call map before we can Handle a recieve call.
-	call, err := c.startCall(ctx, w, veyronMsg)
+	call, err := c.startCall(ctx, w, &msg)
 	if err != nil {
 		w.Error(verror.Internalf("can't start Veyron Request: %v", err))
 		return
 	}
 
-	if signal != nil {
-		signal <- call
+	if stream != nil {
+		stream.init(call, inStreamType)
 	}
 
-	c.finishCall(w, call, veyronMsg)
-	if signal != nil {
+	c.finishCall(w, call, &msg)
+	if stream != nil {
 		c.Lock()
 		delete(c.outstandingStreams, id)
 		c.Unlock()
@@ -344,50 +371,38 @@
 
 // HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
 func (c *Controller) HandleVeyronRequest(ctx context.T, id int64, data string, w lib.ClientWriter) {
-	veyronMsg, inStreamType, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
+	veyronTempMsg, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
 	if err != nil {
 		w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
 		return
 	}
 
-	c.Lock()
-	defer c.Unlock()
 	// If this rpc is streaming, we would expect that the client would try to send
 	// on this stream.  Since the initial handshake is done asynchronously, we have
-	// to basically put a queueing stream in the map before we make the async call
-	// so that the future sends on the stream can see the queuing stream, even if
-	// the client call isn't actually ready yet.
-	var signal chan ipc.Stream
-	if veyronMsg.IsStreaming {
-		signal = make(chan ipc.Stream)
-		c.outstandingStreams[id] = outstandingStream{
-			stream: client.StartQueueingStream(signal),
-			inType: inStreamType,
-		}
+	// to put the outstanding stream in the map before we make the async call so that
+	// the future send know which queue to write to, even if the client call isn't
+	// actually ready yet.
+	var stream *outstandingStream
+	if veyronTempMsg.IsStreaming {
+		stream = newStream()
+		c.Lock()
+		c.outstandingStreams[id] = stream
+		c.Unlock()
 	}
-	go c.sendVeyronRequest(ctx, id, veyronMsg, w, signal)
+	go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
 }
 
 // CloseStream closes the stream for a given id.
 func (c *Controller) CloseStream(id int64) {
 	c.Lock()
 	defer c.Unlock()
-	stream := c.outstandingStreams[id].stream
+	stream := c.outstandingStreams[id]
 	if stream == nil {
 		c.logger.Errorf("close called on non-existent call: %v", id)
 		return
 	}
 
-	var call client.QueueingStream
-	var ok bool
-	if call, ok = stream.(client.QueueingStream); !ok {
-		c.logger.Errorf("can't close server stream: %v", id)
-		return
-	}
-
-	if err := call.Close(); err != nil {
-		c.logger.Errorf("client call close failed with: %v", err)
-	}
+	stream.end()
 }
 
 func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {
@@ -485,59 +500,14 @@
 }
 
 // parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronRPC, vom.Type, error) {
+func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronTempRPC, error) {
 	var tempMsg veyronTempRPC
 	decoder := json.NewDecoder(r)
 	if err := decoder.Decode(&tempMsg); err != nil {
-		return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+		return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
 	}
-
-	// Fetch and adapt signature from the SignatureManager
-	retryTimeoutOpt := veyron2.RetryTimeoutOpt(time.Duration(*retryTimeout) * time.Second)
-	sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
-	if err != nil {
-		return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
-	}
-
-	methName := lib.UppercaseFirstCharacter(tempMsg.Method)
-	methSig, ok := sig.Methods[methName]
-	if !ok {
-		return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
-	}
-
-	var msg veyronRPC
-	if len(methSig.InArgs) != len(tempMsg.InArgs) {
-		return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
-	}
-	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
-	td := wiretype_build.TypeDefs(sig.TypeDefs)
-
-	for i := 0; i < len(tempMsg.InArgs); i++ {
-		argTypeId := methSig.InArgs[i].Type
-		argType := vom_wiretype.Type{
-			ID:   argTypeId,
-			Defs: &td,
-		}
-
-		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
-		if err != nil {
-			return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
-		}
-		msg.InArgs[i] = val
-	}
-
-	msg.Name = tempMsg.Name
-	msg.Method = tempMsg.Method
-	msg.NumOutArgs = tempMsg.NumOutArgs
-	msg.IsStreaming = tempMsg.IsStreaming
-
-	inStreamType := vom_wiretype.Type{
-		ID:   methSig.InStream,
-		Defs: &td,
-	}
-
-	c.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
-	return &msg, inStreamType, nil
+	c.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", tempMsg.Name, tempMsg.Method, tempMsg.IsStreaming)
+	return &tempMsg, nil
 }
 
 type signatureRequest struct {
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index f0b50d5..adcf1a8 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -9,7 +9,6 @@
 	"sync"
 	"testing"
 	"time"
-	"veyron.io/veyron/veyron/services/wsprd/ipc/client"
 	"veyron.io/veyron/veyron/services/wsprd/lib"
 	"veyron.io/veyron/veyron/services/wsprd/signature"
 	"veyron.io/veyron/veyron2"
@@ -261,7 +260,7 @@
 
 type goServerTestCase struct {
 	method             string
-	inArgs             []interface{}
+	inArgs             []json.RawMessage
 	numOutArgs         int32
 	streamingInputs    []string
 	streamingInputType vom.Type
@@ -289,13 +288,10 @@
 	writer := testWriter{
 		logger: controller.logger,
 	}
-	var signal chan ipc.Stream
+	var stream *outstandingStream
 	if len(test.streamingInputs) > 0 {
-		signal = make(chan ipc.Stream, 1)
-		controller.outstandingStreams[0] = outstandingStream{
-			stream: client.StartQueueingStream(signal),
-			inType: test.streamingInputType,
-		}
+		stream = newStream()
+		controller.outstandingStreams[0] = stream
 		go func() {
 			for _, value := range test.streamingInputs {
 				controller.SendOnStream(0, value, &writer)
@@ -304,14 +300,14 @@
 		}()
 	}
 
-	request := veyronRPC{
+	request := veyronTempRPC{
 		Name:        "/" + endpoint.String(),
 		Method:      test.method,
 		InArgs:      test.inArgs,
 		NumOutArgs:  test.numOutArgs,
-		IsStreaming: signal != nil,
+		IsStreaming: stream != nil,
 	}
-	controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, signal)
+	controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, stream)
 
 	checkResponses(&writer, test.expectedStream, test.expectedError, t)
 }
@@ -319,7 +315,7 @@
 func TestCallingGoServer(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
 		method:     "Add",
-		inArgs:     []interface{}{2, 3},
+		inArgs:     []json.RawMessage{json.RawMessage("2"), json.RawMessage("3")},
 		numOutArgs: 2,
 		expectedStream: []response{
 			response{
@@ -333,7 +329,7 @@
 func TestCallingGoServerWithError(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
 		method:        "Divide",
-		inArgs:        []interface{}{1, 0},
+		inArgs:        []json.RawMessage{json.RawMessage("1"), json.RawMessage("0")},
 		numOutArgs:    2,
 		expectedError: verror.BadArgf("can't divide by zero"),
 	})
@@ -342,7 +338,7 @@
 func TestCallingGoWithStreaming(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
 		method:             "StreamingAdd",
-		inArgs:             []interface{}{},
+		inArgs:             []json.RawMessage{},
 		streamingInputs:    []string{"1", "2", "3", "4"},
 		streamingInputType: vom_wiretype.Type{ID: 36},
 		numOutArgs:         2,
@@ -484,14 +480,15 @@
 	// The set of streaming inputs from the client to the server.
 	clientStream []interface{}
 	// The set of streaming outputs from the server to the client.
-	serverStream  []interface{}
-	finalResponse interface{}
-	err           *verror.Standard
+	serverStream         []string
+	expectedServerStream []interface{}
+	finalResponse        interface{}
+	err                  *verror.Standard
 }
 
 func sendServerStream(t *testing.T, controller *Controller, test *jsServerTestCase, w lib.ClientWriter) {
 	for _, msg := range test.serverStream {
-		controller.sendParsedMessageOnStream(0, msg, w)
+		controller.SendOnStream(0, msg, w)
 	}
 
 	serverReply := map[string]interface{}{
@@ -593,7 +590,7 @@
 
 	expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStreamClose})
 
-	expectedStream := test.serverStream
+	expectedStream := test.expectedServerStream
 	go sendServerStream(t, rt.controller, &test, rt.writer)
 	for {
 		var data interface{}
@@ -666,20 +663,22 @@
 
 func TestJSServerWihStreamingOutputs(t *testing.T) {
 	runJsServerTestCase(t, jsServerTestCase{
-		method:        "StreamingAdd",
-		inArgs:        []interface{}{},
-		serverStream:  []interface{}{3.0, 4.0},
-		finalResponse: 10.0,
+		method:               "StreamingAdd",
+		inArgs:               []interface{}{},
+		serverStream:         []string{"3", "4"},
+		expectedServerStream: []interface{}{3.0, 4.0},
+		finalResponse:        10.0,
 	})
 }
 
 func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) {
 	runJsServerTestCase(t, jsServerTestCase{
-		method:        "StreamingAdd",
-		inArgs:        []interface{}{},
-		clientStream:  []interface{}{1.0, 2.0},
-		serverStream:  []interface{}{3.0, 4.0},
-		finalResponse: 10.0,
+		method:               "StreamingAdd",
+		inArgs:               []interface{}{},
+		clientStream:         []interface{}{1.0, 2.0},
+		serverStream:         []string{"3", "4"},
+		expectedServerStream: []interface{}{3.0, 4.0},
+		finalResponse:        10.0,
 	})
 }
 
diff --git a/services/wsprd/app/stream.go b/services/wsprd/app/stream.go
new file mode 100644
index 0000000..1ded31d
--- /dev/null
+++ b/services/wsprd/app/stream.go
@@ -0,0 +1,83 @@
+package app
+
+import (
+	"fmt"
+	"veyron.io/veyron/veyron/services/wsprd/lib"
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/vom"
+)
+
+type initConfig struct {
+	stream ipc.Stream
+	inType vom.Type
+}
+
+type message struct {
+	data   string
+	writer lib.ClientWriter
+}
+
+// oustandingStream provides a stream-like api with the added ability to
+// queue up messages if the stream hasn't been initialized first.  send
+// can be called before init has been called, but no data will be sent
+// until init is called.
+type outstandingStream struct {
+	// The channel on which the stream and the type
+	// of data on the stream is sent after the stream
+	// has been constructed.
+	initChan chan *initConfig
+	// The queue of messages to write out.
+	messages chan *message
+	// done will be notified when the stream has been closed.
+	done chan bool
+}
+
+func newStream() *outstandingStream {
+	os := &outstandingStream{
+		initChan: make(chan *initConfig, 1),
+		// We allow queueing up to 100 messages before init is called.
+		// TODO(bjornick): Deal with the case that the queue is full.
+		messages: make(chan *message, 100),
+		done:     make(chan bool),
+	}
+	go os.loop()
+	return os
+}
+
+func (os *outstandingStream) send(data string, w lib.ClientWriter) {
+	os.messages <- &message{data, w}
+}
+
+func (os *outstandingStream) end() {
+	close(os.messages)
+}
+
+// Waits until the stream has been closed and all the messages
+// have been drained.
+func (os *outstandingStream) waitUntilDone() {
+	<-os.done
+}
+
+func (os *outstandingStream) loop() {
+	config := <-os.initChan
+	for msg := range os.messages {
+		payload, err := vom.JSONToObject(msg.data, config.inType)
+		if err != nil {
+			msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err))
+			continue
+		}
+		if err := config.stream.Send(payload); err != nil {
+			msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
+		}
+
+	}
+	close(os.done)
+	// If this is a client rpc, we need to call CloseSend on it.
+	if call, ok := config.stream.(ipc.Call); ok {
+		call.CloseSend()
+	}
+}
+
+func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
+	os.initChan <- &initConfig{stream, inType}
+}
diff --git a/services/wsprd/ipc/client/stream.go b/services/wsprd/ipc/client/stream.go
deleted file mode 100644
index 0db5bda..0000000
--- a/services/wsprd/ipc/client/stream.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// A client stream helper.
-
-package client
-
-import (
-	"veyron.io/veyron/veyron/services/wsprd/lib"
-	"veyron.io/veyron/veyron2/ipc"
-)
-
-// A message that will be passed to the writeLoop function that will
-// eventually write the message out to the stream.
-type streamMessage struct {
-	// The data to put on the stream.
-	data interface{}
-	// The client writer that will be used to send errors.
-	w lib.ClientWriter
-}
-
-// A stream that will eventually write messages to the underlying stream.
-// It isn't initialized with a stream, but rather a chan that will eventually
-// provide a stream, so that it can accept sends before the underlying stream
-// has been set up.
-type QueueingStream chan *streamMessage
-
-// Creates and returns a queueing stream that will starting writing to the
-// stream provided by the ready channel.  It is expected that ready will only
-// provide a single stream.
-// TODO(bjornick): allow for ready to pass an error if the stream had any issues
-// setting up.
-func StartQueueingStream(ready chan ipc.Stream) QueueingStream {
-	s := make(QueueingStream, 100)
-	go s.writeLoop(ready)
-	return s
-}
-
-func (q QueueingStream) Send(item interface{}, w lib.ClientWriter) {
-	// TODO(bjornick): Reject the message if the queue is too long.
-	message := streamMessage{data: item, w: w}
-	q <- &message
-}
-
-func (q QueueingStream) Close() error {
-	close(q)
-	return nil
-}
-
-func (q QueueingStream) writeLoop(ready chan ipc.Stream) {
-	stream := <-ready
-	for value, ok := <-q; ok; value, ok = <-q {
-		if !ok {
-			break
-		}
-		if err := stream.Send(value.data); err != nil {
-			value.w.Error(err)
-		}
-	}
-
-	// If the stream is on the client side, then also close the stream.
-	if call, ok := stream.(ipc.Call); ok {
-		call.CloseSend()
-	}
-}
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index 35b6e85..4e7b627 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -9,7 +9,6 @@
 	"sync"
 
 	vsecurity "veyron.io/veyron/veyron/security"
-	"veyron.io/veyron/veyron/services/wsprd/ipc/stream"
 	"veyron.io/veyron/veyron/services/wsprd/lib"
 	"veyron.io/veyron/veyron/services/wsprd/signature"
 
@@ -52,7 +51,7 @@
 }
 
 type FlowHandler interface {
-	CreateNewFlow(server *Server, sender stream.Sender) *Flow
+	CreateNewFlow(server *Server, sender ipc.Stream) *Flow
 
 	CleanupFlow(id int64)
 }
@@ -116,7 +115,7 @@
 
 func (s *Server) createRemoteInvokerFunc() remoteInvokeFunc {
 	return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
-		flow := s.helper.CreateNewFlow(s, senderWrapper{stream: call})
+		flow := s.helper.CreateNewFlow(s, call)
 		replyChan := make(chan *serverRPCReply, 1)
 		s.Lock()
 		s.outstandingServerRequests[flow.ID] = replyChan
diff --git a/services/wsprd/ipc/server/stream.go b/services/wsprd/ipc/server/stream.go
deleted file mode 100644
index 3ac5fc8..0000000
--- a/services/wsprd/ipc/server/stream.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package server
-
-import (
-	"veyron.io/veyron/veyron/services/wsprd/lib"
-	"veyron.io/veyron/veyron2/ipc"
-)
-
-// A simple struct that wraps a stream with the sender api.  It
-// will write to the stream synchronously.  Any error will still
-// be written to clientWriter.
-type senderWrapper struct {
-	stream ipc.Stream
-}
-
-func (s senderWrapper) Send(item interface{}, w lib.ClientWriter) {
-	if err := s.stream.Send(item); err != nil {
-		w.Error(err)
-	}
-}
diff --git a/services/wsprd/ipc/stream/stream.go b/services/wsprd/ipc/stream/stream.go
deleted file mode 100644
index ad9e371..0000000
--- a/services/wsprd/ipc/stream/stream.go
+++ /dev/null
@@ -1,14 +0,0 @@
-// The set of streaming helper objects for wspr.
-
-package stream
-
-import (
-	"veyron.io/veyron/veyron/services/wsprd/lib"
-)
-
-// An interface for an asynchronous sender.
-type Sender interface {
-	// Similar to ipc.Stream.Send, except that instead of
-	// returning an error, w.sendError will be called.
-	Send(item interface{}, w lib.ClientWriter)
-}
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
index da1dc4e..27d9090 100644
--- a/services/wsprd/wspr/pipe.go
+++ b/services/wsprd/wspr/pipe.go
@@ -251,12 +251,9 @@
 			ctx := p.wspr.rt.NewContext()
 			p.controller.HandleVeyronRequest(ctx, msg.Id, msg.Data, ww)
 		case websocketStreamingValue:
-			// This will asynchronous for a client rpc, but synchronous for a
-			// server rpc.  This could be potentially bad if the server is sending
-			// back large packets.  Making it asynchronous for the server, would make
-			// it difficult to guarantee that all stream messages make it to the client
-			// before the finish call.
-			// TODO(bjornick): Make the server send also asynchronous.
+			// SendOnStream queues up the message to be sent, but doesn't do the send
+			// on this goroutine.  We need to queue the messages synchronously so that
+			// the order is preserved.
 			p.controller.SendOnStream(msg.Id, msg.Data, ww)
 		case websocketStreamClose:
 			p.controller.CloseStream(msg.Id)