Bogdan Caprita | 490a451 | 2014-11-20 21:12:19 -0800 | [diff] [blame] | 1 | package modules |
| 2 | |
| 3 | import ( |
| 4 | "io" |
| 5 | "sync" |
| 6 | |
| 7 | // TODO(caprita): Move upcqueue into veyron/lib. |
| 8 | "veyron.io/veyron/veyron/runtimes/google/lib/upcqueue" |
| 9 | ) |
| 10 | |
Bogdan Caprita | 490a451 | 2014-11-20 21:12:19 -0800 | [diff] [blame] | 11 | // queueRW implements a ReadWriteCloser backed by an unbounded in-memory |
| 12 | // producer-consumer queue. |
| 13 | type queueRW struct { |
| 14 | sync.Mutex |
| 15 | q *upcqueue.T |
| 16 | buf []byte |
| 17 | buffered int |
| 18 | cancel chan struct{} |
| 19 | } |
| 20 | |
| 21 | func newRW() io.ReadWriteCloser { |
| 22 | return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})} |
| 23 | } |
| 24 | |
| 25 | func (q *queueRW) Close() error { |
| 26 | // We use an empty message to signal EOF to the reader. |
| 27 | _, err := q.Write([]byte{}) |
| 28 | return err |
| 29 | } |
| 30 | |
| 31 | func (q *queueRW) Read(p []byte) (n int, err error) { |
| 32 | q.Lock() |
| 33 | defer q.Unlock() |
| 34 | for q.buffered == 0 { |
| 35 | elem, err := q.q.Get(q.cancel) |
| 36 | if err != nil { |
| 37 | return 0, io.EOF |
| 38 | } |
| 39 | s := elem.(string) |
| 40 | b := []byte(s) |
| 41 | if len(b) == 0 { |
| 42 | close(q.cancel) |
| 43 | return 0, io.EOF |
| 44 | } |
| 45 | q.buf, q.buffered = b, len(b) |
| 46 | } |
| 47 | copied := copy(p, q.buf[:q.buffered]) |
Bogdan Caprita | 2f5585b | 2014-11-21 17:32:21 -0800 | [diff] [blame] | 48 | q.buf = q.buf[copied:] |
Bogdan Caprita | 490a451 | 2014-11-20 21:12:19 -0800 | [diff] [blame] | 49 | q.buffered -= copied |
| 50 | return copied, nil |
| 51 | } |
| 52 | |
| 53 | func (q *queueRW) Write(p []byte) (n int, err error) { |
| 54 | if err := q.q.Put(string(p)); err != nil { |
| 55 | return 0, err |
| 56 | } |
| 57 | return len(p), nil |
| 58 | } |