// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package drrqueue implements a deficit round-robin buffer queue.
//
//     Efficient Fair Queueing Using Deficit Round-Robin
//     M. Shreedhar and George Varghese
//     IEEE/ACM Transactions on Networking, Vol. 4, No. 3, June 1996
//
// The queue supports N writers and a single reader.  By "buffer," we mean a
// block of bytes, represented as an *iobuf.Slice.
//
// Writers have a priority that takes precedence over the deficit.  Writers
// with greater priority are served first.  Deficits are not even updated for
// lower priorities when higher priority Writers are being served.
package drrqueue

// LOCKING DISCIPLINE:
//
// Each Writer has a lock, and so does T.  Locks are always taken in order:
// Writer.mutex first, then T.mutex.  Never take the locks in the opposite
// order.

import (
	"fmt"
	"io"
	"sync"

	"v.io/x/ref/profiles/internal/lib/bqueue"
	"v.io/x/ref/profiles/internal/lib/deque"
	"v.io/x/ref/profiles/internal/lib/iobuf"
	vsync "v.io/x/ref/profiles/internal/lib/sync"
)

// T defines the type of round-robin buffer queues.  The queue has multiple
// input Writer queues that are served according to the deficit round-robin
// policy.
type T struct {
	// mutex guards the fields below, as well as each writer's isActive and
	// deficit fields.  cond is bound to mutex (see New); it is signalled when
	// a writer is added to an active list and when the queue is closed.
	mutex sync.Mutex
	cond  *sync.Cond

	// active contains an array of active Writers, indexed by Priority.
	// nextWriter scans it from index 0 upward and serves the head of the
	// first non-empty list, so lower indices are served before higher ones.
	active [][]*writer

	// writers contains all of the registered Writers, keyed by ID.
	writers map[bqueue.ID]*writer

	// quantum is the amount of data that each Writer can send per round-robin cycle.
	quantum int

	// isClosed is set by Close.  Once set, NewWriter fails with
	// bqueue.ErrBQueueIsClosed and nextWriter (and so Get) returns io.EOF.
	isClosed bool
}

// writer is a single bounded input queue supporting a Put operation.  It is
// the bqueue.Writer implementation returned by T.NewWriter.
type writer struct {
	// id, q and priority are set by NewWriter; nothing in this file modifies
	// them afterwards.  priority selects the list in q.active that the writer
	// is queued on when it has deliverable data.
	id       bqueue.ID
	q        *T
	priority bqueue.Priority

	// free contains the number of free bytes in the writer queue.  Put and
	// TryPut decrement it by the size of each buffer before enqueueing it, and
	// updateStateLocked increments it by the bytes dequeued by getContents.
	//
	// INVARIANT: free + size == size of the queue (a constant).  This can't be
	// computed, because <free> is a semaphore, but it is true nonetheless.
	free *vsync.Semaphore

	// The following are all protected by the mutex.  contents holds the queued
	// buffers in FIFO order (putContents pushes at the back, getContents pops
	// from the front).
	mutex    sync.Mutex
	isClosed bool
	contents deque.T
	size     int // Total number of bytes in the queue.
	released int // Number of bytes that can be dequeued (negative for unlimited).

	// The following are protected by q.mutex.  mutex is not required.
	// isActive tracks whether the writer is queued in q.active (see
	// activeMode).  deficit is the writer's deficit round-robin byte budget:
	// nextWriter adds q.quantum each time it selects the writer, the bytes
	// consumed are subtracted when the writer is re-queued, and it is reset
	// to 0 whenever the writer has nothing deliverable.
	isActive activeMode
	deficit  int
}

// activeMode describes a writer's relationship to T's active lists.  A
// writer's isActive field holds one of these values and is protected by
// T.mutex.
//
//   - busy: Get has taken the writer off its active list and is dequeuing
//     from it; it is not in any active list.
//   - idle: the writer is not in any active list and Get is not using it.
//   - active: the writer is queued in T.active[priority].
type activeMode int

const (
	busy   activeMode = 0
	idle   activeMode = 1
	active activeMode = 2
)

// ID reports the identifier this writer was registered under in NewWriter.
func (w *writer) ID() bqueue.ID {
	id := w.id
	return id
}

// Close marks the writer closed while keeping whatever is already queued, then
// closes its free-space semaphore.  A Put that is in flight concurrently may
// or may not get its buffer into the queue; any Put that happens-after Close
// fails.
func (w *writer) Close() {
	func() {
		w.mutex.Lock()
		defer w.mutex.Unlock()
		w.isClosed = true
		// A closed writer may now be deliverable (e.g. closed and empty), so
		// recompute its scheduling state.
		w.updateStateLocked(false, 0)
	}()
	// Done outside w.mutex, after isClosed is visible to putContents.
	w.free.Close()
}

