blob: 1ded31dda82e367f5d798102cc7ea1c0cc8cbbe8 [file] [log] [blame]
package app
import (
"fmt"
"veyron.io/veyron/veyron/services/wsprd/lib"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/vom"
)
// initConfig carries the values handed to an outstandingStream when it is
// initialized: the underlying stream and the type of the data sent on it.
type initConfig struct {
// stream is the stream that queued messages are sent on.
stream ipc.Stream
// inType is the vom type that each message's JSON data is converted to
// before it is sent on stream.
inType vom.Type
}
// message is a single queued item waiting to be sent on the stream.
type message struct {
// data is the JSON text that is converted into an object before sending.
data string
// writer receives an error if converting or sending this message fails.
writer lib.ClientWriter
}
// outstandingStream provides a stream-like api with the added ability to
// queue up messages if the stream hasn't been initialized first. send
// can be called before init has been called, but no data will be sent
// until init is called.
type outstandingStream struct {
// The channel on which the stream and the type
// of data on the stream are sent after the stream
// has been constructed.
initChan chan *initConfig
// The queue of messages to write out. It is closed by end.
messages chan *message
// done is closed by the loop goroutine once the message queue has been
// closed and drained.
done chan bool
}
// newStream allocates an outstandingStream, starts the goroutine that
// processes its message queue, and returns it. Messages may be queued
// with send right away; they are only forwarded once init has been called.
func newStream() *outstandingStream {
	stream := new(outstandingStream)
	// A buffer of one lets init hand over its config without waiting for
	// the loop goroutine to be ready to receive it.
	stream.initChan = make(chan *initConfig, 1)
	// We allow queueing up to 100 messages before init is called.
	// TODO(bjornick): Deal with the case that the queue is full.
	stream.messages = make(chan *message, 100)
	stream.done = make(chan bool)
	go stream.loop()
	return stream
}
// send queues data to be written to the stream. Any error hit while
// converting or sending the message is reported to w by the loop goroutine.
// The call blocks while the message queue is full.
func (os *outstandingStream) send(data string, w lib.ClientWriter) {
	queued := &message{data: data, writer: w}
	os.messages <- queued
}
// end closes the message queue, signalling that no more messages will be
// sent. send must not be called afterwards, since sending on a closed
// channel panics.
func (os *outstandingStream) end() {
	queue := os.messages
	close(queue)
}
// waitUntilDone blocks until the loop goroutine has drained the message
// queue and closed the done channel.
func (os *outstandingStream) waitUntilDone() {
	finished := os.done
	<-finished
}
// loop is the body of the goroutine started by newStream. It waits for the
// stream configuration to arrive on initChan, then converts each queued
// message from JSON and sends it on the stream until the message queue is
// closed. Per-message failures are reported to that message's writer and do
// not stop processing of later messages.
func (os *outstandingStream) loop() {
	// Nothing is forwarded until init has supplied the stream and its type.
	// NOTE(review): if init is never called this receive blocks forever,
	// even after end — confirm callers always call init.
	cfg := <-os.initChan

	for {
		msg, open := <-os.messages
		if !open {
			// The queue was closed and every pending message was handled.
			break
		}

		obj, convErr := vom.JSONToObject(msg.data, cfg.inType)
		if convErr != nil {
			msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, convErr))
			continue
		}

		sendErr := cfg.stream.Send(obj)
		if sendErr != nil {
			msg.writer.Error(fmt.Errorf("failed to send on stream: %v", sendErr))
		}
	}

	// NOTE(review): done is closed before CloseSend is invoked, so callers
	// of waitUntilDone may resume before the send side has been closed —
	// confirm this ordering is intended.
	close(os.done)

	// If this is a client rpc, we need to call CloseSend on it.
	call, isCall := cfg.stream.(ipc.Call)
	if isCall {
		call.CloseSend()
	}
}
// init supplies the underlying stream and the type of the data sent on it.
// The configuration is delivered over initChan to the loop goroutine, which
// then starts forwarding any messages that were queued with send.
func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
	cfg := &initConfig{stream: stream, inType: inType}
	os.initChan <- cfg
}