ref/runtime/internal/flow: Add a flow controller.
The flow controller's job is to schedule workers, taking flow control
counters, MTUs, and fairness into account.
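A minimal sketch of the intended usage, where ctx, conn, and data stand
in for a context, a connection, and a payload (the Release call would
normally be driven by the remote end as it drains its buffers):

    fc := flowcontrol.New(1<<20, 1<<16) // shared tokens, mtu
    w := fc.NewWorker(0)                // priority 0 runs first
    go w.Release(len(data))
    err := w.Run(ctx, func(tokens int) (int, bool, error) {
        n := tokens
        if len(data) < n {
            n = len(data)
        }
        written, err := conn.Write(data[:n])
        data = data[written:]
        return n, len(data) == 0, err
    })
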
The flow controller does impose some overhead, as shown by a simple
benchmark:
BenchmarkWithFlowControl-12 300 57130246 ns/op
BenchmarkWithoutFlowControl-12 500 31617551 ns/op
Change-Id: Ieda29148928c8a6b4c51ebde72543b2f51b9c970
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
new file mode 100644
index 0000000..1a47ada
--- /dev/null
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowcontrol
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/runtime/internal/flow/flowcontrol"
+
+var ErrConcurrentRun = verror.Register(
+ verror.ID(pkgPath+".ErrConcurrentRun"),
+ verror.NoRetry, "Run called concurrently.")
+var ErrWrongFlowController = verror.Register(
+ verror.ID(pkgPath+".ErrWrongFlowController"),
+ verror.NoRetry, "Release called for worker from different flow controller.")
+
+// Runners are called by Workers. For a given flow controller,
+// only one Runner will be running at a time. tokens specifies
+// the number of tokens available for this call. Implementors
+// should return the number of tokens used, whether they are done
+// with all their work, and any error encountered.
+// Runners will be called repeatedly within a single Run call until
+// either err != nil or done is true.
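+//
+// For example, a Runner that consumes every token it is offered and never
+// finishes (similar to the runners used in this package's tests) can be
+// written as:
+//
+//   func(tokens int) (used int, done bool, err error) {
+//       return tokens, false, nil
+//   }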
+type Runner func(tokens int) (used int, done bool, err error)
+
+type counterState struct {
+ // TODO(mattr): Add deficit if we allow multi-slice writes.
+ borrowed int // Number of tokens borrowed from the shared pool.
+ released int // Number of tokens available via our flow control counters.
+ everReleased bool // True if tokens have ever been released to this worker.
+}
+
+type state int
+
+const (
+ idle = state(iota)
+ running
+ active
+)
+
+// Worker represents a single flow-controlled worker.
+// Workers keep track of flow control counters to ensure
+// producers do not overwhelm consumers. Only one Worker
+// will be executing at a time.
+type Worker struct {
+ fc *FlowController
+ priority int
+ work chan struct{}
+
+ // These variables are protected by fc.mu.
+ counters *counterState // State related to the flow control counters.
+ state state
+ next, prev *Worker // Used as a list when in an active queue.
+}
+
+// Run runs r potentially multiple times.
+// Only one worker's r function will run at a time for a given FlowController.
+// A single worker's Run function should not be called concurrently from multiple
+// goroutines.
+func (w *Worker) Run(ctx *context.T, r Runner) (err error) {
+ w.fc.mu.Lock()
+ if w.state != idle {
+ w.fc.mu.Unlock()
+ return verror.New(ErrConcurrentRun, ctx)
+ }
+
+ w.state = running
+ if w.readyLocked() {
+ w.fc.activateLocked(w)
+ w.state = active
+ }
+
+ for {
+ next := w.fc.nextWorkerLocked()
+ for w.fc.writing != w && err == nil {
+ w.fc.mu.Unlock()
+ if next != nil {
+ next.work <- struct{}{}
+ }
+ select {
+ case <-ctx.Done():
+ err = ctx.Err()
+ case <-w.work:
+ }
+ w.fc.mu.Lock()
+ }
+ if err != nil {
+ break
+ }
+
+ toWrite := w.fc.mtu
+ if w.counters != nil {
+ if !w.counters.everReleased {
+ toWrite = min(w.fc.shared, w.fc.mtu)
+ w.counters.released += toWrite
+ w.counters.borrowed += toWrite
+ w.fc.shared -= toWrite
+ } else {
+ toWrite = min(w.counters.released, w.fc.mtu)
+ }
+ }
+
+ w.fc.mu.Unlock()
+ var written int
+ var done bool
+ written, done, err = r(toWrite)
+ w.fc.mu.Lock()
+
+ if w.counters != nil {
+ w.counters.released -= written
+ if w.counters.released > 0 && w.counters.borrowed > 0 {
+ toReturn := min(w.counters.released, w.counters.borrowed)
+ w.counters.borrowed -= toReturn
+ w.counters.released -= toReturn
+ w.fc.shared += toReturn
+ }
+ }
+
+ w.fc.writing = nil
+ if err != nil || done {
+ break
+ }
+ if !w.readyLocked() {
+ w.fc.deactivateLocked(w)
+ w.state = running
+ }
+ }
+
+ w.state = idle
+ w.fc.deactivateLocked(w)
+ next := w.fc.nextWorkerLocked()
+ w.fc.mu.Unlock()
+ if next != nil {
+ next.work <- struct{}{}
+ }
+ return err
+}
+
+func (w *Worker) releaseLocked(tokens int) {
+ if w.counters == nil {
+ return
+ }
+ w.counters.everReleased = true
+ if w.counters.borrowed > 0 {
+ n := min(w.counters.borrowed, tokens)
+ w.counters.borrowed -= n
+ w.fc.shared += n
+ tokens -= n
+ }
+ w.counters.released += tokens
+ if w.state == running && w.readyLocked() {
+ w.fc.activateLocked(w)
+ // Mark the worker active so a later release cannot insert it into
+ // the active ring a second time.
+ w.state = active
+ }
+}
+
+// Release releases tokens to this worker.
+// Workers will first repay any debts to the flow controller's shared pool
+// and use any surplus in subsequent calls to Run.
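+// For example, if a worker has borrowed 10 tokens from the shared pool,
+// Release(15) repays those 10 tokens to the pool and leaves 5 tokens
+// available to this worker's subsequent Runner calls.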
+func (w *Worker) Release(tokens int) {
+ w.fc.mu.Lock()
+ w.releaseLocked(tokens)
+ next := w.fc.nextWorkerLocked()
+ w.fc.mu.Unlock()
+ if next != nil {
+ next.work <- struct{}{}
+ }
+}
+
+func (w *Worker) readyLocked() bool {
+ if w.counters == nil {
+ return true
+ }
+ return w.counters.released > 0 || (!w.counters.everReleased && w.fc.shared > 0)
+}
+
+// FlowController manages multiple Workers to ensure only one runs at a time.
+// The workers also obey counters so that producers don't overwhelm consumers.
+type FlowController struct {
+ mtu int
+
+ mu sync.Mutex
+ shared int
+ active []*Worker
+ writing *Worker
+}
+
+// New creates a new FlowController. Shared is the number of shared tokens
+// that workers may borrow before they receive their first Release. Mtu is
+// the maximum number of tokens to be consumed by a single Runner
+// invocation.
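+// For example, New(1<<20, 1<<16) lets unreleased workers borrow up to 1MiB
+// of tokens in total and offers each Runner invocation at most 64KiB.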
+func New(shared, mtu int) *FlowController {
+ return &FlowController{shared: shared, mtu: mtu}
+}
+
+// NewWorker creates a new worker. Workers keep track of token counters
+// for a flow-controlled process. The order in which workers execute is
+// controlled by priority: ready workers with numerically lower priority
+// values run before ready workers with higher values, and workers of equal
+// priority are scheduled round-robin.
+func (fc *FlowController) NewWorker(priority int) *Worker {
+ w := &Worker{
+ fc: fc,
+ priority: priority,
+ work: make(chan struct{}),
+ counters: &counterState{},
+ }
+ w.next, w.prev = w, w
+ return w
+}
+
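+// Release specifies a number of tokens to release to a given Worker. It is
+// used with FlowController.Release to release to many workers atomically.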
+type Release struct {
+ Worker *Worker
+ Tokens int
+}
+
+// Release releases tokens to many Workers atomically. It is conceptually
+// the same as calling Release on each worker independently.
+func (fc *FlowController) Release(to []Release) error {
+ fc.mu.Lock()
+ for _, t := range to {
+ if t.Worker.fc != fc {
+ fc.mu.Unlock()
+ return verror.New(ErrWrongFlowController, nil)
+ }
+ t.Worker.releaseLocked(t.Tokens)
+ }
+ next := fc.nextWorkerLocked()
+ fc.mu.Unlock()
+ if next != nil {
+ next.work <- struct{}{}
+ }
+ return nil
+}
+
+// Run runs the given runner on a non-flow-controlled Worker. This
+// worker does not wait for any flow control tokens and is limited
+// only by the MTU.
+func (fc *FlowController) Run(ctx *context.T, p int, r Runner) error {
+ w := &Worker{
+ fc: fc,
+ priority: p,
+ work: make(chan struct{}),
+ }
+ w.next, w.prev = w, w
+ return w.Run(ctx, r)
+}
+
+func (fc *FlowController) nextWorkerLocked() *Worker {
+ if fc.writing == nil {
+ for p, head := range fc.active {
+ if head != nil {
+ fc.active[p] = head.next
+ fc.writing = head
+ return head
+ }
+ }
+ }
+ return nil
+}
+
+func (fc *FlowController) activateLocked(w *Worker) {
+ if w.priority >= len(fc.active) {
+ newActive := make([]*Worker, int(w.priority)+1)
+ copy(newActive, fc.active)
+ fc.active = newActive
+ }
+ head := fc.active[w.priority]
+ if head == nil {
+ fc.active[w.priority] = w
+ } else {
+ w.prev, w.next = head.prev, head
+ w.prev.next, w.next.prev = w, w
+ }
+}
+
+func (fc *FlowController) deactivateLocked(w *Worker) {
+ if w.priority >= len(fc.active) {
+ // w was never activated at this priority; nothing to remove.
+ return
+ }
+ if head := fc.active[w.priority]; head == w {
+ if w.next == w {
+ fc.active[w.priority] = nil
+ } else {
+ fc.active[w.priority] = w.next
+ }
+ }
+ w.next.prev, w.prev.next = w.prev, w.next
+ w.next, w.prev = w, w
+}
+
+func (fc *FlowController) numActive() int {
+ n := 0
+ fc.mu.Lock()
+ for _, head := range fc.active {
+ if head != nil {
+ n++
+ for cur := head.next; cur != head; cur = cur.next {
+ n++
+ }
+ }
+ }
+ fc.mu.Unlock()
+ return n
+}
+
+// String returns a string representation of the flow controller.
+// This can be helpful in debugging.
+func (fc *FlowController) String() string {
+ buf := &bytes.Buffer{}
+ fmt.Fprintf(buf, "FlowController %p: \n", fc)
+
+ fc.mu.Lock()
+ fmt.Fprintf(buf, "writing: %p\n", fc.writing)
+ fmt.Fprintln(buf, "active:")
+ for p, head := range fc.active {
+ fmt.Fprintf(buf, " %v: %p", p, head)
+ if head != nil {
+ for cur := head.next; cur != head; cur = cur.next {
+ fmt.Fprintf(buf, " %p", cur)
+ }
+ }
+ fmt.Fprintln(buf, "")
+ }
+ fc.mu.Unlock()
+ return buf.String()
+}
+
+func min(head int, rest ...int) int {
+ for _, r := range rest {
+ if r < head {
+ head = r
+ }
+ }
+ return head
+}
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol_test.go b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
new file mode 100644
index 0000000..7388b8f
--- /dev/null
+++ b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
@@ -0,0 +1,293 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowcontrol
+
+import (
+ "bytes"
+ "crypto/rand"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/ref/test"
+)
+
+var testdata = make([]byte, 1<<20)
+
+func init() {
+ test.Init()
+ _, err := io.ReadFull(rand.Reader, testdata)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func TestFlowControl(t *testing.T) {
+ const (
+ workers = 10
+ messages = 10
+ )
+
+ msgs := make(map[int][]byte)
+ fc := New(256, 64)
+
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ var wg sync.WaitGroup
+ wg.Add(workers)
+ for i := 0; i < workers; i++ {
+ go func(idx int) {
+ el := fc.NewWorker(0)
+ go el.Release(messages * 5) // Try to make races happen
+ j := 0
+ el.Run(ctx, func(tokens int) (used int, done bool, err error) {
+ msgs[idx] = append(msgs[idx], []byte(fmt.Sprintf("%d-%d,", idx, j))...)
+ j++
+ return 3, j >= messages, nil
+ })
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+
+ for i := 0; i < workers; i++ {
+ buf := &bytes.Buffer{}
+ for j := 0; j < messages; j++ {
+ fmt.Fprintf(buf, "%d-%d,", i, j)
+ }
+ if want, got := buf.String(), string(msgs[i]); want != got {
+ t.Errorf("Got %s, want %s for %d", got, want, i)
+ }
+ }
+}
+
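+// expect reads len(values) items from work and fails the test if they do
+// not arrive in the given order.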
+func expect(t *testing.T, work chan interface{}, values ...interface{}) {
+ for i, w := range values {
+ if got := <-work; got != w {
+ t.Errorf("expected %p in pos %d got %p", w, i, got)
+ }
+ }
+}
+
+func TestOrdering(t *testing.T) {
+ const mtu = 10
+
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ fc := New(0, mtu)
+
+ work := make(chan interface{})
+ worker := func(p int) *Worker {
+ w := fc.NewWorker(p)
+ go w.Run(ctx, func(t int) (int, bool, error) {
+ work <- w
+ return t, false, nil
+ })
+ w.Release(mtu)
+ <-work
+ return w
+ }
+
+ w0 := worker(0)
+ w1a := worker(1)
+ w1b := worker(1)
+ w1c := worker(1)
+ w2 := worker(2)
+
+ // Release to all the flows at once and ensure the writes
+ // happen in the correct order.
+ fc.Release([]Release{{w0, 2 * mtu}, {w1a, 2 * mtu}, {w1b, 3 * mtu}, {w1c, 0}, {w2, mtu}})
+ expect(t, work, w0, w0, w1a, w1b, w1a, w1b, w1b, w2)
+}
+
+func TestSharedCounters(t *testing.T) {
+ const (
+ mtu = 10
+ shared = 2 * mtu
+ )
+
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ fc := New(shared, mtu)
+
+ work := make(chan interface{})
+
+ worker := func(p int) *Worker {
+ w := fc.NewWorker(p)
+ go w.Run(ctx, func(t int) (int, bool, error) {
+ work <- w
+ return t, false, nil
+ })
+ return w
+ }
+
+ // w0 should run twice on shared counters.
+ w0 := worker(0)
+ expect(t, work, w0, w0)
+
+ w1 := worker(1)
+ // Now Release to w0 which shouldn't allow it to run since it's just repaying, but
+ // should allow w1 to run on the returned shared counters.
+ w0.Release(2 * mtu)
+ expect(t, work, w1, w1)
+
+ // Releasing again will allow w0 to run.
+ w0.Release(mtu)
+ expect(t, work, w0)
+}
+
+func TestConcurrentRun(t *testing.T) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ const mtu = 10
+ fc := New(mtu, mtu)
+
+ ready, wait := make(chan struct{}), make(chan struct{})
+ w := fc.NewWorker(0)
+ go w.Run(ctx, func(t int) (int, bool, error) {
+ close(ready)
+ <-wait
+ return t, true, nil
+ })
+ <-ready
+ if err := w.Run(ctx, nil); verror.ErrorID(err) != ErrConcurrentRun.ID {
+ t.Errorf("expected concurrent run error got: %v", err)
+ }
+ close(wait)
+}
+
+func TestNonFlowControlledRun(t *testing.T) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ const mtu = 10
+ fc := New(0, mtu)
+
+ work := make(chan interface{})
+ ready, wait := make(chan struct{}), make(chan struct{})
+ // Start one worker running
+ go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+ close(ready)
+ <-wait
+ return t, true, nil
+ })
+ <-ready
+ // Now queue up several workers and make sure they execute in order.
+ go fc.Run(ctx, 2, func(t int) (int, bool, error) {
+ work <- "c"
+ return t, true, nil
+ })
+ go fc.Run(ctx, 1, func(t int) (int, bool, error) {
+ work <- "b"
+ return t, true, nil
+ })
+ go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+ work <- "a"
+ return t, true, nil
+ })
+ for fc.numActive() < 4 {
+ time.Sleep(time.Millisecond)
+ }
+ close(wait)
+ expect(t, work, "a", "b", "c")
+}
+
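+// newNullConn returns a net.Conn whose remote end reads and discards
+// everything written to it; the benchmarks below use it as a sink.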
+func newNullConn(mtu int) net.Conn {
+ ln, err := net.Listen("tcp", ":0")
+ if err != nil {
+ panic(err)
+ }
+ addr := ln.Addr()
+
+ go func() {
+ conn, err := ln.Accept()
+ if err != nil {
+ panic(err)
+ }
+ ln.Close()
+ buf := make([]byte, mtu)
+ for {
+ _, err := conn.Read(buf)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ panic(err)
+ }
+ }
+ conn.Close()
+ }()
+
+ conn, err := net.Dial(addr.Network(), addr.String())
+ if err != nil {
+ panic(err)
+ }
+ return conn
+}
+
+func BenchmarkWithFlowControl(b *testing.B) {
+ const (
+ mtu = 1 << 16
+ shared = 1 << 20
+ workers = 100
+ )
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ s := newNullConn(mtu)
+
+ for n := 0; n < b.N; n++ {
+ fc := New(shared, mtu)
+ var wg sync.WaitGroup
+ wg.Add(workers)
+ for i := 0; i < workers; i++ {
+ go func(idx int) {
+ w := fc.NewWorker(0)
+ w.Release(len(testdata))
+ t := testdata
+ err := w.Run(ctx, func(tokens int) (used int, done bool, err error) {
+ towrite := min(tokens, len(t))
+ written, err := s.Write(t[:towrite])
+ t = t[written:]
+ return towrite, len(t) == 0, err
+ })
+ if err != nil {
+ panic(err)
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ }
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+}
+
+func BenchmarkWithoutFlowControl(b *testing.B) {
+ const (
+ workers = 100
+ mtu = 1 << 16
+ )
+ s := newNullConn(mtu)
+ for n := 0; n < b.N; n++ {
+ for cursor := 0; cursor < len(testdata); cursor += mtu {
+ for i := 0; i < workers; i++ {
+ _, err := s.Write(testdata[cursor : cursor+mtu])
+ if err != nil {
+ panic(err)
+ }
+ }
+ }
+ }
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+}