// Shutdown closes the writer as in Close and then disposes of it in one of two
// ways:
//
//   - removeWriter == false: the queued contents are discarded and the now
//     empty, closed writer stays registered, so it will eventually be
//     returned (with no buffers) by a T.Get.
//   - removeWriter == true: the contents are not cleared here; instead the
//     writer is unlinked from its T (taken off its active list and deleted
//     from T's writer map) via T.removeWriter.
//
// NOTE(review): if a Get is concurrently dequeuing from this writer (state
// busy), removeWriter marks it idle and getContents may then re-append it to
// an active list.  Confirm that this is acceptable.
func (w *writer) Shutdown(removeWriter bool) {
	w.mutex.Lock()
	w.isClosed = true
	if removeWriter {
		// T.removeWriter takes q.mutex, which must not be acquired while
		// holding w.mutex in that direction, so release w.mutex first.
		w.mutex.Unlock()
		w.q.removeWriter(w)
	} else {
		w.contents.Clear()
		w.size = 0
		w.updateStateLocked(false, 0)
		w.mutex.Unlock()
	}
	w.free.Close()
}

// IsClosed reports whether Close or Shutdown has been called on the writer.
func (w *writer) IsClosed() bool {
	w.mutex.Lock()
	closed := w.isClosed
	w.mutex.Unlock()
	return closed
}

// IsDrained reports whether the writer is both closed and holds no queued
// bytes.
func (w *writer) IsDrained() bool {
	w.mutex.Lock()
	drained := w.isClosed && w.size == 0
	w.mutex.Unlock()
	return drained
}

// Put appends buf to the writer, first waiting until the writer has
// buf.Size() free bytes.  The buffer is not handed to T.Get until enough bytes
// have been released with Release.  Put returns an error if the wait fails
// (for example because cancel fires or the writer's semaphore was closed) or
// if the writer is closed.
func (w *writer) Put(buf *iobuf.Slice, cancel <-chan struct{}) error {
	n := uint(buf.Size())
	err := w.free.DecN(n, cancel)
	if err != nil {
		return err
	}
	return w.putContents(buf)
}

// TryPut is like Put, but it is nonblocking.  Space is reserved with
// free.TryDecN instead of the blocking DecN, and any error from that
// reservation is returned unchanged.
func (w *writer) TryPut(buf *iobuf.Slice) error {
	err := w.free.TryDecN(uint(buf.Size()))
	if err == nil {
		err = w.putContents(buf)
	}
	return err
}

// putContents appends buf (whose space has already been reserved in w.free) to
// the writer and refreshes its scheduling state.  It returns
// bqueue.ErrWriterIsClosed if the writer has been closed.
func (w *writer) putContents(buf *iobuf.Slice) error {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if !w.isClosed {
		w.contents.PushBack(buf)
		w.size += buf.Size()
		w.updateStateLocked(false, 0)
		return nil
	}
	return bqueue.ErrWriterIsClosed
}

// Release makes the next <bytes> bytes eligible to be dequeued and passed to
// Get.  A negative <bytes> releases everything and turns flow control off for
// good.  After that, a nonnegative Release fails with
// bqueue.ErrCantToggleFlowControl.
func (w *writer) Release(bytes int) error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	unlimited := w.released < 0
	switch {
	case bytes < 0:
		w.released = -1
	case unlimited:
		return bqueue.ErrCantToggleFlowControl
	default:
		w.released += bytes
	}
	w.updateStateLocked(false, 0)
	return nil
}

// getContents dequeues buffers from the front of the writer, in order, as long
// as their cumulative size fits in the byte budget.  It then updates the
// writer's scheduling state via updateStateLocked: the dequeued bytes are
// credited back to w.free, and the writer is re-queued or made idle.
//
// The budget is deficit, further capped by w.released when flow control is in
// effect (w.released >= 0).  If the writer is closed and empty, it returns
// (nil, true).  Otherwise it returns the dequeued buffers and whether at least
// one buffer was dequeued.
func (w *writer) getContents(deficit int) ([]*iobuf.Slice, bool) {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	// Closed and fully drained.  updateStateLocked is not called on this
	// path, so the writer keeps the busy state set by nextWriter.
	if w.isClosed && w.size == 0 {
		return nil, true
	}

	budget := deficit
	if w.released >= 0 && w.released < budget {
		budget = w.released
	}

	var taken []*iobuf.Slice
	used := 0
	for w.contents.Size() != 0 {
		front := w.contents.Front().(*iobuf.Slice)
		n := front.Size()
		if used+n > budget {
			// Stop at the first buffer that does not fit; it stays at the
			// head of the queue for a later round.
			break
		}
		used += n
		taken = append(taken, front)
		w.contents.PopFront()
	}

	w.size -= used
	// Only a nonnegative released count is a flow-control limit to consume.
	if w.released >= 0 {
		w.released -= used
		if w.released < 0 {
			panic("released is negative")
		}
	}
	w.updateStateLocked(true, used)

	return taken, len(taken) != 0
}

