| // TODO(caprita): Move upcqueue into veyron/lib. |
| "v.io/core/veyron/runtimes/google/lib/upcqueue" |
| // queueRW implements an io.ReadWriteCloser backed by an unbounded in-memory |
| // producer-consumer queue. |
| func newRW() io.ReadWriteCloser { |
| return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})} |
| func (q *queueRW) Close() error { |
| // We use an empty message to signal EOF to the reader. |
| _, err := q.Write([]byte{}) |
| func (q *queueRW) Read(p []byte) (n int, err error) { |
| elem, err := q.q.Get(q.cancel) |
| q.buf, q.buffered = b, len(b) |
| copied := copy(p, q.buf[:q.buffered]) |
| func (q *queueRW) Write(p []byte) (n int, err error) { |
| if err := q.q.Put(string(p)); err != nil { |