lib/modules: Re-implement queue_rw.go without depending on upcqueue.

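I would like to move upcqueue back to internal, so queueRW is now backed by
a bytes.Buffer guarded by a mutex and a condition variable.  I think the new
implementation is simpler and more correct: the previous implementation had
the rather odd semantics that writing a zero-byte slice closed the queueRW,
whereas Close now sets an explicit closed flag and wakes any blocked readers.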

Change-Id: Ie2e96dcca6b8c78afdb1cf9fc0fec958d8fd92c4
diff --git a/lib/modules/queue_rw.go b/lib/modules/queue_rw.go
index 212462c..cc218da 100644
--- a/lib/modules/queue_rw.go
+++ b/lib/modules/queue_rw.go
@@ -1,58 +1,46 @@
 package modules
 
 import (
+	"bytes"
 	"io"
 	"sync"
-
-	// TODO(caprita): Move upcqueue into veyron/lib.
-	"v.io/x/ref/lib/upcqueue"
 )
 
 // queueRW implements a ReadWriteCloser backed by an unbounded in-memory
-// producer-consumer queue.
+// buffer.
 type queueRW struct {
-	sync.Mutex
-	q        *upcqueue.T
-	buf      []byte
-	buffered int
-	cancel   chan struct{}
+	mu     sync.Mutex
+	cond   *sync.Cond
+	buf    bytes.Buffer
+	closed bool
 }
 
 func newRW() io.ReadWriteCloser {
-	return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})}
+	q := &queueRW{}
+	q.cond = sync.NewCond(&q.mu)
+	return q
 }
 
 func (q *queueRW) Close() error {
-	// We use an empty message to signal EOF to the reader.
-	_, err := q.Write([]byte{})
-	return err
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	defer q.cond.Broadcast()
+	q.closed = true
+	return nil
 }
 
 func (q *queueRW) Read(p []byte) (n int, err error) {
-	q.Lock()
-	defer q.Unlock()
-	for q.buffered == 0 {
-		elem, err := q.q.Get(q.cancel)
-		if err != nil {
-			return 0, io.EOF
-		}
-		s := elem.(string)
-		b := []byte(s)
-		if len(b) == 0 {
-			close(q.cancel)
-			return 0, io.EOF
-		}
-		q.buf, q.buffered = b, len(b)
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	for q.buf.Len() == 0 && !q.closed {
+		q.cond.Wait()
 	}
-	copied := copy(p, q.buf[:q.buffered])
-	q.buf = q.buf[copied:]
-	q.buffered -= copied
-	return copied, nil
+	return q.buf.Read(p)
 }
 
 func (q *queueRW) Write(p []byte) (n int, err error) {
-	if err := q.q.Put(string(p)); err != nil {
-		return 0, err
-	}
-	return len(p), nil
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	defer q.cond.Broadcast()
+	return q.buf.Write(p)
 }
diff --git a/lib/modules/queue_rw_test.go b/lib/modules/queue_rw_test.go
index 26f6f19..f38c7ff 100644
--- a/lib/modules/queue_rw_test.go
+++ b/lib/modules/queue_rw_test.go
@@ -28,8 +28,7 @@
 			break
 		}
 	}
-	// This marks EOF.
-	if _, err := q.Write([]byte{}); err != nil {
+	if err := q.Close(); err != nil {
 		t.Fatalf("err %v", err)
 	}
 	readData := make([]byte, 0, size)