Shyam Jayaraman | 41dceff | 2014-09-18 14:43:06 -0700 | [diff] [blame] | 1 | package app |
| 2 | |
| 3 | import ( |
| 4 | "fmt" |
| 5 | "veyron.io/veyron/veyron/services/wsprd/lib" |
| 6 | "veyron.io/veyron/veyron2/ipc" |
| 7 | "veyron.io/veyron/veyron2/vom" |
| 8 | ) |
| 9 | |
| 10 | type initConfig struct { |
| 11 | stream ipc.Stream |
| 12 | inType vom.Type |
| 13 | } |
| 14 | |
| 15 | type message struct { |
| 16 | data string |
| 17 | writer lib.ClientWriter |
| 18 | } |
| 19 | |
| 20 | // oustandingStream provides a stream-like api with the added ability to |
| 21 | // queue up messages if the stream hasn't been initialized first. send |
| 22 | // can be called before init has been called, but no data will be sent |
| 23 | // until init is called. |
| 24 | type outstandingStream struct { |
| 25 | // The channel on which the stream and the type |
| 26 | // of data on the stream is sent after the stream |
| 27 | // has been constructed. |
| 28 | initChan chan *initConfig |
| 29 | // The queue of messages to write out. |
| 30 | messages chan *message |
| 31 | // done will be notified when the stream has been closed. |
| 32 | done chan bool |
| 33 | } |
| 34 | |
| 35 | func newStream() *outstandingStream { |
| 36 | os := &outstandingStream{ |
| 37 | initChan: make(chan *initConfig, 1), |
| 38 | // We allow queueing up to 100 messages before init is called. |
| 39 | // TODO(bjornick): Deal with the case that the queue is full. |
| 40 | messages: make(chan *message, 100), |
| 41 | done: make(chan bool), |
| 42 | } |
| 43 | go os.loop() |
| 44 | return os |
| 45 | } |
| 46 | |
| 47 | func (os *outstandingStream) send(data string, w lib.ClientWriter) { |
| 48 | os.messages <- &message{data, w} |
| 49 | } |
| 50 | |
| 51 | func (os *outstandingStream) end() { |
| 52 | close(os.messages) |
| 53 | } |
| 54 | |
| 55 | // Waits until the stream has been closed and all the messages |
| 56 | // have been drained. |
| 57 | func (os *outstandingStream) waitUntilDone() { |
| 58 | <-os.done |
| 59 | } |
| 60 | |
| 61 | func (os *outstandingStream) loop() { |
| 62 | config := <-os.initChan |
| 63 | for msg := range os.messages { |
| 64 | payload, err := vom.JSONToObject(msg.data, config.inType) |
| 65 | if err != nil { |
| 66 | msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err)) |
| 67 | continue |
| 68 | } |
| 69 | if err := config.stream.Send(payload); err != nil { |
| 70 | msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err)) |
| 71 | } |
| 72 | |
| 73 | } |
| 74 | close(os.done) |
| 75 | // If this is a client rpc, we need to call CloseSend on it. |
| 76 | if call, ok := config.stream.(ipc.Call); ok { |
| 77 | call.CloseSend() |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) { |
| 82 | os.initChan <- &initConfig{stream, inType} |
| 83 | } |