// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package nsync

import "sync"
import "sync/atomic"
import "time"

// See also the implementation notes at the top of mu.go.

// A CV is a condition variable in the style of Mesa, Java, POSIX, and Go's sync.Cond.
// It allows a thread to wait for a condition on state protected by a mutex,
// and to proceed with the mutex held and the condition true.
//
// When compared with sync.Cond: (a) CV adds WaitWithDeadline() which allows
// timeouts and cancellation, (b) the mutex is an explicit argument of the wait
// calls to remind the reader that they have a side-effect on the mutex, and
// (c) (as a result of (b)), a zero-valued CV is a valid CV with no enqueued
// waiters, so there is no need of a call to construct a CV.
//
// Usage:
//
// After making the desired predicate true, call:
//	cv.Signal() // If at most one thread can make use of the predicate becoming true.
// or
//	cv.Broadcast() // If multiple threads can make use of the predicate becoming true.
//
// To wait for a predicate with no deadline (assuming cv.Broadcast() is called
// whenever the predicate becomes true):
//	mu.Lock()
//	for !some_predicate_protected_by_mu { // the for-loop is required.
//		cv.Wait(&mu)
//	}
//	// predicate is now true
//	mu.Unlock()
//
// To wait for a predicate with a deadline (assuming cv.Broadcast() is called
// whenever the predicate becomes true):
//	mu.Lock()
//	for !some_predicate_protected_by_mu && cv.WaitWithDeadline(&mu, absDeadline, cancelChan) == nsync.OK {
//	}
//	if some_predicate_protected_by_mu { // predicate is true
//	} else { // predicate is false, and the deadline expired or cancelChan was closed.
//	}
//	mu.Unlock()
//
// or, if the predicate is complex and you wish to write it just once and
// inline, you could use the following instead of the for-loop above:
//	mu.Lock()
//	var predIsTrue bool
//	for outcome := nsync.OK; ; outcome = cv.WaitWithDeadline(&mu, absDeadline, cancelChan) {
//		if predIsTrue = some_predicate_protected_by_mu; predIsTrue || outcome != nsync.OK {
//			break
//		}
//	}
//	if predIsTrue { // predicate is true
//	} else { // predicate is false, and the deadline expired or cancelChan was closed.
//	}
//	mu.Unlock()
//
// As the examples show, Mesa-style condition variables require that waits use
// a loop that tests the predicate anew after each wait. It may be surprising
// that these are preferred over the precise wakeups offered by the condition
// variables in Hoare monitors. Imprecise wakeups make more efficient use of
// the critical section, because threads can enter it while a woken thread is
// still emerging from the scheduler, which may take thousands of cycles.
// Further, they make the programme easier to read and debug by making the
// predicate explicit locally at the wait, where the predicate is about to be
// assumed; the reader does not have to infer the predicate by examining all
// the places where wakeups may occur.
type CV struct {
	word    uint32 // see bits below; read and written atomically
	waiters dll    // Head of a doubly-linked list of enqueued waiters; under mu.
}

// Bits in CV.word
const (
	cvSpinlock = 1 << iota // protects waiters
	cvNonEmpty = 1 << iota // waiters list is non-empty
)

// Values returned by CV.WaitWithDeadline().
const (
	OK        = iota // Neither expired nor cancelled.
	Expired   = iota // absDeadline expired.
	Cancelled = iota // cancelChan was closed.
)
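
// The following is an illustrative sketch, not part of the original file: a
// bounded FIFO built on a sync.Mutex and two CVs, following the usage pattern
// in the comment on CV above. Broadcast() is used because several blocked
// Put()s or Get()s may be able to proceed once the predicate changes. The
// exampleQueue type and its methods are hypothetical names introduced here
// only for illustration.
type exampleQueue struct {
	mu       sync.Mutex // protects the fields below
	nonEmpty CV         // broadcast when data becomes non-empty
	nonFull  CV         // broadcast when data becomes non-full
	limit    int        // maximum len(data)
	data     []int      // queue contents
}

// Put blocks until the queue is non-full, then appends x to it.
func (q *exampleQueue) Put(x int) {
	q.mu.Lock()
	for len(q.data) == q.limit { // the for-loop is required.
		q.nonFull.Wait(&q.mu)
	}
	q.data = append(q.data, x)
	q.nonEmpty.Broadcast()
	q.mu.Unlock()
}

// Get blocks until the queue is non-empty, then removes and returns its first element.
func (q *exampleQueue) Get() (x int) {
	q.mu.Lock()
	for len(q.data) == 0 { // the for-loop is required.
		q.nonEmpty.Wait(&q.mu)
	}
	x, q.data = q.data[0], q.data[1:]
	q.nonFull.Broadcast()
	q.mu.Unlock()
	return x
}
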
// WaitWithDeadline() atomically releases "mu" and blocks the calling thread on
// *cv. It then waits until awakened by a call to Signal() or Broadcast() (or
// a spurious wakeup), or by the time reaching absDeadline, or by cancelChan
// being closed. In all cases, it reacquires "mu", and returns the reason the
// call returned (OK, Expired, or Cancelled). Use absDeadline==nsync.NoDeadline
// for no deadline, and cancelChan==nil for no cancellation. WaitWithDeadline()
// should be used in a loop, as with all Mesa-style condition variables. See
// examples above.
//
// There are two reasons for using an absolute deadline, rather than a relative
// timeout---these are why pthread_cond_timedwait() also uses an absolute
// deadline. First, condition variable waits have to be used in a loop; with
// an absolute time, the deadline does not have to be recomputed on each
// iteration. Second, in most real programmes some activity (such as an RPC
// to a server, or guaranteeing a response time in a UI) has a deadline
// imposed by the specification or the caller/user; relative delays can shift
// arbitrarily with scheduling delays, and so after multiple waits might
// extend beyond the expected deadline. Relative delays tend to be more
// convenient in tests and trivial examples than they are in real programmes.
func (cv *CV) WaitWithDeadline(mu sync.Locker, absDeadline time.Time, cancelChan <-chan struct{}) (outcome int) {
	var w *waiter = newWaiter()
	atomic.StoreUint32(&w.waiting, 1)
	cvMu, _ := mu.(*Mu)
	w.cvMu = cvMu // If the Locker is an nsync.Mu, record its address, else record nil.

	oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock|cvNonEmpty) // acquire spinlock, set non-empty
	if (oldWord & cvNonEmpty) == 0 {
		cv.waiters.MakeEmpty() // initialize the waiter queue if it was empty.
	}
	w.q.InsertAfter(&cv.waiters)
	// Release the spin lock.
	atomic.StoreUint32(&cv.word, oldWord|cvNonEmpty) // release store

	mu.Unlock() // Release *mu.

	// Prepare a time.Timer for the deadline, if any. We use a time.Timer
	// pre-allocated for the waiter, to avoid allocating and garbage
	// collecting one on each wait.
	var deadlineTimer *time.Timer
	if absDeadline != NoDeadline {
		deadlineTimer = w.deadlineTimer
		if deadlineTimer.Reset(absDeadline.Sub(time.Now())) {
			// w.deadlineTimer is guaranteed inactive and drained;
			// see "Stop any active timer" code below.
			panic("deadlineTimer was active")
		}
	}

	// Wait until awoken or a timeout.
	semOutcome := OK
	var attempts uint
	for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
		if semOutcome == OK {
			semOutcome = w.sem.PWithDeadline(deadlineTimer, cancelChan)
		}
		if semOutcome != OK && atomic.LoadUint32(&w.waiting) != 0 { // acquire load
			// A timeout or cancellation occurred, and no wakeup. Acquire the spinlock, and confirm.
			oldWord = spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock)
			// Check that w wasn't removed from the queue after we
			// checked above, but before we acquired the spinlock.
			// The call to IsInList() confirms that the waiter *w is still governed
			// by *cv's spinlock; otherwise, some other thread is about to set w.waiting==0.
			if atomic.LoadUint32(&w.waiting) != 0 && w.q.IsInList(&cv.waiters) { // still in waiter queue
				// Not woken, so remove ourselves from queue, and declare a timeout or cancellation.
				outcome = semOutcome
				w.q.Remove()
				atomic.StoreUint32(&w.waiting, 0) // release store
				if cv.waiters.IsEmpty() {
					oldWord &^= cvNonEmpty
				}
			}
			// Release spinlock.
			atomic.StoreUint32(&cv.word, oldWord) // release store
			if atomic.LoadUint32(&w.waiting) != 0 {
				attempts = spinDelay(attempts) // so we will ultimately yield to scheduler.
			}
		}
	}

	// Stop any active timer, and drain its channel.
	if deadlineTimer != nil && semOutcome != Expired && !deadlineTimer.Stop() /*expired*/ {
		// This receive is synchronous because time.Timer's expire+send
		// is not atomic: it may send after Stop() returns false! The
		// "semOutcome != Expired" ensures that the value wasn't
		// consumed by the PWithDeadline() above.
		<-deadlineTimer.C
	}

	if cvMu != nil && w.cvMu == nil { // waiter was transferred to mu's queue, and woken.
		// Requeue mu using existing waiter struct; current thread is the designated waker.
		cvMu.lockSlow(w, muDesigWaker)
	} else {
		// Traditional case: We've woken from the CV, and need to reacquire mu.
		freeWaiter(w)
		mu.Lock()
	}
	return outcome
}
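
// illustrativeAwait is a sketch, not part of the original file: it shows the
// absolute-deadline pattern argued for in the comment on WaitWithDeadline().
// The deadline is computed once, before the loop, so it need not be adjusted
// on each iteration, and scheduling delays between wakeups cannot stretch the
// total wait. The function name and its parameters are hypothetical.
func illustrativeAwait(mu *Mu, cv *CV, pred func() bool, timeout time.Duration, cancelChan <-chan struct{}) bool {
	absDeadline := time.Now().Add(timeout) // computed once; every iteration reuses it.
	mu.Lock()
	for !pred() && cv.WaitWithDeadline(mu, absDeadline, cancelChan) == OK {
	}
	predIsTrue := pred() // false means the deadline expired or cancelChan was closed.
	mu.Unlock()
	return predIsTrue
}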

// Signal() wakes at least one thread currently enqueued on *cv.
func (cv *CV) Signal() {
	if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
		var toWakeList *waiter // waiters that we will wake
		oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
		if !cv.waiters.IsEmpty() {
			// Point to first waiter that enqueued itself, and detach it from all others.
			toWakeList = cv.waiters.prev.elem
			toWakeList.q.Remove()
			toWakeList.q.MakeEmpty()
			if cv.waiters.IsEmpty() {
				oldWord &^= cvNonEmpty
			}
		}
		// Release spinlock.
		atomic.StoreUint32(&cv.word, oldWord) // release store
		if toWakeList != nil {
			wakeWaiters(toWakeList)
		}
	}
}

// Broadcast() wakes all threads currently enqueued on *cv.
func (cv *CV) Broadcast() {
	if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
		var toWakeList *waiter // waiters that we will wake
		spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
		if !cv.waiters.IsEmpty() {
			// Point to last waiter that enqueued itself, still attached to all other waiters.
			toWakeList = cv.waiters.next.elem
			cv.waiters.Remove()
			cv.waiters.MakeEmpty()
		}
		// Release spinlock and mark queue empty.
		atomic.StoreUint32(&cv.word, 0) // release store
		if toWakeList != nil {
			wakeWaiters(toWakeList)
		}
	}
}

// Wait() atomically releases "mu" and blocks the caller on *cv. It waits
// until it is awakened by a call to Signal() or Broadcast(), or a spurious
// wakeup. It then reacquires "mu", and returns. It is equivalent to a call
// to WaitWithDeadline() with absDeadline==NoDeadline, and a nil cancelChan.
// It should be used in a loop, as with all standard Mesa-style condition
// variables. See examples above.
func (cv *CV) Wait(mu sync.Locker) {
	cv.WaitWithDeadline(mu, NoDeadline, nil)
}
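
// illustrativeCancel is a sketch, not part of the original file: it shows the
// cancelChan parameter of WaitWithDeadline() used for cancellation. Closing
// the channel wakes the waiter, which returns Cancelled once it has
// re-checked its predicate. All identifiers here are hypothetical.
func illustrativeCancel(mu *Mu, cv *CV, pred func() bool) (wait func() bool, cancel func()) {
	cancelChan := make(chan struct{})
	cancel = func() { close(cancelChan) } // wakes any wait() below with Cancelled.
	wait = func() bool {
		mu.Lock()
		for !pred() && cv.WaitWithDeadline(mu, NoDeadline, cancelChan) == OK {
		}
		predIsTrue := pred() // false means cancel() was called before the predicate became true.
		mu.Unlock()
		return predIsTrue
	}
	return wait, cancel
}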

// ------------------------------------------

// wakeWaiters() wakes the CV waiters in the circular list pointed to by toWakeList,
// which must not be nil. If the waiter is associated with an nsync.Mu (as
// opposed to another implementation of sync.Locker), the "wakeup" may consist
// of transferring the waiters to the nsync.Mu's queue. Requires:
//  - Every element of the list pointed to by toWakeList is a waiter---there is
//    no head/sentinel.
//  - Every waiter is associated with the same mutex.
func wakeWaiters(toWakeList *waiter) {
	var firstWaiter *waiter = toWakeList.q.prev.elem
	var mu *Mu = firstWaiter.cvMu
	if mu != nil { // waiter is associated with the nsync.Mu *mu.
		// We will transfer elements of toWakeList to *mu if all of:
		//  - mu's spinlock is not held, and
		//  - either mu is locked, or there's more than one thread on toWakeList, and
		//  - we acquire the spinlock on the first try.
		// The spinlock acquisition also marks mu as having waiters.
		var oldMuWord uint32 = atomic.LoadUint32(&mu.word)
		var locked bool = (oldMuWord & muLock) != 0
		var setDesigWaker uint32 // set to muDesigWaker if a thread is to be woken rather than transferred
		if !locked {
			setDesigWaker = muDesigWaker
		}
		if (oldMuWord&muSpinlock) == 0 &&
			(locked || firstWaiter != toWakeList) &&
			atomic.CompareAndSwapUint32(&mu.word, oldMuWord, (oldMuWord|muSpinlock|muWaiting|setDesigWaker)) { // acquire CAS

			// Choose which waiters to transfer, and which to wake.
			toTransferList := toWakeList
			if locked { // *mu is held; all the threads get transferred.
				toWakeList = nil
			} else { // *mu is not held; we transfer all but the first thread, which will be woken.
				toWakeList = firstWaiter
				toWakeList.q.Remove()
				toWakeList.q.MakeEmpty()
			}

			// Transfer the waiters on toTransferList to *mu's
			// waiter queue. We've acquired *mu's spinlock. Queue
			// the threads there instead of waking them.
			for toTransferList != nil {
				var toTransfer *waiter = toTransferList.q.prev.elem
				if toTransfer == toTransferList { // *toTransferList was singleton; *toTransfer is last waiter
					toTransferList = nil
				} else {
					toTransfer.q.Remove()
				}
				if toTransfer.cvMu != mu {
					panic("multiple mutexes used with condition variable")
				}
				toTransfer.cvMu = nil // tell WaitWithDeadline() that we moved the waiter to *mu's queue.
				// toTransfer.waiting is already 1, from being on CV's waiter queue.
				if (oldMuWord & muWaiting) == 0 { // if there were previously no waiters, initialize.
					mu.waiters.MakeEmpty()
					oldMuWord |= muWaiting // so next iteration won't initialize again.
				}
				toTransfer.q.InsertAfter(&mu.waiters)
			}

			// release *mu's spinlock (muWaiting was set by CAS above)
			oldMuWord = atomic.LoadUint32(&mu.word)
			for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord&^muSpinlock) { // release CAS
				oldMuWord = atomic.LoadUint32(&mu.word)
			}
		} else { // Set muDesigWaker because at least one thread is to be woken.
			oldMuWord = atomic.LoadUint32(&mu.word)
			for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord|muDesigWaker) {
				oldMuWord = atomic.LoadUint32(&mu.word)
			}
		}
	}

	// Wake any waiters we didn't manage to enqueue on the Mu.
	for toWakeList != nil {
		// Take one waiter from the toWakeList.
		var toWake *waiter = toWakeList.q.prev.elem
		if toWake == toWakeList { // *toWakeList was a singleton; *toWake is the last waiter
			toWakeList = nil // tell the loop to exit
		} else {
			toWake.q.Remove() // get the waiter out of the list
		}

		// Wake the waiter.
		atomic.StoreUint32(&toWake.waiting, 0) // release store
		toWake.sem.V()
	}
}