blob: 1ded31dda82e367f5d798102cc7ea1c0cc8cbbe8 [file] [log] [blame]
Shyam Jayaraman41dceff2014-09-18 14:43:06 -07001package app
2
3import (
4 "fmt"
5 "veyron.io/veyron/veyron/services/wsprd/lib"
6 "veyron.io/veyron/veyron2/ipc"
7 "veyron.io/veyron/veyron2/vom"
8)
9
10type initConfig struct {
11 stream ipc.Stream
12 inType vom.Type
13}
14
15type message struct {
16 data string
17 writer lib.ClientWriter
18}
19
20// oustandingStream provides a stream-like api with the added ability to
21// queue up messages if the stream hasn't been initialized first. send
22// can be called before init has been called, but no data will be sent
23// until init is called.
24type outstandingStream struct {
25 // The channel on which the stream and the type
26 // of data on the stream is sent after the stream
27 // has been constructed.
28 initChan chan *initConfig
29 // The queue of messages to write out.
30 messages chan *message
31 // done will be notified when the stream has been closed.
32 done chan bool
33}
34
35func newStream() *outstandingStream {
36 os := &outstandingStream{
37 initChan: make(chan *initConfig, 1),
38 // We allow queueing up to 100 messages before init is called.
39 // TODO(bjornick): Deal with the case that the queue is full.
40 messages: make(chan *message, 100),
41 done: make(chan bool),
42 }
43 go os.loop()
44 return os
45}
46
47func (os *outstandingStream) send(data string, w lib.ClientWriter) {
48 os.messages <- &message{data, w}
49}
50
51func (os *outstandingStream) end() {
52 close(os.messages)
53}
54
55// Waits until the stream has been closed and all the messages
56// have been drained.
57func (os *outstandingStream) waitUntilDone() {
58 <-os.done
59}
60
61func (os *outstandingStream) loop() {
62 config := <-os.initChan
63 for msg := range os.messages {
64 payload, err := vom.JSONToObject(msg.data, config.inType)
65 if err != nil {
66 msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err))
67 continue
68 }
69 if err := config.stream.Send(payload); err != nil {
70 msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
71 }
72
73 }
74 close(os.done)
75 // If this is a client rpc, we need to call CloseSend on it.
76 if call, ok := config.stream.(ipc.Call); ok {
77 call.CloseSend()
78 }
79}
80
81func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
82 os.initChan <- &initConfig{stream, inType}
83}