blob: a42731b92cceb4aa49080ae169d85060f478f3c5 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package drrqueue implements a deficit round-robin buffer queue.
//
// Efficient Fair Queueing Using Deficit Round-Robin
// M. Shreedhar and George Varghese
// IEEE/ACM Transactions on Networking, Vol. 4, No. 3, June 1996
//
// The queue supports N-writers and 1-reader queue. By "buffer," we mean []byte
// blocks.
//
// Writers have a priority that takes precedence over the deficit. Writers
// with greater priority are served first. Deficits are not even updated for
// lower priorities when higher priority Writers are being served.
package drrqueue
// LOCKING DISCIPLINE:
//
// Each Writer has a lock, and so does T. Locks are always taken in order:
// Writer.mutex first, then T.mutex. Never take the locks in the opposite
// order.
import (
"fmt"
"io"
"sync"
"v.io/x/ref/runtime/internal/lib/bqueue"
"v.io/x/ref/runtime/internal/lib/deque"
"v.io/x/ref/runtime/internal/lib/iobuf"
vsync "v.io/x/ref/runtime/internal/lib/sync"
)
// T defines the type of round-robin buffer queues. The queue has multiple
// input Writer queues that are served according to the deficit round-robin
// policy.
type T struct {
mutex sync.Mutex
cond *sync.Cond
// active contains an array of active Writers, indexed by Priority.
active [][]*writer
// writers contains all of the Writers.
writers map[bqueue.ID]*writer
// quantum is the amount of data that each Writer can send per round-robin cycle.
quantum int
isClosed bool
}
// Writer is a single bounded input queue supporting a Put operation.
type writer struct {
id bqueue.ID
q *T
priority bqueue.Priority
// free contains the number of free bytes in the writer queue.
//
// INVARIANT: free + size == size of the queue (a constant). This can't be
// computed, because <free> is a semaphore, but it is true nonetheless.
free *vsync.Semaphore
// The following are all protected by the mutex.
mutex sync.Mutex
isClosed bool
contents deque.T
size int // Total number of bytes in the queue.
released int // Number of bytes that can be dequeued (negative for unlimited).
// The following are protected by q.mutex. mutex is not required.
isActive activeMode
deficit int
}
// activeMode has three states:
// busy: The Writer is being updated by Get. It is not in the active list.
// idle: The Writer is inactive. It is not in the active list.
// active: The Writer is in the active list.
type activeMode int
const (
busy activeMode = iota
idle
active
)
// ID returns the numeric identifier for the queue.
func (w *writer) ID() bqueue.ID {
return w.id
}
// Close closes the writer, without discarding the contents. All Put operations
// currently running may, or may not, add their values to the queue. All Put
// operations that happen-after the Close will fail.
func (w *writer) Close() {
w.mutex.Lock()
w.isClosed = true
w.updateStateLocked(false, 0)
w.mutex.Unlock()
w.free.Close()
}
// Shutdown closes the writer as in Close and also discards the contents.
// If removeWriter is true the writer will be removed from the
// associated T's queue entirely, otherwise the now empty writer will
// remain and eventually be returned by a T.Get.
func (w *writer) Shutdown(removeWriter bool) {
w.mutex.Lock()
w.isClosed = true
if !removeWriter {
w.contents.Clear()
w.size = 0
w.updateStateLocked(false, 0)
}
w.mutex.Unlock()
if removeWriter {
w.q.removeWriter(w)
}
w.free.Close()
}
// IsClosed returns true iff the Writer is closed.
func (w *writer) IsClosed() bool {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.isClosed
}
// IsDrained returns true iff the Writer is closed and empty.
func (w *writer) IsDrained() bool {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.isClosed && w.size == 0
}
// Put adds an element to the queue. Put blocks until there is space in the
// Writer. The element is not made available to T.Get until it is released with
// the Release method. Returns an error if the queue is closed or the operation
// is cancelled.
func (w *writer) Put(buf *iobuf.Slice, cancel <-chan struct{}) error {
// Block until there is space in the Writer.
if err := w.free.DecN(uint(buf.Size()), cancel); err != nil {
return err
}
return w.putContents(buf)
}
// TryPut is like Put, but it is nonblocking.
func (w *writer) TryPut(buf *iobuf.Slice) error {
if err := w.free.TryDecN(uint(buf.Size())); err != nil {
return err
}
return w.putContents(buf)
}
func (w *writer) putContents(buf *iobuf.Slice) error {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.isClosed {
return bqueue.ErrWriterIsClosed
}
w.contents.PushBack(buf)
w.size += buf.Size()
w.updateStateLocked(false, 0)
return nil
}
// Release allows the next <bytes> to be removed from the queue and passed to
// Get. If <bytes> is negative, all messages are released and flow control is
// no longer used.
func (w *writer) Release(bytes int) error {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.released < 0 && bytes >= 0 {
return bqueue.ErrCantToggleFlowControl
}
if bytes < 0 {
w.released = -1
} else {
w.released += bytes
}
w.updateStateLocked(false, 0)
return nil
}
// getContents returns as much data as possible, up to the deficit, and then
// updates the state.
func (w *writer) getContents(deficit int) ([]*iobuf.Slice, bool) {
w.mutex.Lock()
defer w.mutex.Unlock()
// Collect the contents into bufs
if w.released >= 0 && deficit > w.released {
deficit = w.released
}
// Writer is closed.
if w.size == 0 && w.isClosed {
return nil, true
}
var consumed int
var bufs []*iobuf.Slice
for w.contents.Size() != 0 {
b := w.contents.Front().(*iobuf.Slice)
size := consumed + b.Size()
if size > deficit {
break
}
consumed = size
bufs = append(bufs, b)
w.contents.PopFront()
}
// Update counters by number of bytes consumed.
w.size -= consumed
// Decrement released, but only if it is nonnegative.
if w.released >= 0 {
w.released -= consumed
if w.released < 0 {
panic("released is negative")
}
}
w.updateStateLocked(true, consumed)
return bufs, bufs != nil
}
// updateStateLocked updates the ready state of the Writer.
//
// REQUIRES: w.mutex is locked.
func (w *writer) updateStateLocked(overrideBusy bool, consumed int) {
w.free.IncN(uint(consumed))
// The w.isActive state does not depend on the deficit.
isActive := (w.size == 0 && w.isClosed) ||
(w.size != 0 && (w.released < 0 || w.contents.Front().(*iobuf.Slice).Size() <= w.released))
w.q.updateWriterState(w, overrideBusy, isActive, consumed)
}
// New returns a new T. Each writer is allowed to send quantum bytes per round-robin cycle.
func New(quantum int) bqueue.T {
q := &T{writers: make(map[bqueue.ID]*writer), quantum: quantum}
q.cond = sync.NewCond(&q.mutex)
return q
}
// Close closes the queue.
func (q *T) Close() {
q.mutex.Lock()
writers := q.writers
q.isClosed = true
q.writers = make(map[bqueue.ID]*writer)
for i := 0; i != len(q.active); i++ {
q.active[i] = nil
}
q.cond.Signal()
q.mutex.Unlock()
// Close the queues outside the q.mutex lock to preserve lock order.
for _, w := range writers {
w.Shutdown(true)
}
}
// NewWriter allocates a new Writer.
func (q *T) NewWriter(id bqueue.ID, p bqueue.Priority, bytes int) (bqueue.Writer, error) {
w := &writer{
id: id,
priority: p,
q: q,
free: vsync.NewSemaphore(),
isActive: idle,
}
w.free.IncN(uint(bytes))
q.mutex.Lock()
defer q.mutex.Unlock()
if q.isClosed {
return nil, bqueue.ErrBQueueIsClosed
}
q.addPriorityLocked(p)
if _, ok := q.writers[w.id]; ok {
return nil, bqueue.ErrWriterAlreadyExists
}
q.writers[w.id] = w
return w, nil
}
// String provides a string representation of the queue.
func (q *T) String() string {
q.mutex.Lock()
defer q.mutex.Unlock()
s := "q{"
for _, w := range q.writers {
s += fmt.Sprintf("Writer{id: %d, size: %d, released: %d}, ", w.id, w.size, w.released)
}
s += "}"
return s
}
// Find returns the queue with the specified ID.
func (q *T) Find(id bqueue.ID) bqueue.Writer {
q.mutex.Lock()
defer q.mutex.Unlock()
w, ok := q.writers[id]
if !ok {
// Don't return w; that would return a non-nil Writer interface
// containing nil.
return nil
}
return w
}
// Get returns the next element from a queue. Get blocks until a buffer is
// available or the queue is closed.
func (q *T) Get(flush bqueue.FlushFunc) (bqueue.Writer, []*iobuf.Slice, error) {
for {
w, deficit, err := q.nextWriter(flush)
if w == nil {
return nil, nil, err
}
bufs, ok := w.getContents(deficit)
if ok {
return w, bufs, nil
}
}
}
// nextWriter walks through the pending buffers and returns the first active
// Writer. The writer is removed from the active queue and made 'busy' so that
// it will not be re-added to the active queue.
func (q *T) nextWriter(flush bqueue.FlushFunc) (*writer, int, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
for {
if q.isClosed {
return nil, 0, io.EOF
}
for p, writers := range q.active {
if len(writers) != 0 {
w := writers[0]
w.isActive = busy
w.deficit += q.quantum
q.active[p] = writers[1:]
return w, w.deficit, nil
}
}
if flush != nil {
flush()
}
q.cond.Wait()
}
}
// addPriorityLocked adds a ready queue with the specified priority level.
//
// REQUIRES: q.mutex is locked.
func (q *T) addPriorityLocked(p bqueue.Priority) {
if int(p) >= len(q.active) {
newActive := make([][]*writer, int(p)+1)
copy(newActive, q.active)
q.active = newActive
}
}
// removeWriter removes the queue from the q.
//
// NOTE: does not require that w.mutex is locked.
func (q *T) removeWriter(w *writer) {
q.mutex.Lock()
if w.isActive == active {
// Remove the writer from the active queue.
active := q.active[w.priority]
for i, w2 := range active {
if w2 == w {
copy(active[i:], active[i+1:])
q.active[w.priority] = active[:len(active)-1]
break
}
}
}
w.isActive = idle
delete(q.writers, w.id)
q.mutex.Unlock()
}
// updateWriterState updates the active state of the queue.
//
// REQUIRES: w.mutex is locked.
func (q *T) updateWriterState(w *writer, overrideBusy bool, isActive bool, consumed int) {
q.mutex.Lock()
if isActive {
if w.isActive == idle || w.isActive == busy && overrideBusy {
q.active[w.priority] = append(q.active[w.priority], w)
w.isActive = active
w.deficit -= consumed
if w.deficit < 0 {
panic("deficit is negative")
}
q.cond.Signal()
}
} else {
if w.isActive == active {
panic("Writer is active when it should not be")
}
if overrideBusy {
w.isActive = idle
}
w.deficit = 0
}
q.mutex.Unlock()
}