blob: 1a47adaf1948b8323790ebf8bbe8296a7d2c1c5e [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package flowcontrol
6
7import (
8 "bytes"
9 "fmt"
10 "sync"
11
12 "v.io/v23/context"
13 "v.io/v23/verror"
14)
15
16const pkgPath = "v.io/x/ref/runtime/internal/flow/flowcontrol"
17
18var ErrConcurrentRun = verror.Register(
19 verror.ID(pkgPath+".ErrConcurrentRun"),
20 verror.NoRetry, "Run called concurrently.")
21var ErrWrongFlowController = verror.Register(
22 verror.ID(pkgPath+".ErrWrongFlowController"),
23 verror.NoRetry, "Release called for worker from different flow controller.")
24
// Runner is the unit of work executed by a Worker. For a given flow
// controller at most one Runner is executing at any moment.
//
// tokens is the number of tokens the Runner may consume in this call.
// The Runner reports how many tokens it actually used, whether all of its
// work is now finished (done), and any error it hit. Within a single Run
// call the Runner is invoked repeatedly until it returns done == true or a
// non-nil err.
type Runner func(tokens int) (used int, done bool, err error)
33
// counterState is the token bookkeeping of a flow controlled Worker.
type counterState struct {
	// TODO(mattr): Add deficit if we allow multi-slice writes.

	// borrowed is the number of tokens currently on loan from the flow
	// controller's shared pool.
	borrowed int
	// released is the number of tokens this worker may spend according to
	// its own flow control counters.
	released int
	// everReleased records whether tokens have ever been released to this
	// worker.
	everReleased bool
}
40
// state is the position of a Worker in its Run lifecycle.
type state int

const (
	// idle: no Run call is in progress on the worker.
	idle state = iota
	// running: a Run call is in progress but the worker is not queued in
	// the flow controller's active lists.
	running
	// active: a Run call is in progress and the worker has been queued in
	// the flow controller's active lists.
	active
)
48
49// Worker represents a single flowcontrolled worker.
50// Workers keep track of flow control counters to ensure
51// producers do not overwhelm consumers. Only one Worker
52// will be executing at a time.
53type Worker struct {
54 fc *FlowController
55 priority int
56 work chan struct{}
57
58 // These variables are protected by fc.mu.
59 counters *counterState // State related to the flow control counters.
60 state state
61 next, prev *Worker // Used as a list when in an active queue.
62}
63
64// Run runs r potentially multiple times.
65// Only one worker's r function will run at a time for a given FlowController.
66// A single worker's Run function should not be called concurrently from multiple
67// goroutines.
68func (w *Worker) Run(ctx *context.T, r Runner) (err error) {
69 w.fc.mu.Lock()
70 if w.state != idle {
71 w.fc.mu.Unlock()
72 return verror.New(ErrConcurrentRun, ctx)
73 }
74
75 w.state = running
76 if w.readyLocked() {
77 w.fc.activateLocked(w)
78 w.state = active
79 }
80
81 for {
82 next := w.fc.nextWorkerLocked()
83 for w.fc.writing != w && err == nil {
84 w.fc.mu.Unlock()
85 if next != nil {
86 next.work <- struct{}{}
87 }
88 select {
89 case <-ctx.Done():
90 err = ctx.Err()
91 case <-w.work:
92 }
93 w.fc.mu.Lock()
94 }
95 if err != nil {
96 break
97 }
98
99 toWrite := w.fc.mtu
100 if w.counters != nil {
101 if !w.counters.everReleased {
102 toWrite = min(w.fc.shared, w.fc.mtu)
103 w.counters.released += toWrite
104 w.counters.borrowed += toWrite
105 w.fc.shared -= toWrite
106 } else {
107 toWrite = min(w.counters.released, w.fc.mtu)
108 }
109 }
110
111 w.fc.mu.Unlock()
112 var written int
113 var done bool
114 written, done, err = r(toWrite)
115 w.fc.mu.Lock()
116
117 if w.counters != nil {
118 w.counters.released -= written
119 if w.counters.released > 0 && w.counters.borrowed > 0 {
120 toReturn := min(w.counters.released, w.counters.borrowed)
121 w.counters.borrowed -= toReturn
122 w.counters.released -= toReturn
123 w.fc.shared += toReturn
124 }
125 }
126
127 w.fc.writing = nil
128 if err != nil || done {
129 break
130 }
131 if !w.readyLocked() {
132 w.fc.deactivateLocked(w)
133 w.state = running
134 }
135 }
136
137 w.state = idle
138 w.fc.deactivateLocked(w)
139 next := w.fc.nextWorkerLocked()
140 w.fc.mu.Unlock()
141 if next != nil {
142 next.work <- struct{}{}
143 }
144 return err
145}
146
147func (w *Worker) releaseLocked(tokens int) {
148 if w.counters == nil {
149 return
150 }
151 w.counters.everReleased = true
152 if w.counters.borrowed > 0 {
153 n := min(w.counters.borrowed, tokens)
154 w.counters.borrowed -= n
155 w.fc.shared += n
156 tokens -= n
157 }
158 w.counters.released += tokens
159 if w.state == running && w.readyLocked() {
160 w.fc.activateLocked(w)
161 }
162}
163
164// Release releases tokens to this worker.
165// Workers will first repay any debts to the flow controllers shared pool
166// and use any surplus in subsequent calls to Run.
167func (w *Worker) Release(tokens int) {
168 w.fc.mu.Lock()
169 w.releaseLocked(tokens)
170 next := w.fc.nextWorkerLocked()
171 w.fc.mu.Unlock()
172 if next != nil {
173 next.work <- struct{}{}
174 }
175}
176
177func (w *Worker) readyLocked() bool {
178 if w.counters == nil {
179 return true
180 }
181 return w.counters.released > 0 || (!w.counters.everReleased && w.fc.shared > 0)
182}
183
184// FlowController manages multiple Workers to ensure only one runs at a time.
185// The workers also obey counters so that producers don't overwhelm consumers.
186type FlowController struct {
187 mtu int
188
189 mu sync.Mutex
190 shared int
191 active []*Worker
192 writing *Worker
193}
194
195// New creates a new FlowController. Shared is the number of shared tokens
196// that flows can borrow from before they receive their first Release.
197// Mtu is the maximum number of tokens to be consumed by a single Runner
198// invocation.
199func New(shared, mtu int) *FlowController {
200 return &FlowController{shared: shared, mtu: mtu}
201}
202
203// NewWorker creates a new worker. Workers keep track of token counters
204// for a flow controlled process. The order that workers
205// execute is controlled by priority. Higher priority
206// workers that are ready will run before any lower priority
207// workers.
208func (fc *FlowController) NewWorker(priority int) *Worker {
209 w := &Worker{
210 fc: fc,
211 priority: priority,
212 work: make(chan struct{}),
213 counters: &counterState{},
214 }
215 w.next, w.prev = w, w
216 return w
217}
218
219type Release struct {
220 Worker *Worker
221 Tokens int
222}
223
224// Release releases to many Workers atomically. It is conceptually
225// the same as calling release on each worker indepedently.
226func (fc *FlowController) Release(to []Release) error {
227 fc.mu.Lock()
228 for _, t := range to {
229 if t.Worker.fc != fc {
230 return verror.New(ErrWrongFlowController, nil)
231 }
232 t.Worker.releaseLocked(t.Tokens)
233 }
234 next := fc.nextWorkerLocked()
235 fc.mu.Unlock()
236 if next != nil {
237 next.work <- struct{}{}
238 }
239 return nil
240}
241
242// Run runs the given runner on a non-flow controlled Worker. This
243// worker does not wait for any flow control tokens and is limited
244// only by the MTU.
245func (fc *FlowController) Run(ctx *context.T, p int, r Runner) error {
246 w := &Worker{
247 fc: fc,
248 priority: p,
249 work: make(chan struct{}),
250 }
251 w.next, w.prev = w, w
252 return w.Run(ctx, r)
253}
254
255func (fc *FlowController) nextWorkerLocked() *Worker {
256 if fc.writing == nil {
257 for p, head := range fc.active {
258 if head != nil {
259 fc.active[p] = head.next
260 fc.writing = head
261 return head
262 }
263 }
264 }
265 return nil
266}
267
268func (fc *FlowController) activateLocked(w *Worker) {
269 if w.priority >= len(fc.active) {
270 newActive := make([]*Worker, int(w.priority)+1)
271 copy(newActive, fc.active)
272 fc.active = newActive
273 }
274 head := fc.active[w.priority]
275 if head == nil {
276 fc.active[w.priority] = w
277 } else {
278 w.prev, w.next = head.prev, head
279 w.prev.next, w.next.prev = w, w
280 }
281}
282
283func (fc *FlowController) deactivateLocked(w *Worker) {
284 if head := fc.active[w.priority]; head == w {
285 if w.next == w {
286 fc.active[w.priority] = nil
287 } else {
288 fc.active[w.priority] = w.next
289 }
290 }
291 w.next.prev, w.prev.next = w.prev, w.next
292 w.next, w.prev = w, w
293}
294
295func (fc *FlowController) numActive() int {
296 n := 0
297 fc.mu.Lock()
298 for _, head := range fc.active {
299 if head != nil {
300 n++
301 for cur := head.next; cur != head; cur = cur.next {
302 n++
303 }
304 }
305 }
306 fc.mu.Unlock()
307 return n
308}
309
310// String writes a string representation of the flow controller.
311// This can be helpful in debugging.
312func (fc *FlowController) String() string {
313 buf := &bytes.Buffer{}
314 fmt.Fprintf(buf, "FlowController %p: \n", fc)
315
316 fc.mu.Lock()
317 fmt.Fprintf(buf, "writing: %p\n", fc.writing)
318 fmt.Fprintln(buf, "active:")
319 for p, head := range fc.active {
320 fmt.Fprintf(buf, " %v: %p", p, head)
321 if head != nil {
322 for cur := head.next; cur != head; cur = cur.next {
323 fmt.Fprintf(buf, " %p", cur)
324 }
325 }
326 fmt.Fprintln(buf, "")
327 }
328 fc.mu.Unlock()
329 return buf.String()
330}
331
// min returns the smallest of head and all values in rest. With no extra
// values it returns head unchanged.
func min(head int, rest ...int) int {
	smallest := head
	for i := 0; i < len(rest); i++ {
		if rest[i] < smallest {
			smallest = rest[i]
		}
	}
	return smallest
}