lib/modules: Re-implement queue_rw.go without depending on upcqueue.
I would like to move upcqueue back to internal. I think the new implementation
is simpler and more correct. The previous implementation had the rather
odd semantic that writing a zero-byte slice closed the queueRW.
Change-Id: Ie2e96dcca6b8c78afdb1cf9fc0fec958d8fd92c4
diff --git a/lib/modules/queue_rw.go b/lib/modules/queue_rw.go
index 212462c..cc218da 100644
--- a/lib/modules/queue_rw.go
+++ b/lib/modules/queue_rw.go
@@ -1,58 +1,46 @@
package modules
import (
+ "bytes"
"io"
"sync"
-
- // TODO(caprita): Move upcqueue into veyron/lib.
- "v.io/x/ref/lib/upcqueue"
)
// queueRW implements a ReadWriteCloser backed by an unbounded in-memory
-// producer-consumer queue.
+// buffer.
type queueRW struct {
- sync.Mutex
- q *upcqueue.T
- buf []byte
- buffered int
- cancel chan struct{}
+ mu sync.Mutex
+ cond *sync.Cond
+ buf bytes.Buffer
+ closed bool
}
func newRW() io.ReadWriteCloser {
- return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})}
+ q := &queueRW{}
+ q.cond = sync.NewCond(&q.mu)
+ return q
}
func (q *queueRW) Close() error {
- // We use an empty message to signal EOF to the reader.
- _, err := q.Write([]byte{})
- return err
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ defer q.cond.Broadcast()
+ q.closed = true
+ return nil
}
func (q *queueRW) Read(p []byte) (n int, err error) {
- q.Lock()
- defer q.Unlock()
- for q.buffered == 0 {
- elem, err := q.q.Get(q.cancel)
- if err != nil {
- return 0, io.EOF
- }
- s := elem.(string)
- b := []byte(s)
- if len(b) == 0 {
- close(q.cancel)
- return 0, io.EOF
- }
- q.buf, q.buffered = b, len(b)
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ for q.buf.Len() == 0 && !q.closed {
+ q.cond.Wait()
}
- copied := copy(p, q.buf[:q.buffered])
- q.buf = q.buf[copied:]
- q.buffered -= copied
- return copied, nil
+ return q.buf.Read(p)
}
func (q *queueRW) Write(p []byte) (n int, err error) {
- if err := q.q.Put(string(p)); err != nil {
- return 0, err
- }
- return len(p), nil
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ defer q.cond.Broadcast()
+ return q.buf.Write(p)
}
diff --git a/lib/modules/queue_rw_test.go b/lib/modules/queue_rw_test.go
index 26f6f19..f38c7ff 100644
--- a/lib/modules/queue_rw_test.go
+++ b/lib/modules/queue_rw_test.go
@@ -28,8 +28,7 @@
break
}
}
- // This marks EOF.
- if _, err := q.Write([]byte{}); err != nil {
+ if err := q.Close(); err != nil {
t.Fatalf("err %v", err)
}
readData := make([]byte, 0, size)