blob: fe19778cad36734b6eda1cac58d705d30d01d7b6 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package upcqueue implements an unbounded producer/consumer queue. An
// unbounded producer/consumer queue is a concurrent buffer supporting multiple
// concurrent producers and consumers, with timeouts. The queue can be closed
// from either end, by the producer and/or the consumer. When closed, the
// contents are discarded, and subsequent operations return an error.
//
// Note: the main reason to use a producer/consumer queue instead of a channel
// is to allow the consumer to close the channel. This queue can be used for
// many-to-many communication with multiple producers and/or multiple consumers.
// Any of the producers and any of the consumers are allowed to close the
// queue.
package upcqueue
import (
"errors"
"sync"
"v.io/x/ref/runtime/internal/lib/deque"
vsync "v.io/x/ref/runtime/internal/lib/sync"
)
// ErrQueueIsClosed is the error reported by queue operations that find the
// queue already closed.
var ErrQueueIsClosed = errors.New("queue is closed")
// T is an unbounded producer/consumer queue. It fulfills the same purpose as a
// Go channel. Its main advantage over a channel is that the Put() operation
// does not panic, even after the queue is closed. Its main disadvantage is
// that a T can't be used in a select statement.
type T struct {
// mutex protects contents and isClosed.
mutex sync.Mutex
// contents holds the queued items: Put pushes at the back, the Get variants
// pop from the front.
contents deque.T // GUARDED_BY(mutex)
// isClosed is set to true by Close and Shutdown.
isClosed bool // GUARDED_BY(mutex)
// Number of available elements. Put increments it after storing an item,
// Get and TryGet decrement it before removing one, and Close and Shutdown
// close it.
avail *vsync.Semaphore
}
// New allocates an empty, open producer/consumer queue whose availability
// semaphore is freshly created.
func New() *T {
	q := new(T)
	q.avail = vsync.NewSemaphore()
	return q
}
// Put appends item to the back of the queue and then increments the avail
// semaphore. If the queue has already been closed the item is not stored and
// ErrQueueIsClosed is returned; otherwise Put returns nil.
func (q *T) Put(item interface{}) error {
	q.mutex.Lock()
	closed := q.isClosed
	if !closed {
		q.contents.PushBack(item)
	}
	q.mutex.Unlock()

	if closed {
		return ErrQueueIsClosed
	}
	// The semaphore is bumped only after the mutex has been released.
	q.avail.Inc()
	return nil
}
// Get removes and returns the item at the front of the queue. It first
// decrements the avail semaphore, handing cancel to Dec so the caller can
// abandon the operation. If Dec fails, Get returns a nil item and that error,
// with vsync.ErrClosed translated to ErrQueueIsClosed.
func (q *T) Get(cancel <-chan struct{}) (interface{}, error) {
	switch err := q.avail.Dec(cancel); err {
	case nil:
		return q.getContents()
	case vsync.ErrClosed:
		return nil, ErrQueueIsClosed
	default:
		return nil, err
	}
}
// TryGet removes and returns the item at the front of the queue if there is
// one. It uses TryDec on the avail semaphore; if TryDec fails, TryGet returns
// a nil item and that error, with vsync.ErrClosed translated to
// ErrQueueIsClosed.
func (q *T) TryGet() (interface{}, error) {
	switch err := q.avail.TryDec(); err {
	case nil:
		return q.getContents()
	case vsync.ErrClosed:
		return nil, ErrQueueIsClosed
	default:
		return nil, err
	}
}
// getContents pops the front item while holding the mutex. When the queue is
// empty it returns ErrQueueIsClosed if the queue has been closed, and
// (nil, nil) otherwise.
func (q *T) getContents() (interface{}, error) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	switch {
	case q.contents.Size() != 0:
		return q.contents.PopFront(), nil
	case q.isClosed:
		return nil, ErrQueueIsClosed
	default:
		return nil, nil
	}
}
// Close marks the queue as closed and then closes the avail semaphore; the
// queued contents are left in place. A Put* operation running concurrently
// with Close may, or may not, add its value to the queue. Every Put* operation
// that happens-after the Close fails.
func (q *T) Close() {
	func() {
		q.mutex.Lock()
		defer q.mutex.Unlock()
		q.isClosed = true
	}()
	// The semaphore is closed outside the critical section.
	q.avail.Close()
}
// Shutdown closes the queue and returns the contents, in queue order, leaving
// the queue empty. The result is nil when the queue held no items. Any
// concurrent Get and Put operations might exchange values, but all operations
// that happen-after the Shutdown will fail.
func (q *T) Shutdown() []interface{} {
	q.mutex.Lock()
	q.isClosed = true
	var contents []interface{}
	if n := q.contents.Size(); n != 0 {
		contents = make([]interface{}, 0, n)
		// Drain by size rather than stopping at the first nil item: Put accepts
		// nil values, and treating one as the end-of-queue marker would silently
		// drop every item queued behind it.
		for q.contents.Size() != 0 {
			contents = append(contents, q.contents.PopFront())
		}
	}
	q.mutex.Unlock()
	// The semaphore is closed only after the mutex has been released.
	q.avail.Close()
	return contents
}
// IsClosed reports whether the queue has been closed, by either Close or
// Shutdown. The flag is read while holding the mutex.
func (q *T) IsClosed() bool {
	q.mutex.Lock()
	closed := q.isClosed
	q.mutex.Unlock()
	return closed
}