veyron/services/wsprd: Remove the signature call from the main websocket
goroutine.

The call was in the main go routine because we need to know the type of
the stream before we could put in in the outstandingStream map.  The
stream itself could be added later, but the type could, so the fix was
to change the asynchronous stream code to allow delaying the
initializing of both the stream and the type of the stream.

This change also fixes a long standing issue that sending on the server
stream blocked the main pipe thread.

Change-Id: I2e1dd8312d698da7470f888275337edd3bcbed19
diff --git a/services/wsprd/app/stream.go b/services/wsprd/app/stream.go
new file mode 100644
index 0000000..1ded31d
--- /dev/null
+++ b/services/wsprd/app/stream.go
@@ -0,0 +1,83 @@
+package app
+
+import (
+	"fmt"
+	"veyron.io/veyron/veyron/services/wsprd/lib"
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/vom"
+)
+
+type initConfig struct {
+	stream ipc.Stream
+	inType vom.Type
+}
+
+type message struct {
+	data   string
+	writer lib.ClientWriter
+}
+
+// oustandingStream provides a stream-like api with the added ability to
+// queue up messages if the stream hasn't been initialized first.  send
+// can be called before init has been called, but no data will be sent
+// until init is called.
+type outstandingStream struct {
+	// The channel on which the stream and the type
+	// of data on the stream is sent after the stream
+	// has been constructed.
+	initChan chan *initConfig
+	// The queue of messages to write out.
+	messages chan *message
+	// done will be notified when the stream has been closed.
+	done chan bool
+}
+
+func newStream() *outstandingStream {
+	os := &outstandingStream{
+		initChan: make(chan *initConfig, 1),
+		// We allow queueing up to 100 messages before init is called.
+		// TODO(bjornick): Deal with the case that the queue is full.
+		messages: make(chan *message, 100),
+		done:     make(chan bool),
+	}
+	go os.loop()
+	return os
+}
+
+func (os *outstandingStream) send(data string, w lib.ClientWriter) {
+	os.messages <- &message{data, w}
+}
+
+func (os *outstandingStream) end() {
+	close(os.messages)
+}
+
+// Waits until the stream has been closed and all the messages
+// have been drained.
+func (os *outstandingStream) waitUntilDone() {
+	<-os.done
+}
+
+func (os *outstandingStream) loop() {
+	config := <-os.initChan
+	for msg := range os.messages {
+		payload, err := vom.JSONToObject(msg.data, config.inType)
+		if err != nil {
+			msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err))
+			continue
+		}
+		if err := config.stream.Send(payload); err != nil {
+			msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
+		}
+
+	}
+	close(os.done)
+	// If this is a client rpc, we need to call CloseSend on it.
+	if call, ok := config.stream.(ipc.Call); ok {
+		call.CloseSend()
+	}
+}
+
+func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
+	os.initChan <- &initConfig{stream, inType}
+}