blob: ce42db5a35b98f0f433db577a6cb76d61c879c74 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package flowcontrol
import (
var testdata = make([]byte, 1<<20)
func init() {
_, err := io.ReadFull(rand.Reader, testdata)
if err != nil {
func TestFlowControl(t *testing.T) {
const (
workers = 10
messages = 10
msgs := make(map[int][]byte)
fc := New(256, 64)
ctx, cancel := context.RootContext()
defer cancel()
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
go func(idx int) {
el := fc.NewWorker(0)
go el.Release(ctx, messages*5) // Try to make races happen
j := 0
el.Run(ctx, func(tokens int) (used int, done bool, err error) {
msgs[idx] = append(msgs[idx], []byte(fmt.Sprintf("%d-%d,", idx, j))...)
return 3, j >= messages, nil
for i := 0; i < workers; i++ {
buf := &bytes.Buffer{}
for j := 0; j < messages; j++ {
fmt.Fprintf(buf, "%d-%d,", i, j)
if want, got := buf.String(), string(msgs[i]); want != got {
t.Errorf("Got %s, want %s for %d", got, want, i)
func expect(t *testing.T, work chan interface{}, values ...interface{}) {
for i, w := range values {
if got := <-work; got != w {
t.Errorf("expected %p in pos %d got %p", w, i, got)
func TestOrdering(t *testing.T) {
const mtu = 10
ctx, cancel := context.RootContext()
defer cancel()
fc := New(0, mtu)
work := make(chan interface{})
worker := func(p int) *Worker {
w := fc.NewWorker(p)
go w.Run(ctx, func(t int) (int, bool, error) {
work <- w
return t, false, nil
w.Release(ctx, mtu)
return w
w0 := worker(0)
w1a := worker(1)
w1b := worker(1)
w1c := worker(1)
w2 := worker(2)
// Release to all the flows at once and ensure the writes
// happen in the correct order.
fc.Release(ctx, []Release{{w0, 2 * mtu}, {w1a, 2 * mtu}, {w1b, 3 * mtu}, {w1c, 0}, {w2, mtu}})
expect(t, work, w0, w0, w1a, w1b, w1a, w1b, w1b, w2)
func TestSharedCounters(t *testing.T) {
const (
mtu = 10
shared = 2 * mtu
ctx, cancel := context.RootContext()
defer cancel()
fc := New(shared, mtu)
work := make(chan interface{})
worker := func(p int) *Worker {
w := fc.NewWorker(p)
go w.Run(ctx, func(t int) (int, bool, error) {
work <- w
return t, false, nil
return w
// w0 should run twice on shared counters.
w0 := worker(0)
expect(t, work, w0, w0)
w1 := worker(1)
// Now Release to w0 which shouldn't allow it to run since it's just repaying, but
// should allow w1 to run on the returned shared counters.
w0.Release(ctx, 2*mtu)
expect(t, work, w1, w1)
// Releasing again will allow w0 to run.
w0.Release(ctx, mtu)
expect(t, work, w0)
func TestConcurrentRun(t *testing.T) {
ctx, cancel := context.RootContext()
defer cancel()
const mtu = 10
fc := New(mtu, mtu)
ready, wait := make(chan struct{}), make(chan struct{})
w := fc.NewWorker(0)
go w.Run(ctx, func(t int) (int, bool, error) {
return t, true, nil
if err := w.Run(ctx, nil); verror.ErrorID(err) != ErrConcurrentRun.ID {
t.Errorf("expected concurrent run error got: %v", err)
func TestNonFlowControlledRun(t *testing.T) {
ctx, cancel := context.RootContext()
defer cancel()
const mtu = 10
fc := New(0, mtu)
work := make(chan interface{})
ready, wait := make(chan struct{}), make(chan struct{})
// Start one worker running
go fc.Run(ctx, 0, func(t int) (int, bool, error) {
return t, true, nil
// Now queue up sever workers and make sure they execute in order.
go fc.Run(ctx, 2, func(t int) (int, bool, error) {
work <- "c"
return t, true, nil
go fc.Run(ctx, 1, func(t int) (int, bool, error) {
work <- "b"
return t, true, nil
go fc.Run(ctx, 0, func(t int) (int, bool, error) {
work <- "a"
return t, true, nil
for fc.numActive() < 4 {
expect(t, work, "a", "b", "c")
func newNullConn(mtu int) net.Conn {
ln, err := net.Listen("tcp", ":0")
if err != nil {
addr := ln.Addr()
go func() {
conn, err := ln.Accept()
if err != nil {
buf := make([]byte, mtu)
for {
_, err := conn.Read(buf)
if err == io.EOF {
if err != nil {
conn, err := net.Dial(addr.Network(), addr.String())
if err != nil {
return conn
func BenchmarkWithFlowControl(b *testing.B) {
const (
mtu = 1 << 16
shared = 1 << 20
workers = 100
ctx, cancel := context.RootContext()
defer cancel()
s := newNullConn(mtu)
for n := 0; n < b.N; n++ {
fc := New(shared, mtu)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
go func(idx int) {
w := fc.NewWorker(0)
w.Release(ctx, len(testdata))
t := testdata
err := w.Run(ctx, func(tokens int) (used int, done bool, err error) {
towrite := min(tokens, len(t))
written, err := s.Write(t[:min(tokens, len(t))])
t = t[written:]
return towrite, len(t) == 0, err
if err != nil {
if err := s.Close(); err != nil {
func BenchmarkWithoutFlowControl(b *testing.B) {
const (
workers = 100
mtu = 1 << 16
s := newNullConn(mtu)
for n := 0; n < b.N; n++ {
for cursor := 0; cursor < len(testdata); cursor += mtu {
for i := 0; i < workers; i++ {
_, err := s.Write(testdata[cursor : cursor+mtu])
if err != nil {
if err := s.Close(); err != nil {