blob: 062115bcdef6051ea9d811a3bd8afe68007f8fb3 [file] [log] [blame]
// The set of streaming helper objects for wspr.
package lib
import (
"veyron2/ipc"
)
// An interface for an asynchronous sender.
type sender interface {
// Similar to ipc.Stream.Send, expect that instead of
// returning an error, w.sendError will be called.
Send(item interface{}, w clientWriter)
}
// A message that will be passed to the writeLoop function that will
// eventually write the message out to the stream.
type streamMessage struct {
// The data to put on the stream.
data interface{}
// The client writer that will be used to send errors.
w clientWriter
}
// A stream that will eventually write messages to the underlying stream.
// It isn't initialized with a stream, but rather a chan that will eventually
// provide a stream, so that it can accept sends before the underlying stream
// has been set up.
type queueingStream chan *streamMessage
// Creates and returns a queueing stream that will starting writing to the
// stream provided by the ready channel. It is expected that ready will only
// provide a single stream.
// TODO(bjornick): allow for ready to pass an error if the stream had any issues
// setting up.
func startQueueingStream(ready chan ipc.Stream) queueingStream {
s := make(queueingStream, 100)
go s.writeLoop(ready)
return s
}
func (q queueingStream) Send(item interface{}, w clientWriter) {
// TODO(bjornick): Reject the message if the queue is too long.
message := streamMessage{data: item, w: w}
q <- &message
}
func (q queueingStream) Close() error {
close(q)
return nil
}
func (q queueingStream) writeLoop(ready chan ipc.Stream) {
stream := <-ready
for value, ok := <-q; ok; value, ok = <-q {
if !ok {
break
}
if err := stream.Send(value.data); err != nil {
value.w.sendError(err)
}
}
// If the stream is on the client side, then also close the stream.
if call, ok := stream.(ipc.Call); ok {
call.CloseSend()
}
}
// A simple struct that wraps a stream with the sender api. It
// will write to the stream synchronously. Any error will still
// be written to clientWriter.
type senderWrapper struct {
stream ipc.Stream
}
func (s senderWrapper) Send(item interface{}, w clientWriter) {
if err := s.stream.Send(item); err != nil {
w.sendError(err)
}
}