// updateStateLocked credits consumed bytes back to the writer's free-space
// semaphore, recomputes whether the writer is deliverable, and forwards the
// result to T.updateWriterState.
//
// A writer is deliverable when it is closed and empty, or when it has queued
// bytes and its head buffer fits in the released budget (or flow control is
// off).
//
// REQUIRES: w.mutex is locked.  q.mutex is taken inside updateWriterState,
// matching the Writer.mutex-then-T.mutex lock order.
func (w *writer) updateStateLocked(overrideBusy bool, consumed int) {
	w.free.IncN(uint(consumed))

	// Readiness is decided without looking at the deficit.
	var ready bool
	switch {
	case w.size == 0:
		ready = w.isClosed
	case w.released < 0:
		ready = true
	default:
		ready = w.contents.Front().(*iobuf.Slice).Size() <= w.released
	}
	w.q.updateWriterState(w, overrideBusy, ready, consumed)
}

// New creates an empty queue in which every writer may send up to quantum
// bytes per round-robin cycle.
func New(quantum int) bqueue.T {
	q := new(T)
	q.writers = make(map[bqueue.ID]*writer)
	q.quantum = quantum
	// The condition variable shares the queue's mutex.
	q.cond = sync.NewCond(&q.mutex)
	return q
}

// Close closes the queue.  It marks the queue closed, so a Get waiting in
// nextWriter observes io.EOF and NewWriter fails from then on.  It empties
// every active list, signals q.cond, and finally calls Shutdown(true) on every
// writer that was registered.
func (q *T) Close() {
	q.mutex.Lock()
	q.isClosed = true
	// Detach the current writer set; the writers are shut down below.
	old := q.writers
	q.writers = make(map[bqueue.ID]*writer)
	for p := range q.active {
		q.active[p] = nil
	}
	q.cond.Signal()
	q.mutex.Unlock()

	// Shutdown takes w.mutex and then q.mutex, so it must run without q.mutex
	// held to respect the lock order.
	for _, w := range old {
		w.Shutdown(true)
	}
}

// NewWriter registers and returns a new writer with the given id and priority,
// whose queue can hold up to bytes bytes.  That capacity is credited to the
// writer's free-space semaphore.
//
// It fails with bqueue.ErrBQueueIsClosed if the queue is closed, and with
// bqueue.ErrWriterAlreadyExists if id is already registered.
func (q *T) NewWriter(id bqueue.ID, p bqueue.Priority, bytes int) (bqueue.Writer, error) {
	w := &writer{
		id:       id,
		q:        q,
		priority: p,
		isActive: idle,
		free:     vsync.NewSemaphore(),
	}
	w.free.IncN(uint(bytes))

	q.mutex.Lock()
	defer q.mutex.Unlock()

	if q.isClosed {
		return nil, bqueue.ErrBQueueIsClosed
	}
	// Make sure q.active has a slot for this priority.  This happens before
	// the duplicate check, so it also runs when the ID is already taken.
	q.addPriorityLocked(p)
	if _, taken := q.writers[w.id]; taken {
		return nil, bqueue.ErrWriterAlreadyExists
	}
	q.writers[w.id] = w
	return w, nil
}

// String provides a string representation of the queue, listing the id, queued
// size and released byte count of every registered writer.  Writers appear in
// map-iteration order, so the order is unspecified.
func (q *T) String() string {
	// w.size and w.released are protected by w.mutex.  That mutex must not be
	// acquired while q.mutex is held (see LOCKING DISCIPLINE), so capture the
	// current writer set under q.mutex.  Each writer's fields are then read
	// under its own mutex, after q.mutex has been released.
	q.mutex.Lock()
	describers := make([]func() string, 0, len(q.writers))
	for _, w := range q.writers {
		w := w // Per-iteration copy for the closure.
		describers = append(describers, func() string {
			w.mutex.Lock()
			defer w.mutex.Unlock()
			return fmt.Sprintf("Writer{id: %d, size: %d, released: %d}, ", w.id, w.size, w.released)
		})
	}
	q.mutex.Unlock()

	s := "q{"
	for _, describe := range describers {
		s += describe()
	}
	s += "}"
	return s
}

