blob: 74b50d4a124345ecd7388f03286256afc6cf9b66 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"io"
"sync"
"v.io/v23/context"
)
// readq is a queue of byte buffers kept in a ring. put appends buffers at
// index e, while read and get consume them starting at index b.
type readq struct {
	// mu guards bufs, b, e, closed, size and nbufs.
	mu sync.Mutex

	// bufs is the ring storage; reserveLocked replaces it with a larger
	// slice when more slots are needed.
	bufs [][]byte
	// b is the index of the oldest queued buffer, the next one to be read.
	b int
	// e is the index of the slot the next queued buffer is written to.
	e int

	// closed is set by close; put drops incoming data once it is set.
	closed bool

	// id is handed to conn.release together with the consumed byte count.
	id uint64

	// size is the number of unread bytes across all queued buffers.
	size int
	// nbufs is the number of buffers currently queued.
	nbufs int

	// notify receives a signal from put when data is added to an empty
	// queue, and is closed by close; waitLocked blocks on it.
	notify chan struct{}

	// conn, when non-nil, has release called on it by read and get.
	conn *Conn
}
// initialReadqBufferSize is the number of buffer slots newReadQ allocates for
// a fresh queue; reserveLocked grows the ring when more are needed.
const initialReadqBufferSize = 10
// newReadQ returns an empty readq bound to conn and identified by id. The
// ring starts with initialReadqBufferSize slots, and the notify channel has
// room for a single pending signal.
func newReadQ(conn *Conn, id uint64) *readq {
	q := new(readq)
	q.id = id
	q.conn = conn
	q.bufs = make([][]byte, initialReadqBufferSize)
	q.notify = make(chan struct{}, 1)
	return q
}
// put appends the non-empty buffers in bufs to the queue and signals a reader
// that may be waiting on an empty queue.
//
// Zero-length buffers are skipped, and a call that carries no bytes at all is
// a no-op. Data arriving after close is silently dropped and nil is returned.
// If accepting the data would raise the number of unread bytes above
// DefaultBytesBufferedPerFlow, nothing is queued and the error produced by
// NewErrCounterOverflow is returned.
func (r *readq) put(ctx *context.T, bufs [][]byte) error {
	total, nonEmpty := 0, 0
	for _, b := range bufs {
		if len(b) > 0 {
			total += len(b)
			nonEmpty++
		}
	}
	if total == 0 {
		return nil
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		// The flow has already closed. Simply drop the data.
		return nil
	}

	newSize := r.size + total
	if newSize > DefaultBytesBufferedPerFlow {
		return NewErrCounterOverflow(ctx)
	}

	newBufs := r.nbufs + nonEmpty
	r.reserveLocked(newBufs)
	for _, b := range bufs {
		if len(b) == 0 {
			// Queuing an empty buffer would make read return (0, nil) and
			// could leave a slot stranded in the ring while size is zero.
			continue
		}
		r.bufs[r.e] = b
		r.e = (r.e + 1) % len(r.bufs)
	}
	r.nbufs = newBufs

	if r.size == 0 {
		// The queue was empty, so a reader may be blocked in waitLocked.
		// The send is non-blocking: if a signal is already pending there is
		// nothing more to do.
		select {
		case r.notify <- struct{}{}:
		default:
		}
	}
	r.size = newSize
	return nil
}
// read copies bytes from the oldest queued buffer into data, first blocking
// in waitLocked until there is something to read. At most one queued buffer
// is touched per call: if data is too small to hold it, the unread remainder
// stays at the head of the queue for the next call.
//
// It returns the number of bytes copied, or the error reported by waitLocked
// (in which case n is zero). When conn is non-nil, conn.release is called
// with n after the lock has been dropped, on both the success and the error
// path.
func (r *readq) read(ctx *context.T, data []byte) (n int, err error) {
	r.mu.Lock()
	if err = r.waitLocked(ctx); err == nil {
		buf := r.bufs[r.b]
		n = copy(data, buf)
		if rest := buf[n:]; len(rest) > 0 {
			r.bufs[r.b] = rest
		} else {
			// Clear the slot so the ring does not keep the fully consumed
			// buffer reachable until the slot happens to be overwritten.
			r.bufs[r.b] = nil
			r.nbufs--
			r.b = (r.b + 1) % len(r.bufs)
		}
		r.size -= n
	}
	r.mu.Unlock()
	if r.conn != nil {
		r.conn.release(ctx, r.id, uint64(n))
	}
	return n, err
}
// get removes and returns the oldest queued buffer in its entirety, first
// blocking in waitLocked until there is something to read.
//
// On error the returned slice is nil and err is the error reported by
// waitLocked. When conn is non-nil, conn.release is called with the length
// of the returned buffer after the lock has been dropped, on both the
// success and the error path.
func (r *readq) get(ctx *context.T) (out []byte, err error) {
	r.mu.Lock()
	if err = r.waitLocked(ctx); err == nil {
		out = r.bufs[r.b]
		// Clear the slot so the ring does not pin the buffer after it has
		// been handed to the caller.
		r.bufs[r.b] = nil
		r.b = (r.b + 1) % len(r.bufs)
		r.size -= len(out)
		r.nbufs--
	}
	r.mu.Unlock()
	if r.conn != nil {
		r.conn.release(ctx, r.id, uint64(len(out)))
	}
	return out, err
}
// waitLocked blocks until the queue holds unread bytes, the notify channel
// is closed, or ctx is done. The caller must hold r.mu; the lock is released
// while blocked and is held again when the function returns.
//
// It returns nil when there is something to read. It returns io.EOF when the
// wait ended because the queue was closed or ctx finished and no buffers
// remain queued.
func (r *readq) waitLocked(ctx *context.T) (err error) {
	for r.size == 0 {
		r.mu.Unlock()
		select {
		case _, open := <-r.notify:
			if !open {
				err = io.EOF
			}
		case <-ctx.Done():
			err = io.EOF
		}
		r.mu.Lock()
		if err != nil {
			break
		}
	}
	// Buffers that were queued before the flow ended may still be read, so
	// the end-of-stream condition is suppressed while any remain.
	if err != nil && r.nbufs > 0 {
		return nil
	}
	return err
}
// close marks the queue as closed and closes the notify channel so that a
// reader blocked in waitLocked wakes up. It reports true only for the call
// that actually performed the close; later calls return false and do
// nothing. ctx is not used.
func (r *readq) close(ctx *context.T) bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.closed {
		return false
	}
	r.closed = true
	close(r.notify)
	return true
}
// reserveLocked makes sure the ring has strictly more than n slots. When it
// does not, a new ring of 2*n slots is allocated and the queued buffers
// (the slots from b up to e, following the wrap-around) are moved to its
// front, so that afterwards b is 0 and e is the number of buffers moved.
// The caller is expected to hold r.mu.
func (r *readq) reserveLocked(n int) {
	if len(r.bufs) > n {
		return
	}

	grown := make([][]byte, 2*n)
	var live int
	switch {
	case r.b <= r.e:
		// Contents are contiguous.
		live = copy(grown, r.bufs[r.b:r.e])
	default:
		// Contents wrap: tail of the old ring first, then its head.
		live = copy(grown, r.bufs[r.b:])
		live += copy(grown[live:], r.bufs[:r.e])
	}
	r.b, r.e, r.bufs = 0, live, grown
}