blob: 0ffa4c9c6e48b5318141620af8b9b8c341c5b58a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package upcqueue
import (
"sync"
"sync/atomic"
"testing"
"time"
"v.io/x/ref/internal/logger"
vsync "v.io/x/ref/runtime/internal/lib/sync"
)
const (
elementCount = 100
writerCount = 10
readerCount = 10
)
// Test normal Put()/Get() combination.
func TestSimplePut(t *testing.T) {
queue := New()
done := make(chan struct{}, 1)
go func() {
queue.Put(1)
done <- struct{}{}
}()
<-done
item, err := queue.Get(nil)
if err != nil {
t.Errorf("Get: %v", err)
}
if item.(int) != 1 {
t.Errorf("Expected 1, actual=%v", item)
}
}
// Test normal Put()/Get() combination.
func TestSimpleGet(t *testing.T) {
queue := New()
done := make(chan struct{}, 1)
go func() {
item, err := queue.Get(nil)
if err != nil {
t.Errorf("Get: %v", item)
}
if item.(int) != 1 {
t.Errorf("Expected 1, actual=%v", item)
}
done <- struct{}{}
}()
select {
case <-done:
t.Errorf("Unexpected completion")
default:
}
queue.Put(1)
<-done
}
// Test normal queue operation with a single producer and single consumer.
func TestSequential(t *testing.T) {
queue := New()
done := make(chan struct{}, 1)
cancel := make(chan struct{})
// Check that the queue elements are sequentially increasing ints.
logger.Global().VI(1).Infof("Start consumer")
go func() {
for i := 0; i != elementCount; i++ {
item, err := queue.Get(cancel)
if err != nil {
t.Errorf("Get: %v", err)
}
if item == nil {
break
}
j := item.(int)
if j != i {
t.Errorf("Expected %d, actual %d", i, j)
}
}
done <- struct{}{}
}()
// Generate the sequential ints.
logger.Global().VI(1).Infof("Put values")
for i := 0; i != elementCount; i++ {
queue.Put(i)
}
// Wait for the consumer.
logger.Global().VI(1).Infof("Waiting for consumer")
<-done
// Any subsequent read should timeout.
logger.Global().VI(1).Infof("Start consumer")
go func() {
_, err := queue.Get(cancel)
if err != vsync.ErrCanceled {
t.Errorf("Expected timeout: %v", err)
}
logger.Global().VI(1).Infof("Consumer done")
done <- struct{}{}
}()
logger.Global().VI(1).Infof("Sleep a little")
time.Sleep(100 * time.Millisecond)
select {
case <-done:
t.Errorf("Unexpected completion")
default:
}
logger.Global().VI(1).Infof("Cancel")
close(cancel)
logger.Global().VI(1).Infof("Wait for consumer")
<-done
}
// Test that Get() returns an error when the queue is closed.
func TestSequentialClose(t *testing.T) {
logger.Global().VI(1).Infof("Put")
queue := New()
err := queue.Put(0)
if err != nil {
t.Errorf("Put: %v", err)
}
logger.Global().VI(1).Infof("Close")
queue.Close()
// Check that Get() returns the element.
logger.Global().VI(1).Infof("Get")
item, err := queue.Get(nil)
if err != nil {
t.Errorf("Get: %v", err)
}
if item.(int) != 0 {
t.Errorf("Unexpected value: %v", item)
}
// Check that Get() returns an error.
logger.Global().VI(1).Infof("Get")
_, err = queue.Get(nil)
if err != ErrQueueIsClosed {
t.Errorf("Expected queue to be closed: %v", err)
}
// Check that Put() returns an error.
logger.Global().VI(1).Infof("Put")
err = queue.Put(0)
if err != ErrQueueIsClosed {
t.Errorf("Expected queue to be closed: %v", err)
}
}
// Test that concurrent Puts() may add values to the queue.
func TestConcurrentClose(t *testing.T) {
queue := New()
pending := &sync.WaitGroup{}
pending.Add(2 * writerCount)
for i := 0; i != writerCount; i++ {
go func() {
err := queue.Put(1)
if err != nil {
logger.Global().VI(1).Infof("Put: %v", err)
}
pending.Done()
}()
}
time.Sleep(100 * time.Millisecond)
queue.Close()
for i := 0; i != writerCount; i++ {
go func() {
err := queue.Put(2)
if err == nil {
t.Errorf("Expected error")
}
pending.Done()
}()
}
readers := 0
for {
item, err := queue.Get(nil)
if err != nil {
break
}
if item.(int) != 1 {
t.Errorf("Expected 1, actual=%v", item)
}
readers++
}
logger.Global().VI(1).Infof("%d operations completed", readers)
if readers > writerCount {
t.Errorf("Too many readers")
}
pending.Wait()
}
// Test that Get() returns an error when the queue is shut down.
func TestSequentialShutdown(t *testing.T) {
queue := New()
logger.Global().VI(1).Infof("Put")
err := queue.Put(0)
if err != nil {
t.Errorf("Put: %v", err)
}
logger.Global().VI(1).Infof("Shutdown")
queue.Shutdown()
// Check that Get() returns an error.
logger.Global().VI(1).Infof("Get")
_, err = queue.Get(nil)
if err != ErrQueueIsClosed {
t.Errorf("Expected queue to be closed: %v", err)
}
// Check that Put() returns an error.
logger.Global().VI(1).Infof("Put")
err = queue.Put(0)
if err != ErrQueueIsClosed {
t.Errorf("Expected queue to be closed: %v", err)
}
}
// Test with concurrent producers, but a single consumer.
func TestConcurrentPutNoTimeouts(t *testing.T) {
queue := New()
pending := &sync.WaitGroup{}
// Generate the sequential ints.
for i := 0; i != writerCount; i++ {
pending.Add(1)
go func() {
for j := 0; j != elementCount; j++ {
queue.Put(j)
}
pending.Done()
}()
}
// Sum up the results and compare.
sum := 0
for i := 0; i != writerCount*elementCount; i++ {
item, err := queue.Get(nil)
if err != nil {
t.Errorf("Get: %v", err)
}
if item == nil {
break
}
sum += item.(int)
}
expected := writerCount * elementCount * (elementCount - 1) / 2
if sum != expected {
t.Errorf("Expected sum %d, received %d", expected, sum)
}
pending.Wait()
}
// Test with concurrent consumers and concurrent producers.
func TestConcurrentGet(t *testing.T) {
queue := New()
done := make(chan struct{})
pending := &sync.WaitGroup{}
pending.Add(readerCount + writerCount)
cancel := make(chan struct{})
// Sum up the results and compare.
sum := uint32(0)
count := uint32(0)
logger.Global().VI(1).Infof("Start consumers")
for i := 0; i != readerCount; i++ {
pid := i
go func() {
for {
c := atomic.LoadUint32(&count)
if c == writerCount*elementCount {
break
}
// The timeout is required for termination.
item, err := queue.Get(cancel)
if err != nil {
continue
}
atomic.AddUint32(&sum, uint32(item.(int)))
atomic.AddUint32(&count, 1)
}
logger.Global().VI(1).Infof("Consumer %d done", pid)
pending.Done()
}()
}
// Generate the sequential ints.
logger.Global().VI(1).Infof("Start producers")
for i := 0; i != writerCount; i++ {
pid := i
go func() {
for j := 0; j != elementCount; j++ {
err := queue.Put(j)
if err != nil {
t.Errorf("Put: %v", err)
}
}
logger.Global().VI(1).Infof("Producer %d done", pid)
pending.Done()
}()
}
logger.Global().VI(1).Infof("Start termination checker")
go func() {
pending.Wait()
done <- struct{}{}
}()
logger.Global().VI(1).Infof("Wait for processes")
stop := false
for !stop {
time.Sleep(100 * time.Millisecond)
select {
case <-done:
stop = true
default:
cancel <- struct{}{}
}
}
logger.Global().VI(1).Infof("Checking the sum")
expected := writerCount * elementCount * (elementCount - 1) / 2
s := atomic.LoadUint32(&sum)
if s != uint32(expected) {
t.Errorf("Expected sum %d, received %d", expected, sum)
}
}