// Find returns the writer registered under id, or nil if there is none.
func (q *T) Find(id bqueue.ID) bqueue.Writer {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	if w, ok := q.writers[id]; ok {
		return w
	}
	// Return a literal nil: a nil *writer converted to bqueue.Writer would be
	// a non-nil interface value.
	return nil
}

// Get returns the next writer chosen by the deficit round-robin schedule,
// together with the buffers dequeued from it.  A closed writer that has been
// fully drained is returned with nil buffers.
//
// Get blocks until a buffer is available or the queue is closed.  flush, if
// non-nil, is called by nextWriter before each wait.  Once the queue is
// closed, Get returns a nil Writer and the error from nextWriter (io.EOF).
func (q *T) Get(flush bqueue.FlushFunc) (bqueue.Writer, []*iobuf.Slice, error) {
	for {
		w, deficit, err := q.nextWriter(flush)
		if w == nil {
			return nil, nil, err
		}
		bufs, ok := w.getContents(deficit)
		if !ok {
			// Nothing fit within the deficit.  getContents has already
			// updated w's scheduling state, so look for the next candidate.
			continue
		}
		return w, bufs, nil
	}
}

// nextWriter pops the head of the first non-empty active list, scanning
// priorities from index 0 upward.  It marks that writer busy, so it will not
// be re-added to an active list until getContents runs.  It then adds
// q.quantum to the writer's deficit and returns the writer with its new
// deficit.
//
// If no writer is active, it calls flush (if non-nil) while still holding
// q.mutex and then waits on q.cond.  It returns (nil, 0, io.EOF) once the
// queue is closed.
func (q *T) nextWriter(flush bqueue.FlushFunc) (*writer, int, error) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	for {
		if q.isClosed {
			return nil, 0, io.EOF
		}
		for p := range q.active {
			queue := q.active[p]
			if len(queue) == 0 {
				continue
			}
			w := queue[0]
			q.active[p] = queue[1:]
			w.isActive = busy
			w.deficit += q.quantum
			return w, w.deficit, nil
		}
		if flush != nil {
			flush()
		}
		q.cond.Wait()
	}
}

// addPriorityLocked grows q.active, if needed, so that index p is valid.
// Existing per-priority lists are preserved.
//
// REQUIRES: q.mutex is locked.
func (q *T) addPriorityLocked(p bqueue.Priority) {
	need := int(p) + 1
	if len(q.active) >= need {
		return
	}
	grown := make([][]*writer, need)
	copy(grown, q.active)
	q.active = grown
}

// removeWriter unregisters w from q.  It unlinks w from its priority's active
// list (if w is queued there), marks it idle, and removes it from q.writers
// if that entry still refers to w.
//
// NOTE: does not require that w.mutex is locked; it acquires q.mutex itself.
func (q *T) removeWriter(w *writer) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if w.isActive == active {
		// Unlink w from its priority's active list.  The local is named
		// "queued" rather than "active" so it does not shadow the activeMode
		// constant.
		queued := q.active[w.priority]
		for i, other := range queued {
			if other != w {
				continue
			}
			last := len(queued) - 1
			copy(queued[i:], queued[i+1:])
			// Clear the vacated slot so the backing array no longer holds a
			// reference to a writer that has been removed.
			queued[last] = nil
			q.active[w.priority] = queued[:last]
			break
		}
	}
	w.isActive = idle

	// Only delete the map entry if it still refers to w.  If w was removed
	// before (e.g. Shutdown(true) called twice), a different writer may since
	// have been registered under the same ID, and it must stay registered.
	if q.writers[w.id] == w {
		delete(q.writers, w.id)
	}
}

// updateWriterState applies a readiness decision made by updateStateLocked.
//
// If isActive is true and w is idle, or w is busy and overrideBusy is set,
// then:
//   - w is appended to the active list for its priority;
//   - its deficit is reduced by consumed;
//   - one waiter on q.cond is signalled.
//
// overrideBusy is set only when called from getContents, i.e. by the Get that
// made w busy.  If isActive is false, w's deficit is reset to 0, and w becomes
// idle when overrideBusy is set.
//
// REQUIRES: w.mutex is locked.
func (q *T) updateWriterState(w *writer, overrideBusy bool, isActive bool, consumed int) {
	q.mutex.Lock()
	switch {
	case !isActive:
		// A writer in an active list must always be deliverable.
		if w.isActive == active {
			panic("Writer is active when it should not be")
		}
		if overrideBusy {
			w.isActive = idle
		}
		w.deficit = 0
	case w.isActive == idle, overrideBusy && w.isActive == busy:
		q.active[w.priority] = append(q.active[w.priority], w)
		w.isActive = active
		w.deficit -= consumed
		if w.deficit < 0 {
			panic("deficit is negative")
		}
		q.cond.Signal()
	}
	q.mutex.Unlock()
}
