blob: 4237f8fdd6992b634465054e2f4a65442380b389 [file] [log] [blame]
// A client stream helper.
package client
import (
"veyron/services/wsprd/lib"
"veyron2/ipc"
)
// A message that will be passed to the writeLoop function that will
// eventually write the message out to the stream.
type streamMessage struct {
// The data to put on the stream.
data interface{}
// The client writer that will be used to send errors.
w lib.ClientWriter
}
// A stream that will eventually write messages to the underlying stream.
// It isn't initialized with a stream, but rather a chan that will eventually
// provide a stream, so that it can accept sends before the underlying stream
// has been set up.
type QueueingStream chan *streamMessage
// Creates and returns a queueing stream that will starting writing to the
// stream provided by the ready channel. It is expected that ready will only
// provide a single stream.
// TODO(bjornick): allow for ready to pass an error if the stream had any issues
// setting up.
func StartQueueingStream(ready chan ipc.Stream) QueueingStream {
s := make(QueueingStream, 100)
go s.writeLoop(ready)
return s
}
func (q QueueingStream) Send(item interface{}, w lib.ClientWriter) {
// TODO(bjornick): Reject the message if the queue is too long.
message := streamMessage{data: item, w: w}
q <- &message
}
func (q QueueingStream) Close() error {
close(q)
return nil
}
func (q QueueingStream) writeLoop(ready chan ipc.Stream) {
stream := <-ready
for value, ok := <-q; ok; value, ok = <-q {
if !ok {
break
}
if err := stream.Send(value.data); err != nil {
value.w.Error(err)
}
}
// If the stream is on the client side, then also close the stream.
if call, ok := stream.(ipc.Call); ok {
call.CloseSend()
}
}