blob: cd61bc75fbe9138713bd54cc4b95bfec08bca7fc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"io"
"sync"
"v.io/v23/context"
)
type readq struct {
mu sync.Mutex
bufs [][]byte
b, e int
size int
nbufs int
toRelease int
notify chan struct{}
}
const initialReadqBufferSize = 10
func newReadQ() *readq {
return &readq{
bufs: make([][]byte, initialReadqBufferSize),
notify: make(chan struct{}, 1),
toRelease: defaultBufferSize,
}
}
func (r *readq) put(ctx *context.T, bufs [][]byte) error {
l := 0
for _, b := range bufs {
l += len(b)
}
if l == 0 {
return nil
}
defer r.mu.Unlock()
r.mu.Lock()
if r.e == -1 {
// The flow has already closed. Simply drop the data.
return nil
}
newSize := l + r.size
if newSize > defaultBufferSize {
return NewErrCounterOverflow(ctx)
}
newBufs := r.nbufs + len(bufs)
r.reserveLocked(newBufs)
for _, b := range bufs {
r.bufs[r.e] = b
r.e = (r.e + 1) % len(r.bufs)
}
r.nbufs = newBufs
if r.size == 0 {
select {
case r.notify <- struct{}{}:
default:
}
}
r.size = newSize
return nil
}
func (r *readq) read(ctx *context.T, data []byte) (n int, release bool, err error) {
defer r.mu.Unlock()
r.mu.Lock()
if err := r.waitLocked(ctx); err != nil {
return 0, false, err
}
buf := r.bufs[r.b]
n = copy(data, buf)
buf = buf[n:]
if len(buf) > 0 {
r.bufs[r.b] = buf
} else {
r.nbufs -= 1
r.b = (r.b + 1) % len(r.bufs)
}
r.size -= n
r.toRelease += n
return n, r.toRelease > defaultBufferSize/2, nil
}
func (r *readq) get(ctx *context.T) (out []byte, release bool, err error) {
defer r.mu.Unlock()
r.mu.Lock()
if err := r.waitLocked(ctx); err != nil {
return nil, false, err
}
out = r.bufs[r.b]
r.b = (r.b + 1) % len(r.bufs)
r.size -= len(out)
r.nbufs -= 1
r.toRelease += len(out)
return out, r.toRelease > defaultBufferSize/2, nil
}
func (r *readq) waitLocked(ctx *context.T) (err error) {
for r.size == 0 && err == nil {
r.mu.Unlock()
select {
case _, ok := <-r.notify:
if !ok {
err = io.EOF
}
case <-ctx.Done():
if r.size == 0 {
err = ctx.Err()
}
}
r.mu.Lock()
}
return
}
func (r *readq) close(ctx *context.T) {
r.mu.Lock()
if r.e != -1 {
r.e = -1
r.toRelease = 0
close(r.notify)
}
r.mu.Unlock()
}
func (r *readq) reserveLocked(n int) {
if n < len(r.bufs) {
return
}
nb := make([][]byte, 2*n)
copied := 0
if r.e >= r.b {
copied = copy(nb, r.bufs[r.b:r.e])
} else {
copied = copy(nb, r.bufs[r.b:])
copied += copy(nb[copied:], r.bufs[:r.e])
}
r.bufs, r.b, r.e = nb, 0, copied
}
func (r *readq) release() (out int) {
r.mu.Lock()
out, r.toRelease = r.toRelease, 0
r.mu.Unlock()
return out
}