blob: b4c83e9f0820fdf7b642767bbb8982755a1860e9 [file] [log] [blame]
Bogdan Caprita490a4512014-11-20 21:12:19 -08001package modules
2
3import (
4 "io"
5 "sync"
6
7 // TODO(caprita): Move upcqueue into veyron/lib.
8 "veyron.io/veyron/veyron/runtimes/google/lib/upcqueue"
9)
10
Bogdan Caprita490a4512014-11-20 21:12:19 -080011// queueRW implements a ReadWriteCloser backed by an unbounded in-memory
12// producer-consumer queue.
13type queueRW struct {
14 sync.Mutex
15 q *upcqueue.T
16 buf []byte
17 buffered int
18 cancel chan struct{}
19}
20
21func newRW() io.ReadWriteCloser {
22 return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})}
23}
24
25func (q *queueRW) Close() error {
26 // We use an empty message to signal EOF to the reader.
27 _, err := q.Write([]byte{})
28 return err
29}
30
31func (q *queueRW) Read(p []byte) (n int, err error) {
32 q.Lock()
33 defer q.Unlock()
34 for q.buffered == 0 {
35 elem, err := q.q.Get(q.cancel)
36 if err != nil {
37 return 0, io.EOF
38 }
39 s := elem.(string)
40 b := []byte(s)
41 if len(b) == 0 {
42 close(q.cancel)
43 return 0, io.EOF
44 }
45 q.buf, q.buffered = b, len(b)
46 }
47 copied := copy(p, q.buf[:q.buffered])
Bogdan Caprita2f5585b2014-11-21 17:32:21 -080048 q.buf = q.buf[copied:]
Bogdan Caprita490a4512014-11-20 21:12:19 -080049 q.buffered -= copied
50 return copied, nil
51}
52
53func (q *queueRW) Write(p []byte) (n int, err error) {
54 if err := q.q.Put(string(p)); err != nil {
55 return 0, err
56 }
57 return len(p), nil
58}