blob: ab428cf599c02bf38e78b20b13aa37a82756bb0c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package bqueue implements several kinds of buffer queues, as a N-writer,
// 1-reader queue. By "buffer," we mean iobuf.Slice values. Each writer has a
// separate bounded queue to which it writes buffers. The queue also supports
// flow control.
//
// Initialization:
//
// // Create a new queue using one of the implementations
// // (currently only bqueue/drrqueue).
// q := drrqueue.New()
//
// Reader API:
//
// // Returns the next buffer in the queue, blocking until there is one
// // available. Returns with an error if <q> is closed:
// _, buf, err := q.Get()
//
// Writer API:
//
// // Allocate a new Writer with the id, priority, and space for N elements.
// w := q.New(id, priority, N)
//
// // Add <buf> to the <w>. Blocks until there is space in the Writer.
// // Aborts if <cancel> is closed or contains a value.
// err := w.Put(buf, cancel)
//
// w.Release(N) // Make the next N buffers available to q.Get().
//
// The q.Release() method is used for rate limiting. Buffers can be added with
// q.Put(), but they are not passed to q.Get() until they are released.
package bqueue
import (
"errors"
"v.io/x/ref/runtime/internal/lib/iobuf"
)
// Priority is an integer priority. Smaller is greater priority.
//
// For performance, priorities should be dense and start from 0. Some
// implementations like drrqueue have use space linear in the max priority.
type Priority uint // TODO(jyh): Change the dense requirement if we need it.
// ID is the type of Writer identifiers.
type ID int64
// FlushFunc is the type of flushing functions. See T.Get for more info.
type FlushFunc func() error
// T specifies a buffer queue. The NewWriter method is used to create new
// writer queues, and the Get method returns the next buffer to be served.
type T interface {
Close()
String() string
// Find returns the Writer with the specified ID. Returns nil if there is
// no such writer.
Find(id ID) Writer
// Get returns the next contents of the queue. Get returns a Writer and an
// array of elements dequeued from the Writer. The number of elements
// returned depends on the implementation (for example, drrqueue specifies a
// cap on how many bytes can be dequeued per Writer per round-robin cycle).
// In addition, multiple elements are returned so that iobuf.Coalesce() can
// be used to coalesce the contents.
//
// Get blocks until at least one element can be returned or the queue is
// closed. If non-nil, the <flush> function is called just before Get
// blocks.
//
// If a Writer is closed (the Writer's Close() method was called), then Get
// returns the Writer with empty contents. The caller should call
// writer.Shutdown() to remove the Writer and prevent it from being returned
// in subsequent calls.
//
// It is not safe to call Get() concurrently.
Get(flush FlushFunc) (Writer, []*iobuf.Slice, error)
// NewWriter allocates a new Writer.
NewWriter(id ID, p Priority, n int) (Writer, error)
}
// Writer represents a single writer queue. Writer queues are served
// according to the policy defined by the container queue T.
type Writer interface {
ID() ID
// Close closes the Writer, without discarding the contents. All Put
// operations currently running may, or may not, add their values to the
// Writer. All Put operations that happen-after the Close will fail.
Close()
// Shutdown closes the Writer as in Close and also discards the contents.
// If removeWriter is true the writer will be removed from the
// associated T's queue entirely, otherwise the now empty writer will
// remain and eventually be returned by a T.Get.
Shutdown(removeWriter bool)
// IsClosed returns true if the Writer is closed.
IsClosed() bool
// IsDrained returns true if the Writer is closed and has no data
IsDrained() bool
// Put adds an element to the queue. Put blocks until there is space in
// the Writer. The element is not made available to T.Get until it is
// released with the Release method. Returns an error if the queue is
// closed or the operation is cancelled.
Put(buf *iobuf.Slice, cancel <-chan struct{}) error
// TryPut is like Put, but it is nonblocking.
TryPut(buf *iobuf.Slice) error
// Release allows the next <n> elements to be removed from the Writer and
// passed to Get. If <n> is negative, all messages are released and flow
// control is no longer used.
Release(n int) error
}
var (
ErrBQueueIsClosed = errors.New("bqueue: queue is closed")
ErrWriterAlreadyExists = errors.New("bqueue: writer already exists with this identifier")
ErrWriterIsClosed = errors.New("bqueue: writer is closed")
ErrCantToggleFlowControl = errors.New("bqueue: can't turn on flow control when it is off")
ErrCancelled = errors.New("bqueue: operation was canceled")
ErrTryAgain = errors.New("bqueue: writer is not ready, try again")
)