ref/runtime/internal/flow: Add a flowcontroller.

The flow controller's job is to schedule threads keeping in mind
flow control counters, MTUs, and fairness.

The flow controller does impose some overhead as shown
by a simple benchmark:
BenchmarkWithFlowControl-12          300          57130246 ns/op
BenchmarkWithoutFlowControl-12       500          31617551 ns/op

Change-Id: Ieda29148928c8a6b4c51ebde72543b2f51b9c970
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
new file mode 100644
index 0000000..1a47ada
--- /dev/null
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowcontrol
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/runtime/internal/flow/flowcontrol"
+
+var ErrConcurrentRun = verror.Register(
+	verror.ID(pkgPath+".ErrConcurrentRun"),
+	verror.NoRetry, "Run called concurrently.")
+var ErrWrongFlowController = verror.Register(
+	verror.ID(pkgPath+".ErrWrongFlowController"),
+	verror.NoRetry, "Release called for worker from different flow controller.")
+
+// Runners are called by Workers.  For a given flow controller
+// only one Runner will be running at a time.  tokens specifies
+// the number of tokens available for this call.  Implementors
+// should return the number of tokens used, whether they are done
+// with all their work, and any error encountered.
+// Runners will be called repeatedly within a single Run call until
+// either err != nil or done is true.
+type Runner func(tokens int) (used int, done bool, err error)
+
+type counterState struct {
+	// TODO(mattr): Add deficit if we allow multi-slice writes.
+	borrowed     int  // Number of tokens borrowed from the shared pool.
+	released     int  // Number of tokens available via our flow control counters.
+	everReleased bool // True if tokens have ever been released to this worker.
+}
+
+type state int
+
+const (
+	idle = state(iota)
+	running
+	active
+)
+
+// Worker represents a single flowcontrolled worker.
+// Workers keep track of flow control counters to ensure
+// producers do not overwhelm consumers.  Only one Worker
+// will be executing at a time.
+type Worker struct {
+	fc       *FlowController
+	priority int
+	work     chan struct{}
+
+	// These variables are protected by fc.mu.
+	counters   *counterState // State related to the flow control counters.
+	state      state
+	next, prev *Worker // Used as a list when in an active queue.
+}
+
+// Run runs r potentially multiple times.
+// Only one worker's r function will run at a time for a given FlowController.
+// A single worker's Run function should not be called concurrently from multiple
+// goroutines.
+func (w *Worker) Run(ctx *context.T, r Runner) (err error) {
+	w.fc.mu.Lock()
+	if w.state != idle {
+		w.fc.mu.Unlock()
+		return verror.New(ErrConcurrentRun, ctx)
+	}
+
+	w.state = running
+	if w.readyLocked() {
+		w.fc.activateLocked(w)
+		w.state = active
+	}
+
+	for {
+		next := w.fc.nextWorkerLocked()
+		for w.fc.writing != w && err == nil {
+			w.fc.mu.Unlock()
+			if next != nil {
+				next.work <- struct{}{}
+			}
+			select {
+			case <-ctx.Done():
+				err = ctx.Err()
+			case <-w.work:
+			}
+			w.fc.mu.Lock()
+		}
+		if err != nil {
+			break
+		}
+
+		toWrite := w.fc.mtu
+		if w.counters != nil {
+			if !w.counters.everReleased {
+				toWrite = min(w.fc.shared, w.fc.mtu)
+				w.counters.released += toWrite
+				w.counters.borrowed += toWrite
+				w.fc.shared -= toWrite
+			} else {
+				toWrite = min(w.counters.released, w.fc.mtu)
+			}
+		}
+
+		w.fc.mu.Unlock()
+		var written int
+		var done bool
+		written, done, err = r(toWrite)
+		w.fc.mu.Lock()
+
+		if w.counters != nil {
+			w.counters.released -= written
+			if w.counters.released > 0 && w.counters.borrowed > 0 {
+				toReturn := min(w.counters.released, w.counters.borrowed)
+				w.counters.borrowed -= toReturn
+				w.counters.released -= toReturn
+				w.fc.shared += toReturn
+			}
+		}
+
+		w.fc.writing = nil
+		if err != nil || done {
+			break
+		}
+		if !w.readyLocked() {
+			w.fc.deactivateLocked(w)
+			w.state = running
+		}
+	}
+
+	w.state = idle
+	w.fc.deactivateLocked(w)
+	next := w.fc.nextWorkerLocked()
+	w.fc.mu.Unlock()
+	if next != nil {
+		next.work <- struct{}{}
+	}
+	return err
+}
+
+func (w *Worker) releaseLocked(tokens int) {
+	if w.counters == nil {
+		return
+	}
+	w.counters.everReleased = true
+	if w.counters.borrowed > 0 {
+		n := min(w.counters.borrowed, tokens)
+		w.counters.borrowed -= n
+		w.fc.shared += n
+		tokens -= n
+	}
+	w.counters.released += tokens
+	if w.state == running && w.readyLocked() {
+		w.fc.activateLocked(w)
+	}
+}
+
+// Release releases tokens to this worker.
+// Workers will first repay any debts to the flow controllers shared pool
+// and use any surplus in subsequent calls to Run.
+func (w *Worker) Release(tokens int) {
+	w.fc.mu.Lock()
+	w.releaseLocked(tokens)
+	next := w.fc.nextWorkerLocked()
+	w.fc.mu.Unlock()
+	if next != nil {
+		next.work <- struct{}{}
+	}
+}
+
+func (w *Worker) readyLocked() bool {
+	if w.counters == nil {
+		return true
+	}
+	return w.counters.released > 0 || (!w.counters.everReleased && w.fc.shared > 0)
+}
+
+// FlowController manages multiple Workers to ensure only one runs at a time.
+// The workers also obey counters so that producers don't overwhelm consumers.
+type FlowController struct {
+	mtu int
+
+	mu      sync.Mutex
+	shared  int
+	active  []*Worker
+	writing *Worker
+}
+
+// New creates a new FlowController.  Shared is the number of shared tokens
+// that flows can borrow from before they receive their first Release.
+// Mtu is the maximum number of tokens to be consumed by a single Runner
+// invocation.
+func New(shared, mtu int) *FlowController {
+	return &FlowController{shared: shared, mtu: mtu}
+}
+
+// NewWorker creates a new worker.  Workers keep track of token counters
+// for a flow controlled process.  The order that workers
+// execute is controlled by priority.  Higher priority
+// workers that are ready will run before any lower priority
+// workers.
+func (fc *FlowController) NewWorker(priority int) *Worker {
+	w := &Worker{
+		fc:       fc,
+		priority: priority,
+		work:     make(chan struct{}),
+		counters: &counterState{},
+	}
+	w.next, w.prev = w, w
+	return w
+}
+
+type Release struct {
+	Worker *Worker
+	Tokens int
+}
+
+// Release releases to many Workers atomically.  It is conceptually
+// the same as calling release on each worker indepedently.
+func (fc *FlowController) Release(to []Release) error {
+	fc.mu.Lock()
+	for _, t := range to {
+		if t.Worker.fc != fc {
+			return verror.New(ErrWrongFlowController, nil)
+		}
+		t.Worker.releaseLocked(t.Tokens)
+	}
+	next := fc.nextWorkerLocked()
+	fc.mu.Unlock()
+	if next != nil {
+		next.work <- struct{}{}
+	}
+	return nil
+}
+
+// Run runs the given runner on a non-flow controlled Worker.  This
+// worker does not wait for any flow control tokens and is limited
+// only by the MTU.
+func (fc *FlowController) Run(ctx *context.T, p int, r Runner) error {
+	w := &Worker{
+		fc:       fc,
+		priority: p,
+		work:     make(chan struct{}),
+	}
+	w.next, w.prev = w, w
+	return w.Run(ctx, r)
+}
+
+func (fc *FlowController) nextWorkerLocked() *Worker {
+	if fc.writing == nil {
+		for p, head := range fc.active {
+			if head != nil {
+				fc.active[p] = head.next
+				fc.writing = head
+				return head
+			}
+		}
+	}
+	return nil
+}
+
+func (fc *FlowController) activateLocked(w *Worker) {
+	if w.priority >= len(fc.active) {
+		newActive := make([]*Worker, int(w.priority)+1)
+		copy(newActive, fc.active)
+		fc.active = newActive
+	}
+	head := fc.active[w.priority]
+	if head == nil {
+		fc.active[w.priority] = w
+	} else {
+		w.prev, w.next = head.prev, head
+		w.prev.next, w.next.prev = w, w
+	}
+}
+
+func (fc *FlowController) deactivateLocked(w *Worker) {
+	if head := fc.active[w.priority]; head == w {
+		if w.next == w {
+			fc.active[w.priority] = nil
+		} else {
+			fc.active[w.priority] = w.next
+		}
+	}
+	w.next.prev, w.prev.next = w.prev, w.next
+	w.next, w.prev = w, w
+}
+
+func (fc *FlowController) numActive() int {
+	n := 0
+	fc.mu.Lock()
+	for _, head := range fc.active {
+		if head != nil {
+			n++
+			for cur := head.next; cur != head; cur = cur.next {
+				n++
+			}
+		}
+	}
+	fc.mu.Unlock()
+	return n
+}
+
+// String writes a string representation of the flow controller.
+// This can be helpful in debugging.
+func (fc *FlowController) String() string {
+	buf := &bytes.Buffer{}
+	fmt.Fprintf(buf, "FlowController %p: \n", fc)
+
+	fc.mu.Lock()
+	fmt.Fprintf(buf, "writing: %p\n", fc.writing)
+	fmt.Fprintln(buf, "active:")
+	for p, head := range fc.active {
+		fmt.Fprintf(buf, "  %v: %p", p, head)
+		if head != nil {
+			for cur := head.next; cur != head; cur = cur.next {
+				fmt.Fprintf(buf, " %p", cur)
+			}
+		}
+		fmt.Fprintln(buf, "")
+	}
+	fc.mu.Unlock()
+	return buf.String()
+}
+
+func min(head int, rest ...int) int {
+	for _, r := range rest {
+		if r < head {
+			head = r
+		}
+	}
+	return head
+}
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol_test.go b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
new file mode 100644
index 0000000..7388b8f
--- /dev/null
+++ b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
@@ -0,0 +1,293 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowcontrol
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+	"v.io/x/ref/test"
+)
+
+var testdata = make([]byte, 1<<20)
+
+func init() {
+	test.Init()
+	_, err := io.ReadFull(rand.Reader, testdata)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func TestFlowControl(t *testing.T) {
+	const (
+		workers  = 10
+		messages = 10
+	)
+
+	msgs := make(map[int][]byte)
+	fc := New(256, 64)
+
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go func(idx int) {
+			el := fc.NewWorker(0)
+			go el.Release(messages * 5) // Try to make races happen
+			j := 0
+			el.Run(ctx, func(tokens int) (used int, done bool, err error) {
+				msgs[idx] = append(msgs[idx], []byte(fmt.Sprintf("%d-%d,", idx, j))...)
+				j++
+				return 3, j >= messages, nil
+			})
+			wg.Done()
+		}(i)
+	}
+	wg.Wait()
+
+	for i := 0; i < workers; i++ {
+		buf := &bytes.Buffer{}
+		for j := 0; j < messages; j++ {
+			fmt.Fprintf(buf, "%d-%d,", i, j)
+		}
+		if want, got := buf.String(), string(msgs[i]); want != got {
+			t.Errorf("Got %s, want %s for %d", got, want, i)
+		}
+	}
+}
+
+func expect(t *testing.T, work chan interface{}, values ...interface{}) {
+	for i, w := range values {
+		if got := <-work; got != w {
+			t.Errorf("expected %p in pos %d got %p", w, i, got)
+		}
+	}
+}
+
+func TestOrdering(t *testing.T) {
+	const mtu = 10
+
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	fc := New(0, mtu)
+
+	work := make(chan interface{})
+	worker := func(p int) *Worker {
+		w := fc.NewWorker(p)
+		go w.Run(ctx, func(t int) (int, bool, error) {
+			work <- w
+			return t, false, nil
+		})
+		w.Release(mtu)
+		<-work
+		return w
+	}
+
+	w0 := worker(0)
+	w1a := worker(1)
+	w1b := worker(1)
+	w1c := worker(1)
+	w2 := worker(2)
+
+	// Release to all the flows at once and ensure the writes
+	// happen in the correct order.
+	fc.Release([]Release{{w0, 2 * mtu}, {w1a, 2 * mtu}, {w1b, 3 * mtu}, {w1c, 0}, {w2, mtu}})
+	expect(t, work, w0, w0, w1a, w1b, w1a, w1b, w1b, w2)
+}
+
+func TestSharedCounters(t *testing.T) {
+	const (
+		mtu    = 10
+		shared = 2 * mtu
+	)
+
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	fc := New(shared, mtu)
+
+	work := make(chan interface{})
+
+	worker := func(p int) *Worker {
+		w := fc.NewWorker(p)
+		go w.Run(ctx, func(t int) (int, bool, error) {
+			work <- w
+			return t, false, nil
+		})
+		return w
+	}
+
+	// w0 should run twice on shared counters.
+	w0 := worker(0)
+	expect(t, work, w0, w0)
+
+	w1 := worker(1)
+	// Now Release to w0 which shouldn't allow it to run since it's just repaying, but
+	// should allow w1 to run on the returned shared counters.
+	w0.Release(2 * mtu)
+	expect(t, work, w1, w1)
+
+	// Releasing again will allow w0 to run.
+	w0.Release(mtu)
+	expect(t, work, w0)
+}
+
+func TestConcurrentRun(t *testing.T) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	const mtu = 10
+	fc := New(mtu, mtu)
+
+	ready, wait := make(chan struct{}), make(chan struct{})
+	w := fc.NewWorker(0)
+	go w.Run(ctx, func(t int) (int, bool, error) {
+		close(ready)
+		<-wait
+		return t, true, nil
+	})
+	<-ready
+	if err := w.Run(ctx, nil); verror.ErrorID(err) != ErrConcurrentRun.ID {
+		t.Errorf("expected concurrent run error got: %v", err)
+	}
+	close(wait)
+}
+
+func TestNonFlowControlledRun(t *testing.T) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	const mtu = 10
+	fc := New(0, mtu)
+
+	work := make(chan interface{})
+	ready, wait := make(chan struct{}), make(chan struct{})
+	// Start one worker running
+	go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+		close(ready)
+		<-wait
+		return t, true, nil
+	})
+	<-ready
+	// Now queue up sever workers and make sure they execute in order.
+	go fc.Run(ctx, 2, func(t int) (int, bool, error) {
+		work <- "c"
+		return t, true, nil
+	})
+	go fc.Run(ctx, 1, func(t int) (int, bool, error) {
+		work <- "b"
+		return t, true, nil
+	})
+	go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+		work <- "a"
+		return t, true, nil
+	})
+	for fc.numActive() < 4 {
+		time.Sleep(time.Millisecond)
+	}
+	close(wait)
+	expect(t, work, "a", "b", "c")
+}
+
+func newNullConn(mtu int) net.Conn {
+	ln, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(err)
+	}
+	addr := ln.Addr()
+
+	go func() {
+		conn, err := ln.Accept()
+		if err != nil {
+			panic(err)
+		}
+		ln.Close()
+		buf := make([]byte, mtu)
+		for {
+			_, err := conn.Read(buf)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				panic(err)
+			}
+		}
+		conn.Close()
+	}()
+
+	conn, err := net.Dial(addr.Network(), addr.String())
+	if err != nil {
+		panic(err)
+	}
+	return conn
+}
+
+func BenchmarkWithFlowControl(b *testing.B) {
+	const (
+		mtu     = 1 << 16
+		shared  = 1 << 20
+		workers = 100
+	)
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	s := newNullConn(mtu)
+
+	for n := 0; n < b.N; n++ {
+		fc := New(shared, mtu)
+		var wg sync.WaitGroup
+		wg.Add(workers)
+		for i := 0; i < workers; i++ {
+			go func(idx int) {
+				w := fc.NewWorker(0)
+				w.Release(len(testdata))
+				t := testdata
+				err := w.Run(ctx, func(tokens int) (used int, done bool, err error) {
+					towrite := min(tokens, len(t))
+					written, err := s.Write(t[:min(tokens, len(t))])
+					t = t[written:]
+					return towrite, len(t) == 0, err
+				})
+				if err != nil {
+					panic(err)
+				}
+				wg.Done()
+			}(i)
+		}
+		wg.Wait()
+	}
+	if err := s.Close(); err != nil {
+		panic(err)
+	}
+}
+
+func BenchmarkWithoutFlowControl(b *testing.B) {
+	const (
+		workers = 100
+		mtu     = 1 << 16
+	)
+	s := newNullConn(mtu)
+	for n := 0; n < b.N; n++ {
+		for cursor := 0; cursor < len(testdata); cursor += mtu {
+			for i := 0; i < workers; i++ {
+				_, err := s.Write(testdata[cursor : cursor+mtu])
+				if err != nil {
+					panic(err)
+				}
+			}
+		}
+	}
+	if err := s.Close(); err != nil {
+		panic(err)
+	}
+}