blob: 2bd42a31c2fc6761cc2d24482425db510a89524b [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The nsync package provides a mutex Mu and a Mesa-style condition variable CV.
//
// The nsync primitives differ from those in sync in that nsync provides
// timed/cancellable wait on CV, and try-lock on Mu; CV's wait primitives take
// the mutex as an explicit argument to remind the reader that they have a side
// effect on the mutex; the zero value CV can be used without further
// initialization; and Mu forbids a lock acquired by one thread to be released
// by another.
//
// As well as Mu and CV being usable with one another, an nsync,Mu can be used
// with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.
package nsync
import "sync/atomic"
// Implementation notes
//
// The implementation of Mu and CV both use spinlocks to protect their waiter
// queues. The spinlocks are implemented with atomic operations and a delay
// loop found in common.go. They could use sync.Mutex, but I wished to have an
// implementation independent of the sync package (except for the trivial use
// of the sync.Locker interface.
//
// Mu and CV use the same type of doubly-linked list of waiters (see
// waiter.go). This allows waiters to be transferred from the CV queue to the
// Mu queue when a thread is logically woken from the CV but would immediately
// go to sleep on the Mu. See the wakeWaiters() call in cv.go.
//
// In Mu, the "designated waker" is a thread that was waiting on Mu, has been
// woken up, but as yet has neither acquired nor gone back to waiting. The
// presence of such a thread is indicated by the muDesigWaker bit in the Mu
// word. This bit allows the Unlock() code to avoid waking a second waiter
// when there's already one will wake the next thread when the time comes.
// This speeds things up when the lock is heavily contended, and the critical
// sections are small.
//
// The weasel words "with high probability" in the specification of TryLock()
// prevent clients from believing that they can determine with certainty
// whether another thread has given up a lock yet. This, together with the
// requirement that a thread that acquired a mutex must release it (rather than
// it being released by another thread), prohibits clients from using Mu as a
// sort of semaphore. The intent is that it be used only for traditional
// mutual exclusion. This leaves room for certain future optimizations, and
// make it easier to apply detection of potential races via candidate lock-set
// algorithms, should that ever be desired.
//
// The condition variable has a wait with an absolute rather than a relative
// timeout. This is less error prone, as described in the comment on
// CV.WaitWithDeadline(). Alas, relative timeouts are seductive in trivial
// examples (such as tests). These are the first things that people try, so
// they are likely to be requested. If enough people complain we could give
// them that particular piece of rope.
//
// The condition variable uses an internal binary semaphore that is passed a
// *time.Timer stored in a *waiter for its expirations. It is as invariant
// that the timer is inactive and its channel drained when the waiter is on the
// waiter free list.
// A Mu is a mutex. Its zero value is valid, and unlocked.
// It is similar to sync.Mutex, but implements TryLock().
//
// A Mu can be "free", or held by a single thread (aka goroutine). A thread
// that acquires it should eventually release it. It is not legal to acquire a
// Mu in one thread and release it in another.
//
// Example usage, where p.mu is an nsync.Mu protecting the invariant p.a+p.b==0
// p.mu.Lock()
// // The current thread now has exclusive access to p.a and p.b; invariant assumed true.
// p.a++
// p.b-- // restore invariant p.a+p.b==0 before releasing p.mu
// p.mu.Unlock()
type Mu struct {
word uint32 // bits: see below
waiters dll // Head of a doubly-linked list of enqueued waiters; under mu.
}
// Bits in Mu.word
const (
muLock = 1 << iota // lock is held.
muSpinlock = 1 << iota // spinlock is held (protects waiters).
muWaiting = 1 << iota // waiter list is non-empty.
muDesigWaker = 1 << iota // a former waiter has been woken, and has not yet acquired or slept once more.
)
// TryLock() attempts to acquire *mu without blocking, and returns whether it is successful.
// It returns true with high probability if *mu was free on entry.
func (mu *Mu) TryLock() bool {
if atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
return true
}
oldWord := atomic.LoadUint32(&mu.word)
return (oldWord&muLock) == 0 && atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) // acquire CAS
}
// Lock() blocks until *mu is free and then acquires it.
func (mu *Mu) Lock() {
if !atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
oldWord := atomic.LoadUint32(&mu.word)
if (oldWord&muLock) != 0 || !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) { // acquire CAS
mu.lockSlow(newWaiter(), 0)
}
}
}
// lockSlow() locks *mu, waiting on *w if it needs to wait.
// "clear" should be zero if the thread has not previously slept on *mu, and
// muDesigWaker if it has; this represents bits that lockSlow() must clear when
// it either acquires or sleeps on *mu.
func (mu *Mu) lockSlow(w *waiter, clear uint32) {
var attempts uint // attempt count; used for spinloop backoff
w.cvMu = nil // not a CV wait
for {
oldWord := atomic.LoadUint32(&mu.word)
if (oldWord & muLock) == 0 {
// lock is not held; try to acquire, possibly clearing muDesigWaker
if atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muLock)&^clear) { // acquire CAS
freeWaiter(w)
return
}
} else if (oldWord&muSpinlock) == 0 &&
atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muSpinlock|muWaiting)&^clear) { // acquire CAS
// Spinlock is now held, and lock is held by someone
// else; muWaiting has also been set; queue ourselves.
atomic.StoreUint32(&w.waiting, 1)
if (oldWord & muWaiting) == 0 { // init the waiter list if it's empty.
mu.waiters.MakeEmpty()
}
w.q.InsertAfter(&mu.waiters)
// Release spinlock. Cannot use a store here, because
// the current thread does not hold the mutex. If
// another thread were a designated waker, the mutex
// holder could be concurrently unlocking, even though
// we hold the spinlock.
oldWord = atomic.LoadUint32(&mu.word)
for !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord&^muSpinlock) { // release CAS
oldWord = atomic.LoadUint32(&mu.word)
}
// Wait until awoken.
for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
w.sem.P()
}
attempts = 0
clear = muDesigWaker
}
attempts = spinDelay(attempts)
}
}
// Unlock() unlocks *mu, and wakes a waiter if there is one.
func (mu *Mu) Unlock() {
// Go is a garbage-collected language, so it's legal (and slightly
// faster on x86) to release with an atomic add before checking for
// waiters. Without GC, this would not work because another thread
// could acquire, decrement a reference count and deallocate the mutex
// before the current thread touched the mutex word again to wake
// waiters.
newWord := atomic.AddUint32(&mu.word, ^uint32(muLock-1))
if (newWord&(muLock|muWaiting)) == 0 || (newWord&(muLock|muDesigWaker)) == muDesigWaker {
return
}
if (newWord & muLock) != 0 {
panic("attempt to Unlock a free nsync.Mu")
}
var attempts uint // attempt count; used for backoff
for {
oldWord := atomic.LoadUint32(&mu.word)
if (oldWord&muWaiting) == 0 || (oldWord&muDesigWaker) == muDesigWaker {
return // no one to wake, or there's already a designated waker waking up.
} else if (oldWord&muSpinlock) == 0 &&
atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muSpinlock|muDesigWaker) { // acquire CAS
// The spinlock is now held, and we've set the designated wake flag, since
// we're likely to wake a thread that will become that designated waker.
if mu.waiters.elem != nil {
panic("non-nil mu.waiters.dll.elem")
}
// Remove a waiter from the queue, if possible.
var wake *waiter = mu.waiters.prev.elem
var clearOnRelease uint32 = muSpinlock
if wake != nil {
wake.q.Remove()
} else {
clearOnRelease |= muDesigWaker // not waking a waiter => no designated waker
}
if mu.waiters.IsEmpty() {
clearOnRelease |= muWaiting // no waiters left
}
// Unset the spinlock bit; set the designated wake
// bit---the thread we're waking is the new designated
// waker. Cannot use a store here because we no longer
// hold the main lock.
oldWord = atomic.LoadUint32(&mu.word)
for !atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muDesigWaker)&^clearOnRelease) { // release CAS
oldWord = atomic.LoadUint32(&mu.word)
}
if wake != nil { // Wake the waiter
atomic.StoreUint32(&wake.waiting, 0) // release store
wake.sem.V()
}
return
}
attempts = spinDelay(attempts)
}
}
// AssertHeld() panics if *mu is not held.
func (mu *Mu) AssertHeld() {
if (atomic.LoadUint32(&mu.word) & muLock) == 0 {
panic("nsync.Mu not held")
}
}