v.io/x/lib/nsync: An implementation of mutex and condition variables
The nsync package provides a mutex Mu and a Mesa-style condition
variable CV.
The nsync primitives differ from those in sync in that nsync provides
timed wait on CV and try-lock on Mu, CV's wait primitives take the mutex
as an explicit argument to remind the reader that they have a side
effect on the mutex, the zero value CV can be used without further
initialization, and Mu forbids a lock acquired by one thread to be
released by another.
As well as Mu and CV being usable with one another, an nsync.Mu can be
used with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.
Change-Id: Id6a14ebcf007bd4cb33dcc7b138d35f61db9546a
diff --git a/nsync/.api b/nsync/.api
new file mode 100644
index 0000000..9d10a53
--- /dev/null
+++ b/nsync/.api
@@ -0,0 +1,13 @@
+pkg nsync, const Cancelled ideal-int
+pkg nsync, const Expired ideal-int
+pkg nsync, const OK ideal-int
+pkg nsync, method (*CV) Broadcast()
+pkg nsync, method (*CV) Signal()
+pkg nsync, method (*CV) Wait(sync.Locker)
+pkg nsync, method (*CV) WaitWithDeadline(sync.Locker, time.Time, <-chan struct{}) int
+pkg nsync, method (*Mu) Lock()
+pkg nsync, method (*Mu) TryLock() bool
+pkg nsync, method (*Mu) Unlock()
+pkg nsync, type CV struct
+pkg nsync, type Mu struct
+pkg nsync, var NoDeadline time.Time
diff --git a/nsync/binary_semaphore.go b/nsync/binary_semaphore.go
new file mode 100644
index 0000000..15d055b
--- /dev/null
+++ b/nsync/binary_semaphore.go
@@ -0,0 +1,58 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "time"
+
+// A binarySemaphore is a binary semaphore; its count is either 0 or 1.
+type binarySemaphore struct {
+ ch chan struct{} // a buffered element represents count 1; an empty channel represents count 0
+}
+
+// Init() initializes binarySemaphore *s; the initial value is 0.
+func (s *binarySemaphore) Init() {
+ s.ch = make(chan struct{}, 1) // capacity 1 and initially empty, i.e. count 0
+}
+
+// P() waits until the count of semaphore *s is 1 and decrements the
+// count to 0.
+func (s *binarySemaphore) P() {
+ <-s.ch // blocks until V() has deposited an element, then consumes it
+}
+
+// PWithDeadline() waits until one of:
+// the count of semaphore *s is 1, in which case the semaphore is decremented to 0, then OK is returned;
+// or deadlineTimer != nil and *deadlineTimer expires, then Expired is returned;
+// or cancelChan != nil and cancelChan becomes readable or closed, then Cancelled is returned.
+// The channel "v.io/v23/context".T.Done() is a suitable cancelChan.
+func (s *binarySemaphore) PWithDeadline(deadlineTimer *time.Timer, cancelChan <-chan struct{}) (res int) {
+ var deadlineChan <-chan time.Time // left nil when there is no timer; a nil channel is never ready in a select
+ if deadlineTimer != nil {
+ deadlineChan = deadlineTimer.C
+ }
+ // Avoid select if possible---it's slow.
+ if deadlineTimer != nil || cancelChan != nil {
+ select {
+ case <-s.ch:
+ res = OK
+ case <-deadlineChan:
+ res = Expired // note: this receive consumes the value from deadlineTimer.C
+ case <-cancelChan:
+ res = Cancelled
+ }
+ } else {
+ <-s.ch // neither deadline nor cancellation: plain blocking receive, as in P()
+ res = OK
+ }
+ return res
+}
+
+// V() ensures that the semaphore count of *s is 1; it never blocks.
+func (s *binarySemaphore) V() {
+ select {
+ case s.ch <- struct{}{}: // the channel had room, so the count was 0; it is now 1
+ default: // Don't block if the semaphore count is already 1.
+ }
+}
diff --git a/nsync/common.go b/nsync/common.go
new file mode 100644
index 0000000..251ba94
--- /dev/null
+++ b/nsync/common.go
@@ -0,0 +1,49 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "math"
+import "runtime"
+import "sync/atomic"
+import "time"
+
+// NoDeadline represents a time in the far future---a deadline that will not expire.
+var NoDeadline time.Time // assigned once, by init() in this file
+
+// init() initializes the variable NoDeadline.
+// If done inline, the godoc output is even uglier.
+func init() {
+ NoDeadline = time.Now().Add(time.Duration(math.MaxInt64)).Add(time.Duration(math.MaxInt64)) // the current time plus two maximal Durations (2 * math.MaxInt64 nanoseconds)
+}
+
+// spinDelay() is used in spinloops to delay resumption of the loop, and
+// returns the attempt count to pass to the next call. Usage:
+// var attempts uint
+// for try_something {
+// attempts = spinDelay(attempts)
+// }
+func spinDelay(attempts uint) uint {
+ if attempts < 7 {
+ for i := 0; i != 1<<attempts; i++ { // empty loop of 2**attempts iterations
+ }
+ attempts++
+ } else {
+ runtime.Gosched() // once attempts has reached 7, yield the processor on every call
+ }
+ return attempts
+}
+
+// spinTestAndSet() spins until (*w & test) == 0. It then atomically performs
+// *w |= set and returns the previous value of *w. It performs an acquire
+// barrier.
+func spinTestAndSet(w *uint32, test uint32, set uint32) uint32 {
+ var attempts uint // retry count, fed to spinDelay() for backoff
+ var old uint32 = atomic.LoadUint32(w)
+ for (old&test) != 0 || !atomic.CompareAndSwapUint32(w, old, old|set) { // acquire CAS
+ attempts = spinDelay(attempts)
+ old = atomic.LoadUint32(w) // re-read the word before retrying
+ }
+ return old // the value observed just before the successful CAS
+}
diff --git a/nsync/cv.go b/nsync/cv.go
new file mode 100644
index 0000000..10de132
--- /dev/null
+++ b/nsync/cv.go
@@ -0,0 +1,328 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "sync"
+import "sync/atomic"
+import "time"
+
+// See also the implementation notes at the top of mu.go.
+
+// A CV is a condition variable in the style of Mesa, Java, POSIX, and Go's sync.Cond.
+// It allows a thread to wait for a condition on state protected by a mutex,
+// and to proceed with the mutex held and the condition true.
+//
+// When compared with sync.Cond: (a) CV adds WaitWithDeadline() which allows
+// timeouts and cancellation, (b) the mutex is an explicit argument of the wait
+// calls to remind the reader that they have a side-effect on the mutex, and
+// (c) (as a result of (b)), a zero-valued CV is a valid CV with no enqueued
+// waiters, so there is no need of a call to construct a CV.
+//
+// Usage:
+//
+// After making the desired predicate true, call:
+// cv.Signal() // If at most one thread can make use of the predicate becoming true.
+// or
+// cv.Broadcast() // If multiple threads can make use of the predicate becoming true.
+//
+// To wait for a predicate with no deadline (assuming cv.Broadcast() is called
+// whenever the predicate becomes true):
+// mu.Lock()
+// for !some_predicate_protected_by_mu { // the for-loop is required.
+// cv.Wait(&mu)
+// }
+// // predicate is now true
+// mu.Unlock()
+//
+// To wait for a predicate with a deadline (assuming cv.Broadcast() is called
+// whenever the predicate becomes true):
+// mu.Lock()
+// for !some_predicate_protected_by_mu && cv.WaitWithDeadline(&mu, absDeadline, cancelChan) == nsync.OK {
+// }
+// if some_predicate_protected_by_mu { // predicate is true
+// } else { // predicate is false, and deadline expired, or cancelChan was closed.
+// }
+// mu.Unlock()
+// or, if the predicate is complex and you wish to write it just once and
+// inline, you could use the following instead of the for-loop above:
+// mu.Lock()
+// var predIsTrue bool
+// for outcome := OK; ; outcome = cv.WaitWithDeadline(&mu, absDeadline, cancelChan) {
+// if predIsTrue = some_predicate_protected_by_mu; predIsTrue || outcome != nsync.OK {
+// break
+// }
+// }
+// if predIsTrue { // predicate is true
+// } else { // predicate is false, and deadline expired, or cancelChan was closed.
+// }
+// mu.Unlock()
+//
+// As the examples show, Mesa-style condition variables require that waits use
+// a loop that tests the predicate anew after each wait. It may be surprising
+// that these are preferred over the precise wakeups offered by the condition
+// variables in Hoare monitors. Imprecise wakeups make more efficient use of
+// the critical section, because threads can enter it while a woken thread is
+// still emerging from the scheduler, which may take thousands of cycles.
+// Further, they make the programme easier to read and debug by making the
+// predicate explicit locally at the wait, where the predicate is about to be
+// assumed; the reader does not have to infer the predicate by examining all
+// the places where wakeups may occur.
+type CV struct {
+ word uint32 // see bits below; read and written atomically
+ waiters dll // Head of a doubly-linked list of enqueued waiters; protected by the cvSpinlock bit of word.
+}
+
+// Bits in CV.word
+const (
+ cvSpinlock = 1 << iota // (value 1) spinlock bit; protects waiters
+ cvNonEmpty = 1 << iota // (value 2) waiters list is non-empty
+)
+
+// Values returned by CV.WaitWithDeadline().
+const (
+ OK = iota // (value 0) Neither expired nor cancelled.
+ Expired = iota // (value 1) absDeadline expired.
+ Cancelled = iota // (value 2) cancelChan became readable or was closed.
+)
+
+// WaitWithDeadline() atomically releases "mu" and blocks the calling thread on
+// *cv. It then waits until awakened by a call to Signal() or Broadcast() (or
+// a spurious wakeup), or by the time reaching absDeadline, or by cancelChan
+// being closed. In all cases, it reacquires "mu", and returns the reason
+// the call returned (OK, Expired, or Cancelled). Use
+// absDeadline==nsync.NoDeadline for no deadline, and cancelChan==nil for no
+// cancellation. WaitWithDeadline() should be used in a loop, as with all
+// Mesa-style condition variables. See examples above.
+//
+// There are two reasons for using an absolute deadline, rather than a relative
+// timeout---these are why pthread_cond_timedwait() also uses an absolute
+// deadline. First, condition variable waits have to be used in a loop; with
+// an absolute time, the deadline does not have to be recomputed on each
+// iteration. Second, in most real programmes, for some activity (such as an RPC
+// to a server, or when guaranteeing response time in a UI), there is a
+// deadline imposed by the specification or the caller/user; relative delays
+// can shift arbitrarily with scheduling delays, and so after multiple waits
+// might extend beyond the expected deadline. Relative delays tend to be more
+// convenient mostly in tests and trivial examples than they are in real
+// programmes.
+func (cv *CV) WaitWithDeadline(mu sync.Locker, absDeadline time.Time, cancelChan <-chan struct{}) (outcome int) {
+ var w *waiter = newWaiter()
+ atomic.StoreUint32(&w.waiting, 1) // cleared by whoever wakes us, or by ourselves on timeout/cancellation below
+ cvMu, _ := mu.(*Mu)
+ w.cvMu = cvMu // If the Locker is an nsync.Mu, record its address, else record nil.
+
+ oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock|cvNonEmpty) // acquire spinlock, set non-empty
+ if (oldWord & cvNonEmpty) == 0 {
+ cv.waiters.MakeEmpty() // initialize the waiter queue if it was empty.
+ }
+ w.q.InsertAfter(&cv.waiters) // enqueue w next to the list head
+ // Release the spin lock.
+ atomic.StoreUint32(&cv.word, oldWord|cvNonEmpty) // release store
+
+ mu.Unlock() // Release *mu.
+
+ // Prepare a time.Timer for the deadline, if any. We use a time.Timer
+ // pre-allocated for the waiter, to avoid allocating and garbage
+ // collecting one on each wait.
+ var deadlineTimer *time.Timer // stays nil when absDeadline is NoDeadline
+ if absDeadline != NoDeadline {
+ deadlineTimer = w.deadlineTimer
+ if deadlineTimer.Reset(absDeadline.Sub(time.Now())) {
+ // w.deadlineTimer is guaranteed inactive and drained;
+ // see "Stop any active timer" code below.
+ panic("deadlineTimer was active")
+ }
+ }
+
+ // Wait until awoken or a timeout.
+ semOutcome := OK // result of the semaphore wait; once it is not OK we stop waiting on the semaphore
+ var attempts uint
+ for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+ if semOutcome == OK {
+ semOutcome = w.sem.PWithDeadline(deadlineTimer, cancelChan)
+ }
+ if semOutcome != OK && atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+ // A timeout or cancellation occurred, and no wakeup. Acquire the spinlock, and confirm.
+ oldWord = spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock)
+ // Check that w wasn't removed from the queue after we
+ // checked above, but before we acquired the spinlock.
+ // The call to IsInList() confirms that the waiter *w is still governed
+ // by *cv's spinlock; otherwise, some other thread is about to set w.waiting==0.
+ if atomic.LoadUint32(&w.waiting) != 0 && w.q.IsInList(&cv.waiters) { // still in waiter queue
+ // Not woken, so remove ourselves from queue, and declare a timeout or cancellation.
+ outcome = semOutcome
+ w.q.Remove()
+ atomic.StoreUint32(&w.waiting, 0) // release store
+ if cv.waiters.IsEmpty() {
+ oldWord &^= cvNonEmpty
+ }
+ }
+ // Release spinlock.
+ atomic.StoreUint32(&cv.word, oldWord) // release store
+ if atomic.LoadUint32(&w.waiting) != 0 {
+ attempts = spinDelay(attempts) // so we will ultimately yield to scheduler.
+ }
+ }
+ }
+
+ // Stop any active timer, and drain its channel.
+ if deadlineTimer != nil && semOutcome != Expired && !deadlineTimer.Stop() /*expired*/ {
+ // This receive is synchronous because time.Timer's expire+send
+ // is not atomic: it may send after Stop() returns false! The
+ // "semOutcome != Expired" ensures that the value wasn't
+ // consumed by the PWithDeadline() above.
+ <-deadlineTimer.C
+ }
+
+ if cvMu != nil && w.cvMu == nil { // waiter was transferred to mu's queue, and woken.
+ // Requeue mu using existing waiter struct; current thread is the designated waker.
+ cvMu.lockSlow(w, muDesigWaker)
+ } else {
+ // Traditional case: We've woken from the CV, and need to reacquire mu.
+ freeWaiter(w)
+ mu.Lock()
+ }
+ return outcome // still OK (the zero value) unless the timeout/cancellation path above assigned it
+}
+
+// Signal() wakes at least one thread currently enqueued on *cv; it does nothing if the waiter list is empty.
+func (cv *CV) Signal() {
+ if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
+ var toWakeList *waiter // waiters that we will wake
+ oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
+ if !cv.waiters.IsEmpty() {
+ // Point to first waiter that enqueued itself, and detach it from all others.
+ toWakeList = cv.waiters.prev.elem
+ toWakeList.q.Remove()
+ toWakeList.q.MakeEmpty()
+ if cv.waiters.IsEmpty() {
+ oldWord &^= cvNonEmpty // that was the last waiter, so clear the non-empty bit
+ }
+ }
+ // Release spinlock.
+ atomic.StoreUint32(&cv.word, oldWord) // release store
+ if toWakeList != nil {
+ wakeWaiters(toWakeList) // performed after the spinlock has been released
+ }
+ }
+}
+
+// Broadcast() wakes all threads currently enqueued on *cv; it does nothing if the waiter list is empty.
+func (cv *CV) Broadcast() {
+ if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
+ var toWakeList *waiter // waiters that we will wake
+ spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
+ if !cv.waiters.IsEmpty() {
+ // Point to last waiter that enqueued itself, still attached to all other waiters.
+ toWakeList = cv.waiters.next.elem
+ cv.waiters.Remove() // detach the head, leaving the waiters linked to one another
+ cv.waiters.MakeEmpty()
+ }
+ // Release spinlock and mark queue empty.
+ atomic.StoreUint32(&cv.word, 0) // release store
+ if toWakeList != nil {
+ wakeWaiters(toWakeList) // performed after the spinlock has been released
+ }
+ }
+}
+
+// Wait() atomically releases "mu" and blocks the caller on *cv. It waits
+// until it is awakened by a call to Signal() or Broadcast(), or a spurious
+// wakeup. It then reacquires "mu", and returns. It is equivalent to a call
+// to WaitWithDeadline() with absDeadline==NoDeadline, and a nil cancelChan.
+// It should be used in a loop, as with all standard Mesa-style condition
+// variables. See examples above.
+func (cv *CV) Wait(mu sync.Locker) {
+ cv.WaitWithDeadline(mu, NoDeadline, nil) // the outcome is discarded: with no deadline and no cancelChan the wait cannot expire or be cancelled
+}
+
+// ------------------------------------------
+
+// wakeWaiters() wakes the CV waiters in the circular list pointed to by toWakeList,
+// which must not be nil. If the waiter is associated with an nsync.Mu (as
+// opposed to another implementation of sync.Locker), the "wakeup" may consist
+// of transferring the waiters to the nsync.Mu's queue. Requires:
+// - Every element of the list pointed to by toWakeList is a waiter---there is
+// no head/sentinel.
+// - Every waiter is associated with the same mutex.
+func wakeWaiters(toWakeList *waiter) {
+ var firstWaiter *waiter = toWakeList.q.prev.elem
+ var mu *Mu = firstWaiter.cvMu
+ if mu != nil { // waiter is associated with the nsync.Mu *mu.
+ // We will transfer elements of toWakeList to *mu if all of:
+ // - mu's spinlock is not held, and
+ // - either mu is locked, or there's more than one thread on toWakeList, and
+ // - we acquire the spinlock on the first try.
+ // The spinlock acquisition also marks mu as having waiters.
+ var oldMuWord uint32 = atomic.LoadUint32(&mu.word)
+ var locked bool = (oldMuWord & muLock) != 0
+ var setDesigWaker uint32 // set to muDesigWaker if a thread is to be woken rather than transferred
+ if !locked {
+ setDesigWaker = muDesigWaker
+ }
+ if (oldMuWord&muSpinlock) == 0 &&
+ (locked || firstWaiter != toWakeList) &&
+ atomic.CompareAndSwapUint32(&mu.word, oldMuWord, (oldMuWord|muSpinlock|muWaiting|setDesigWaker)) { // acquire CAS
+
+ // Choose which waiters to transfer, and which to wake.
+ toTransferList := toWakeList
+ if locked { // *mu is held; all the threads get transferred.
+ toWakeList = nil
+ } else { // *mu is not held; we transfer all but the first thread, which will be woken.
+ toWakeList = firstWaiter
+ toWakeList.q.Remove()
+ toWakeList.q.MakeEmpty()
+ }
+
+ // Transfer the waiters on toTransferList to *mu's
+ // waiter queue. We've acquired *mu's spinlock. Queue
+ // the threads there instead of waking them.
+ for toTransferList != nil {
+ var toTransfer *waiter = toTransferList.q.prev.elem
+ if toTransfer == toTransferList { // *toTransferList was singleton; *toTransfer is last waiter
+ toTransferList = nil
+ } else {
+ toTransfer.q.Remove()
+ }
+ if toTransfer.cvMu != mu {
+ panic("multiple mutexes used with condition variable")
+ }
+ toTransfer.cvMu = nil // tell WaitWithDeadline() that we moved the waiter to *mu's queue.
+ // toTransfer.waiting is already 1, from being on CV's waiter queue.
+ if (oldMuWord & muWaiting) == 0 { // if there were previously no waiters, initialize.
+ mu.waiters.MakeEmpty()
+ oldMuWord |= muWaiting // so next iteration won't initialize again.
+ }
+ toTransfer.q.InsertAfter(&mu.waiters)
+ }
+
+ // release *mu's spinlock (muWaiting was set by CAS above)
+ oldMuWord = atomic.LoadUint32(&mu.word)
+ for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord&^muSpinlock) { // release CAS
+ oldMuWord = atomic.LoadUint32(&mu.word)
+ }
+ } else { // Set muDesigWaker because at least one thread is to be woken.
+ oldMuWord = atomic.LoadUint32(&mu.word)
+ for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord|muDesigWaker) {
+ oldMuWord = atomic.LoadUint32(&mu.word)
+ }
+ }
+ }
+
+ // Wake any waiters we didn't manage to enqueue on the Mu.
+ for toWakeList != nil {
+ // Take one waiter from the toWakeList.
+ var toWake *waiter = toWakeList.q.prev.elem
+ if toWake == toWakeList { // *toWakeList was a singleton; *toWake is the last waiter
+ toWakeList = nil // tell the loop to exit
+ } else {
+ toWake.q.Remove() // get the waiter out of the list
+ }
+
+ // Wake the waiter.
+ atomic.StoreUint32(&toWake.waiting, 0) // release store
+ toWake.sem.V() // unblocks the waiter's PWithDeadline() call in WaitWithDeadline()
+ }
+}
diff --git a/nsync/cv_test.go b/nsync/cv_test.go
new file mode 100644
index 0000000..325f861
--- /dev/null
+++ b/nsync/cv_test.go
@@ -0,0 +1,256 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "testing"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// ---------------------------
+
+// A queue represents a FIFO queue with up to Limit elements.
+// The storage for the queue expands as necessary up to Limit.
+type queue struct {
+ Limit int // max value of count---should not be changed after initialization
+ nonEmpty nsync.CV // signalled when count transitions from zero to non-zero
+ nonFull nsync.CV // signalled when count transitions from Limit to less than Limit
+ mu nsync.Mu // protects fields below
+ data []interface{} // circular buffer; in-use elements are data[pos], ..., data[(pos+count-1)%len(data)]
+ pos int // index of first in-use element
+ count int // number of elements in use
+}
+
+// Put() adds v to the end of the FIFO *q and returns true, or if the FIFO already
+// has Limit elements and continues to do so until absDeadline, does nothing and
+// returns false.
+func (q *queue) Put(v interface{}, absDeadline time.Time) (added bool) {
+ q.mu.Lock()
+ for q.count == q.Limit && q.nonFull.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+ }
+ if q.count != q.Limit { // there is room (otherwise the wait ended without the queue becoming non-full)
+ length := len(q.data)
+ i := q.pos + q.count // index one past the last in-use element, before wrap-around
+ if q.count == length { // storage is full: grow it
+ newLength := length * 2
+ if newLength == 0 {
+ newLength = 16 // initial allocation
+ }
+ if q.Limit < newLength {
+ newLength = q.Limit // never allocate more than Limit slots
+ }
+ newData := make([]interface{}, newLength)
+ if i <= length { // in-use elements are contiguous
+ copy(newData[:], q.data[q.pos:i])
+ } else { // in-use elements wrap around the end of q.data
+ n := copy(newData[:], q.data[q.pos:length])
+ copy(newData[n:], q.data[:i-length])
+ }
+ q.pos = 0 // elements were copied to the start of newData
+ i = q.count
+ q.data = newData
+ length = newLength
+ }
+ if length <= i {
+ i -= length // wrap the insertion index
+ }
+ q.data[i] = v
+ if q.count == 0 {
+ q.nonEmpty.Broadcast() // empty -> non-empty transition
+ }
+ q.count++
+ added = true
+ }
+ q.mu.Unlock()
+ return added
+}
+
+// Get() removes the first value from the front of the FIFO *q and returns it
+// and true, or if the FIFO is empty and continues to be so until absDeadline,
+// does nothing and returns nil and false.
+func (q *queue) Get(absDeadline time.Time) (v interface{}, ok bool) {
+ q.mu.Lock()
+ for q.count == 0 && q.nonEmpty.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+ }
+ if q.count != 0 { // an element is available (otherwise the wait ended with the queue still empty)
+ v = q.data[q.pos]
+ q.data[q.pos] = nil // drop the queue's reference to the removed value
+ if q.count == q.Limit {
+ q.nonFull.Broadcast() // full -> non-full transition
+ }
+ q.pos++
+ q.count--
+ if q.pos == len(q.data) {
+ q.pos = 0 // wrap around the end of the buffer
+ }
+ ok = true
+ }
+ q.mu.Unlock()
+ return v, ok
+}
+
+// ---------------------------
+
+// producerN() Put()s count integers on *q, in the sequence start*3, (start+1)*3, (start+2)*3, ....
+func producerN(t *testing.T, q *queue, start int, count int) {
+ for i := 0; i != count; i++ {
+ if !q.Put((start+i)*3, nsync.NoDeadline) {
+ t.Fatalf("queue.Put() returned false with no deadline") // NOTE(review): the tests in this file start producerN with "go"; the testing package requires Fatalf/FailNow to be called from the goroutine running the test
+ }
+ }
+}
+
+// consumerN() Get()s count integers from *q, and checks that they are in the
+// sequence start*3, (start+1)*3, (start+2)*3, .... It fails the test on the first mismatch.
+func consumerN(t *testing.T, q *queue, start int, count int) {
+ for i := 0; i != count; i++ {
+ v, ok := q.Get(nsync.NoDeadline)
+ if !ok {
+ t.Fatalf("queue.Get() returned false with no deadline")
+ }
+ x, isInt := v.(int) // the producer stores plain ints
+ if !isInt {
+ t.Fatalf("queue.Get() returned non integer value; wanted int %d, got %#v", (start+i)*3, v)
+ }
+ if x != (start+i)*3 {
+ t.Fatalf("queue.Get() returned bad value; want %d, got %d", (start+i)*3, x)
+ }
+ }
+}
+
+// producerConsumerN is the number of elements passed from producer to consumer in the
+// TestCVProducerConsumer0() ... TestCVProducerConsumer6() tests below.
+const producerConsumerN = 300000
+
+// TestCVProducerConsumer0() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**0 (1).
+func TestCVProducerConsumer0(t *testing.T) {
+ q := queue{Limit: 1}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer1() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**1 (10).
+func TestCVProducerConsumer1(t *testing.T) {
+ q := queue{Limit: 10}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer2() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**2 (100).
+func TestCVProducerConsumer2(t *testing.T) {
+ q := queue{Limit: 100}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer3() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**3 (1000).
+func TestCVProducerConsumer3(t *testing.T) {
+ q := queue{Limit: 1000}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer4() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**4 (10000).
+func TestCVProducerConsumer4(t *testing.T) {
+ q := queue{Limit: 10000}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer5() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**5 (100000).
+func TestCVProducerConsumer5(t *testing.T) {
+ q := queue{Limit: 100000}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVProducerConsumer6() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**6 (1000000), which exceeds producerConsumerN.
+func TestCVProducerConsumer6(t *testing.T) {
+ q := queue{Limit: 1000000}
+ go producerN(t, &q, 0, producerConsumerN) // producer runs in its own goroutine
+ consumerN(t, &q, 0, producerConsumerN) // consume and verify on the test goroutine
+}
+
+// TestCVDeadline() checks timeouts on a CV WaitWithDeadline().
+func TestCVDeadline(t *testing.T) {
+ var mu nsync.Mu
+ var cv nsync.CV
+
+ // The following two values control how aggressively we police the timeout.
+ var tooEarly time.Duration = 1 * time.Millisecond
+ var tooLate time.Duration = 35 * time.Millisecond // longer, to accommodate scheduling delays
+
+ mu.Lock()
+ for i := 0; i != 50; i++ {
+ startTime := time.Now()
+ expectedEndTime := startTime.Add(87 * time.Millisecond) // each wait uses an 87ms deadline
+ if cv.WaitWithDeadline(&mu, expectedEndTime, nil) != nsync.Expired { // nothing signals cv, so only the deadline can end the wait
+ t.Fatalf("cv.Wait() returns non-Expired for a timeout")
+ }
+ endTime := time.Now()
+ if endTime.Before(expectedEndTime.Add(-tooEarly)) {
+ t.Errorf("cvWait() returned %v too early", expectedEndTime.Sub(endTime))
+ }
+ if endTime.After(expectedEndTime.Add(tooLate)) {
+ t.Errorf("cvWait() returned %v too late", endTime.Sub(expectedEndTime))
+ }
+ }
+ mu.Unlock()
+}
+
+// TestCVCancel() checks cancellations on a CV WaitWithDeadline().
+func TestCVCancel(t *testing.T) {
+ var mu nsync.Mu
+ var cv nsync.CV
+
+ // The loops below cancel after 87 milliseconds, like the timeout tests above.
+
+ // The following two values control how aggressively we police the timeout.
+ var tooEarly time.Duration = 1 * time.Millisecond
+ var tooLate time.Duration = 35 * time.Millisecond // longer, to accommodate scheduling delays
+
+ var futureTime time.Time = time.Now().Add(1 * time.Hour) // a future time, to test cancels with pending timeout
+
+ mu.Lock()
+ for i := 0; i != 50; i++ {
+ startTime := time.Now()
+ expectedEndTime := startTime.Add(87 * time.Millisecond)
+
+ cancel := make(chan struct{})
+ time.AfterFunc(87*time.Millisecond, func() { close(cancel) }) // closes cancel after 87ms
+
+ if cv.WaitWithDeadline(&mu, futureTime, cancel) != nsync.Cancelled {
+ t.Fatalf("cv.Wait() return non-Cancelled for a cancellation")
+ }
+ endTime := time.Now()
+ if endTime.Before(expectedEndTime.Add(-tooEarly)) {
+ t.Errorf("cvWait() returned %v too early", expectedEndTime.Sub(endTime))
+ }
+ if endTime.After(expectedEndTime.Add(tooLate)) {
+ t.Errorf("cvWait() returned %v too late", endTime.Sub(expectedEndTime))
+ }
+
+ // Check that an already cancelled wait returns immediately.
+ startTime = time.Now()
+ if cv.WaitWithDeadline(&mu, nsync.NoDeadline, cancel) != nsync.Cancelled { // cancel is already closed at this point
+ t.Fatalf("cv.Wait() returns non-Cancelled for a cancellation")
+ }
+ endTime = time.Now()
+ if endTime.Before(startTime) {
+ t.Errorf("cvWait() returned %v too early", endTime.Sub(startTime))
+ }
+ if endTime.After(startTime.Add(tooLate)) {
+ t.Errorf("cvWait() returned %v too late", endTime.Sub(startTime))
+ }
+ }
+ mu.Unlock()
+}
diff --git a/nsync/cv_wait_example_test.go b/nsync/cv_wait_example_test.go
new file mode 100644
index 0000000..005871f
--- /dev/null
+++ b/nsync/cv_wait_example_test.go
@@ -0,0 +1,114 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Example use of CV.WaitWithDeadline(): A priority queue of strings whose
+// RemoveWithDeadline() operation has a deadline.
+
+package nsync_test
+
+import "container/heap"
+import "fmt"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// ---------------------------------------
+
+// A priQueue implements heap.Interface and holds strings.
+type priQueue []string
+
+func (pq priQueue) Len() int { return len(pq) } // number of strings held
+func (pq priQueue) Less(i int, j int) bool { return pq[i] < pq[j] } // ordered by string comparison, smallest first
+func (pq priQueue) Swap(i int, j int) { pq[i], pq[j] = pq[j], pq[i] } // exchange two elements
+func (pq *priQueue) Push(x interface{}) { *pq = append(*pq, x.(string)) } // append x, which must be a string, at the end
+func (pq *priQueue) Pop() interface{} { // remove and return the last element
+ old := *pq
+ n := len(old)
+ s := old[n-1]
+ *pq = old[0 : n-1] // shrink the slice by one
+ return s
+}
+
+// ---------------------------------------
+
+// A StringPriorityQueue is a priority queue of strings, which emits the
+// lexicographically least string available.
+type StringPriorityQueue struct {
+ nonEmpty nsync.CV // signalled when heap becomes non-empty
+ mu nsync.Mu // protects heap
+ heap priQueue // the queued strings, maintained with container/heap
+}
+
+// Add() adds "s" to the queue *q.
+func (q *StringPriorityQueue) Add(s string) {
+ q.mu.Lock()
+ if q.heap.Len() == 0 {
+ q.nonEmpty.Broadcast() // the push below makes the queue non-empty; q.mu is still held here
+ }
+ heap.Push(&q.heap, s)
+ q.mu.Unlock()
+}
+
+// RemoveWithDeadline() waits until queue *q is non-empty, then removes the least string from
+// it, and returns it with true; or if absDeadline is reached before the
+// queue becomes non-empty, returns the empty string and false.
+func (q *StringPriorityQueue) RemoveWithDeadline(absDeadline time.Time) (s string, ok bool) {
+ q.mu.Lock()
+ for q.heap.Len() == 0 && q.nonEmpty.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+ }
+ if q.heap.Len() != 0 { // re-check the predicate: the loop also exits when the wait did not return OK
+ s = heap.Pop(&q.heap).(string)
+ ok = true
+ }
+ q.mu.Unlock()
+ return s, ok
+}
+
+// ---------------------------------------
+
+// addAndWait() adds strings s[0, ...] to *q, sleeping for the specified delay after each addition.
+func addAndWait(q *StringPriorityQueue, delay time.Duration, s ...string) {
+ for i := range s {
+ q.Add(s[i])
+ time.Sleep(delay) // the first string is added immediately; the sleep follows each Add()
+ }
+}
+
+// removeAndPrint() removes the first item from *q and outputs it on stdout,
+// or outputs "timeout <delay>" if no value can be found before "delay" elapses.
+func removeAndPrint(q *StringPriorityQueue, delay time.Duration) {
+ if s, ok := q.RemoveWithDeadline(time.Now().Add(delay)); ok { // convert the relative delay to an absolute deadline
+ fmt.Printf("%s\n", s)
+ } else {
+ fmt.Printf("timeout %v\n", delay)
+ }
+}
+
+// ExampleCVWait() demonstrates the use of nsync.CV's WaitWithDeadline() via a priority queue of strings.
+// See the routine RemoveWithDeadline(), above.
+func ExampleCVWait() {
+ var q StringPriorityQueue
+
+ go addAndWait(&q, 500*time.Millisecond, "one", "two", "three", "four", "five")
+
+ time.Sleep(1100 * time.Millisecond) // delay while "one", "two" and "three" are queued, but not yet "four"
+
+ removeAndPrint(&q, 1*time.Second) // should get "one"
+ removeAndPrint(&q, 1*time.Second) // should get "three" (it's lexicographically less than "two")
+ removeAndPrint(&q, 1*time.Second) // should get "two"
+ removeAndPrint(&q, 100*time.Millisecond) // should time out because 1.1+0.1 < 0.5*3
+ removeAndPrint(&q, 1*time.Second) // should get "four"
+ removeAndPrint(&q, 100*time.Millisecond) // should time out because 0.1 < 0.5
+ removeAndPrint(&q, 1*time.Second) // should get "five"
+ removeAndPrint(&q, 1*time.Second) // should time out because there's no more to fetch
+ // Output:
+ // one
+ // three
+ // two
+ // timeout 100ms
+ // four
+ // timeout 100ms
+ // five
+ // timeout 1s
+}
diff --git a/nsync/mu.go b/nsync/mu.go
new file mode 100644
index 0000000..6f419c1
--- /dev/null
+++ b/nsync/mu.go
@@ -0,0 +1,215 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The nsync package provides a mutex Mu and a Mesa-style condition variable CV.
+//
+// The nsync primitives differ from those in sync in that nsync provides
+// timed/cancellable wait on CV, and try-lock on Mu; CV's wait primitives take
+// the mutex as an explicit argument to remind the reader that they have a side
+// effect on the mutex; the zero value CV can be used without further
+// initialization; and Mu forbids a lock acquired by one thread to be released
+// by another.
+//
+// As well as Mu and CV being usable with one another, an nsync.Mu can be used
+// with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.
+package nsync
+
+import "sync/atomic"
+
+// Implementation notes
+//
+// The implementation of Mu and CV both use spinlocks to protect their waiter
+// queues. The spinlocks are implemented with atomic operations and a delay
+// loop found in common.go. They could use sync.Mutex, but I wished to have an
+// implementation independent of the sync package (except for the trivial use
+// of the sync.Locker interface).
+//
+// Mu and CV use the same type of doubly-linked list of waiters (see
+// waiter.go). This allows waiters to be transferred from the CV queue to the
+// Mu queue when a thread is logically woken from the CV but would immediately
+// go to sleep on the Mu. See the wakeWaiters() call in cv.go.
+//
+// In Mu, the "designated waker" is a thread that was waiting on Mu, has been
+// woken up, but as yet has neither acquired nor gone back to waiting. The
+// presence of such a thread is indicated by the muDesigWaker bit in the Mu
+// word. This bit allows the Unlock() code to avoid waking a second waiter
+// when there's already one that will wake the next thread when the time comes.
+// This speeds things up when the lock is heavily contended, and the critical
+// sections are small.
+//
+// The weasel words "with high probability" in the specification of TryLock()
+// prevent clients from believing that they can determine with certainty
+// whether another thread has given up a lock yet. This, together with the
+// requirement that a thread that acquired a mutex must release it (rather than
+// it being released by another thread), prohibits clients from using Mu as a
+// sort of semaphore. The intent is that it be used only for traditional
+// mutual exclusion. This leaves room for certain future optimizations, and
+// makes it easier to apply detection of potential races via candidate lock-set
+// algorithms, should that ever be desired.
+//
+// The condition variable has a wait with an absolute rather than a relative
+// timeout. This is less error prone, as described in the comment on
+// CV.WaitWithDeadline(). Alas, relative timeouts are seductive in trivial
+// examples (such as tests). These are the first things that people try, so
+// they are likely to be requested. If enough people complain we could give
+// them that particular piece of rope.
+//
+// The condition variable uses an internal binary semaphore that is passed a
+// *time.Timer stored in a *waiter for its expirations. It is an invariant
+// that the timer is inactive and its channel drained when the waiter is on the
+// waiter free list.
+
+// A Mu is a mutex. Its zero value is valid, and unlocked.
+// It is similar to sync.Mutex, but implements TryLock().
+//
+// A Mu can be "free", or held by a single thread (aka goroutine). A thread
+// that acquires it should eventually release it. It is not legal to acquire a
+// Mu in one thread and release it in another.
+//
+// Example usage, where p.mu is an nsync.Mu protecting the invariant p.a+p.b==0
+// p.mu.Lock()
+// // The current thread now has exclusive access to p.a and p.b; invariant assumed true.
+// p.a++
+// p.b-- // restore invariant p.a+p.b==0 before releasing p.mu
+// p.mu.Unlock()
+type Mu struct {
+ word uint32 // bits: see below
+ waiters dll // Head of a doubly-linked list of enqueued waiters; under mu.
+}
+
+// Bits in Mu.word
+const (
+ muLock = 1 << iota // lock is held.
+ muSpinlock = 1 << iota // spinlock is held (protects waiters).
+ muWaiting = 1 << iota // waiter list is non-empty.
+ muDesigWaker = 1 << iota // a former waiter has been woken, and has not yet acquired or slept once more.
+)
+
+// TryLock() attempts to acquire *mu without blocking, and returns whether it is successful.
+// It returns true with high probability if *mu was free on entry.
+func (mu *Mu) TryLock() bool {
+ if atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
+ return true
+ }
+ oldWord := atomic.LoadUint32(&mu.word)
+ return (oldWord&muLock) == 0 && atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) // acquire CAS
+}
+
+// Lock() blocks until *mu is free and then acquires it.
+func (mu *Mu) Lock() {
+ if !atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
+ oldWord := atomic.LoadUint32(&mu.word)
+ if (oldWord&muLock) != 0 || !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) { // acquire CAS
+ mu.lockSlow(newWaiter(), 0)
+ }
+ }
+}
+
+// lockSlow() locks *mu, waiting on *w if it needs to wait.
+// "clear" should be zero if the thread has not previously slept on *mu, and
+// muDesigWaker if it has; this represents bits that lockSlow() must clear when
+// it either acquires or sleeps on *mu.
+func (mu *Mu) lockSlow(w *waiter, clear uint32) {
+ var attempts uint // attempt count; used for spinloop backoff
+ w.cvMu = nil // not a CV wait
+ for {
+ oldWord := atomic.LoadUint32(&mu.word)
+ if (oldWord & muLock) == 0 {
+ // lock is not held; try to acquire, possibly clearing muDesigWaker
+ if atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muLock)&^clear) { // acquire CAS
+ freeWaiter(w)
+ return
+ }
+ } else if (oldWord&muSpinlock) == 0 &&
+ atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muSpinlock|muWaiting)&^clear) { // acquire CAS
+
+ // Spinlock is now held, and lock is held by someone
+ // else; muWaiting has also been set; queue ourselves.
+ atomic.StoreUint32(&w.waiting, 1)
+ if (oldWord & muWaiting) == 0 { // init the waiter list if it's empty.
+ mu.waiters.MakeEmpty()
+ }
+ w.q.InsertAfter(&mu.waiters)
+
+ // Release spinlock. Cannot use a store here, because
+ // the current thread does not hold the mutex. If
+ // another thread were a designated waker, the mutex
+ // holder could be concurrently unlocking, even though
+ // we hold the spinlock.
+ oldWord = atomic.LoadUint32(&mu.word)
+ for !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord&^muSpinlock) { // release CAS
+ oldWord = atomic.LoadUint32(&mu.word)
+ }
+
+ // Wait until awoken.
+ for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+ w.sem.P()
+ }
+
+ attempts = 0
+ clear = muDesigWaker
+ }
+ attempts = spinDelay(attempts)
+ }
+}
+
+// Unlock() unlocks *mu, and wakes a waiter if there is one.
+func (mu *Mu) Unlock() {
+ // Go is a garbage-collected language, so it's legal (and slightly
+ // faster on x86) to release with an atomic add before checking for
+ // waiters. Without GC, this would not work because another thread
+ // could acquire, decrement a reference count and deallocate the mutex
+ // before the current thread touched the mutex word again to wake
+ // waiters.
+ newWord := atomic.AddUint32(&mu.word, ^uint32(muLock-1))
+ if (newWord&(muLock|muWaiting)) == 0 || (newWord&(muLock|muDesigWaker)) == muDesigWaker {
+ return
+ }
+
+ if (newWord & muLock) != 0 {
+ panic("attempt to Unlock a free nsync.Mu")
+ }
+
+ var attempts uint // attempt count; used for backoff
+ for {
+ oldWord := atomic.LoadUint32(&mu.word)
+ if (oldWord&muWaiting) == 0 || (oldWord&muDesigWaker) == muDesigWaker {
+ return // no one to wake, or there's already a designated waker waking up.
+ } else if (oldWord&muSpinlock) == 0 &&
+ atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muSpinlock|muDesigWaker) { // acquire CAS
+ // The spinlock is now held, and we've set the designated wake flag, since
+ // we're likely to wake a thread that will become that designated waker.
+
+ if mu.waiters.elem != nil {
+ panic("non-nil mu.waiters.dll.elem")
+ }
+
+ // Remove a waiter from the queue, if possible.
+ var wake *waiter = mu.waiters.prev.elem
+ var clearOnRelease uint32 = muSpinlock
+ if wake != nil {
+ wake.q.Remove()
+ } else {
+ clearOnRelease |= muDesigWaker // not waking a waiter => no designated waker
+ }
+ if mu.waiters.IsEmpty() {
+ clearOnRelease |= muWaiting // no waiters left
+ }
+ // Unset the spinlock bit; set the designated wake
+ // bit---the thread we're waking is the new designated
+ // waker. Cannot use a store here because we no longer
+ // hold the main lock.
+ oldWord = atomic.LoadUint32(&mu.word)
+ for !atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muDesigWaker)&^clearOnRelease) { // release CAS
+ oldWord = atomic.LoadUint32(&mu.word)
+ }
+ if wake != nil { // Wake the waiter
+ atomic.StoreUint32(&wake.waiting, 0) // release store
+ wake.sem.V()
+ }
+ return
+ }
+ attempts = spinDelay(attempts)
+ }
+}
diff --git a/nsync/mu_test.go b/nsync/mu_test.go
new file mode 100644
index 0000000..d80b841
--- /dev/null
+++ b/nsync/mu_test.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "runtime"
+import "sync"
+import "testing"
+
+import "v.io/x/lib/nsync"
+
+// A testData is the state shared between the threads in each of the tests below.
+type testData struct {
+ nThreads int // Number of test threads; constant after init.
+ loopCount int // Iteration count for each test thread; constant after init.
+
+ mu nsync.Mu // Protects i, id, and finishedThreads.
+ i int // Counter incremented by test loops.
+ id int // id of current lock-holding thread in some tests.
+
+ mutex sync.Mutex // Protects i and id when in countingLoopMutex.
+
+ done nsync.CV // Signalled when finishedThreads==nThreads.
+ finishedThreads int // Count of threads that have finished.
+}
+
+// threadFinished() indicates that a thread has finished its operations on testData
+// by incrementing td.finishedThreads, and signalling td.done when it reaches td.nThreads.
+// See waitForAllThreads().
+// We could use sync.WaitGroup here, but this code exercises nsync more.
+func (td *testData) threadFinished() {
+ td.mu.Lock()
+ td.finishedThreads++
+ if td.finishedThreads == td.nThreads {
+ td.done.Broadcast()
+ }
+ td.mu.Unlock()
+}
+
+// waitForAllThreads() waits until all td.nThreads have called threadFinished(),
+// and then returns.
+// We could use sync.WaitGroup here, but this code exercises nsync more.
+func (td *testData) waitForAllThreads() {
+ td.mu.Lock()
+ for td.finishedThreads != td.nThreads {
+ td.done.Wait(&td.mu)
+ }
+ td.mu.Unlock()
+}
+
+// ---------------------------------------
+
+// countingLoopMu() is the body of each thread executed by TestMuNThread().
+// *td represents the test data that the threads share, and id is an integer
+// unique to each test thread.
+func countingLoopMu(td *testData, id int) {
+ var n int = td.loopCount
+ for i := 0; i != n; i++ {
+ td.mu.Lock()
+ td.id = id
+ td.i++
+ if td.id != id {
+ panic("td.id != id")
+ }
+ td.mu.Unlock()
+ }
+ td.threadFinished()
+}
+
+// TestMuNThread creates a few threads, each of which increments an
+// integer a fixed number of times, using an nsync.Mu for mutual exclusion.
+// It checks that the integer is incremented the correct number of times.
+func TestMuNThread(t *testing.T) {
+ td := testData{nThreads: 5, loopCount: 1000000}
+ for i := 0; i != td.nThreads; i++ {
+ go countingLoopMu(&td, i)
+ }
+ td.waitForAllThreads()
+ if td.i != td.nThreads*td.loopCount {
+ t.Fatalf("TestMuNThread final count inconsistent: want %d, got %d",
+ td.nThreads*td.loopCount, td.i)
+ }
+}
+
+// ---------------------------------------
+
+// countingLoopMutex() is the body of each thread executed by TestMutexNThread().
+// *td represents the test data that the threads share, and id is an integer
+// unique to each test thread, here protected by a sync.Mutex.
+func countingLoopMutex(td *testData, id int) {
+ var n int = td.loopCount
+ for i := 0; i != n; i++ {
+ td.mutex.Lock()
+ td.id = id
+ td.i++
+ if td.id != id {
+ panic("td.id != id")
+ }
+ td.mutex.Unlock()
+ }
+ td.threadFinished()
+}
+
+// TestMutexNThread creates a few threads, each of which increments an
+// integer a fixed number of times, using a sync.Mutex for mutual exclusion.
+// It checks that the integer is incremented the correct number of times.
+func TestMutexNThread(t *testing.T) {
+ td := testData{nThreads: 5, loopCount: 1000000}
+ for i := 0; i != td.nThreads; i++ {
+ go countingLoopMutex(&td, i)
+ }
+ td.waitForAllThreads()
+ if td.i != td.nThreads*td.loopCount {
+ t.Fatalf("TestMutexNThread final count inconsistent: want %d, got %d",
+ td.nThreads*td.loopCount, td.i)
+ }
+}
+
+// ---------------------------------------
+
+// countingLoopTryMu() is the body of each thread executed by TestTryMuNThread().
+// *td represents the test data that the threads share, and id is an integer
+// unique to each test thread.
+func countingLoopTryMu(td *testData, id int) {
+ var n int = td.loopCount
+ for i := 0; i != n; i++ {
+ for !td.mu.TryLock() {
+ runtime.Gosched()
+ }
+ td.id = id
+ td.i++
+ if td.id != id {
+ panic("td.id != id")
+ }
+ td.mu.Unlock()
+ }
+ td.threadFinished()
+}
+
+// TestTryMuNThread() tests that acquiring an nsync.Mu with TryLock()
+// using several threads provides mutual exclusion.
+func TestTryMuNThread(t *testing.T) {
+ td := testData{nThreads: 5, loopCount: 100000}
+ for i := 0; i != td.nThreads; i++ {
+ go countingLoopTryMu(&td, i)
+ }
+ td.waitForAllThreads()
+ if td.i != td.nThreads*td.loopCount {
+ t.Fatalf("TestTryMuNThread final count inconsistent: want %d, got %d",
+ td.nThreads*td.loopCount, td.i)
+ }
+}
+
+// ---------------------------------------
+
+// BenchmarkMuUncontended() measures the performance of an uncontended nsync.Mu.
+func BenchmarkMuUncontended(b *testing.B) {
+ var mu nsync.Mu
+ for i := 0; i != b.N; i++ {
+ mu.Lock()
+ mu.Unlock()
+ }
+}
+
+// BenchmarkMutexUncontended() measures the performance of an uncontended sync.Mutex.
+func BenchmarkMutexUncontended(b *testing.B) {
+ var mu sync.Mutex
+ for i := 0; i != b.N; i++ {
+ mu.Lock()
+ mu.Unlock()
+ }
+}
diff --git a/nsync/pingpong_test.go b/nsync/pingpong_test.go
new file mode 100644
index 0000000..cfb314f
--- /dev/null
+++ b/nsync/pingpong_test.go
@@ -0,0 +1,150 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "sync"
+import "testing"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// The benchmarks in this file use various mechanisms to
+// ping-pong back and forth between two threads as they count i from
+// 0 to limit.
+// The data structure contains multiple synchronization primitives,
+// but each benchmark uses only those it needs.
+//
+// The setting of GOMAXPROCS, and the exact choices of the thread scheduler can
+// have great effect on the timings.
+type pingPong struct {
+ mu nsync.Mu
+ cv [2]nsync.CV
+
+ mutex sync.Mutex
+ cond [2]*sync.Cond
+
+ i int
+ limit int
+}
+
+// ---------------------------------------
+
+// mutexCVPingPong() is run by each thread in BenchmarkPingPongMutexCV().
+func (pp *pingPong) mutexCVPingPong(parity int) {
+ pp.mutex.Lock()
+ for pp.i < pp.limit {
+ for (pp.i & 1) == parity {
+ pp.cv[parity].Wait(&pp.mutex)
+ }
+ pp.i++
+ pp.cv[1-parity].Signal()
+ }
+ pp.mutex.Unlock()
+}
+
+// BenchmarkPingPongMutexCV() measures the wakeup speed of
+// sync.Mutex/nsync.CV used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMutexCV(b *testing.B) {
+ pp := pingPong{limit: b.N}
+ go pp.mutexCVPingPong(0)
+ pp.mutexCVPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCVPingPong() is run by each thread in BenchmarkPingPongMuCV().
+func (pp *pingPong) muCVPingPong(parity int) {
+ pp.mu.Lock()
+ for pp.i < pp.limit {
+ for (pp.i & 1) == parity {
+ pp.cv[parity].Wait(&pp.mu)
+ }
+ pp.i++
+ pp.cv[1-parity].Signal()
+ }
+ pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCV() measures the wakeup speed of nsync.Mu/nsync.CV used to
+// ping-pong back and forth between two threads.
+func BenchmarkPingPongMuCV(b *testing.B) {
+ pp := pingPong{limit: b.N}
+ go pp.muCVPingPong(0)
+ pp.muCVPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCVUnexpiredDeadlinePingPong() is run by each thread in BenchmarkPingPongMuCVUnexpiredDeadline().
+func (pp *pingPong) muCVUnexpiredDeadlinePingPong(parity int) {
+ var deadlineIn1Hour time.Time = time.Now().Add(1 * time.Hour)
+ pp.mu.Lock()
+ for pp.i < pp.limit {
+ for (pp.i & 1) == parity {
+ pp.cv[parity].WaitWithDeadline(&pp.mu, deadlineIn1Hour, nil)
+ }
+ pp.i++
+ pp.cv[1-parity].Signal()
+ }
+ pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCVUnexpiredDeadline() measures the wakeup speed of nsync.Mu/nsync.CV used to
+// ping-pong back and forth between two threads, when waiting with a deadline that does not expire.
+func BenchmarkPingPongMuCVUnexpiredDeadline(b *testing.B) {
+ pp := pingPong{limit: b.N}
+ go pp.muCVUnexpiredDeadlinePingPong(0)
+ pp.muCVUnexpiredDeadlinePingPong(1)
+}
+
+// ---------------------------------------
+
+// mutexCondPingPong() is run by each thread in BenchmarkPingPongMutexCond().
+func (pp *pingPong) mutexCondPingPong(parity int) {
+ pp.mutex.Lock()
+ for pp.i < pp.limit {
+ for (pp.i & 1) == parity {
+ pp.cond[parity].Wait()
+ }
+ pp.i++
+ pp.cond[1-parity].Signal()
+ }
+ pp.mutex.Unlock()
+}
+
+// BenchmarkPingPongMutexCond() measures the wakeup speed of
+// sync.Mutex/sync.Cond used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMutexCond(b *testing.B) {
+ pp := pingPong{limit: b.N}
+ pp.cond[0] = sync.NewCond(&pp.mutex)
+ pp.cond[1] = sync.NewCond(&pp.mutex)
+ go pp.mutexCondPingPong(0)
+ pp.mutexCondPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCondPingPong() is run by each thread in BenchmarkPingPongMuCond().
+func (pp *pingPong) muCondPingPong(parity int) {
+ pp.mu.Lock()
+ for pp.i < pp.limit {
+ for (pp.i & 1) == parity {
+ pp.cond[parity].Wait()
+ }
+ pp.i++
+ pp.cond[1-parity].Signal()
+ }
+ pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCond() measures the wakeup speed of nsync.Mu/sync.Cond
+// used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMuCond(b *testing.B) {
+ pp := pingPong{limit: b.N}
+ pp.cond[0] = sync.NewCond(&pp.mu)
+ pp.cond[1] = sync.NewCond(&pp.mu)
+ go pp.muCondPingPong(0)
+ pp.muCondPingPong(1)
+}
diff --git a/nsync/waiter.go b/nsync/waiter.go
new file mode 100644
index 0000000..3625ecc
--- /dev/null
+++ b/nsync/waiter.go
@@ -0,0 +1,117 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "math"
+import "sync/atomic"
+import "time"
+
+// --------------------------------
+
+// A dll represents a doubly-linked list of waiters.
+type dll struct {
+ next *dll
+ prev *dll
+ elem *waiter // points to the waiter struct this dll struct is embedded in, or nil if none.
+}
+
+// MakeEmpty() makes list *l empty.
+// Requires that *l is currently not part of a non-empty list.
+func (l *dll) MakeEmpty() {
+ l.next = l
+ l.prev = l
+}
+
+// IsEmpty() returns whether list *l is empty.
+// Requires that *l is currently part of a list, or the zero dll element.
+func (l *dll) IsEmpty() bool {
+ return l.next == l
+}
+
+// InsertAfter() inserts element *e into the list after position *p.
+// Requires that *e is currently not part of a list and that *p is part of a list.
+func (e *dll) InsertAfter(p *dll) {
+ e.next = p.next
+ e.prev = p
+ e.next.prev = e
+ e.prev.next = e
+}
+
+// Remove() removes *e from the list it is currently in.
+// Requires that *e is currently part of a list.
+func (e *dll) Remove() {
+ e.next.prev = e.prev
+ e.prev.next = e.next
+}
+
+// IsInList() returns whether element e can be found in list l.
+func (e *dll) IsInList(l *dll) bool {
+ p := l.next
+ for p != e && p != l {
+ p = p.next
+ }
+ return p == e
+}
+
+// --------------------------------
+
+// A waiter represents a single waiter on a CV or a Mu.
+//
+// To wait:
+// Allocate a waiter struct *w with newWaiter(), set w.waiting=1, and
+// w.cvMu=nil or to the associated Mu if waiting on a condition variable, then
+// queue w.dll on some queue, and then wait using:
+// for atomic.LoadUint32(&w.waiting) != 0 { w.sem.P() }
+// Return *w to the freepool by calling freeWaiter(w).
+//
+// To wakeup:
+// Remove *w from the relevant queue then:
+// atomic.StoreUint32(&w.waiting, 0)
+// w.sem.V()
+type waiter struct {
+ q dll // Doubly-linked list element.
+ sem binarySemaphore // Thread waits on this semaphore.
+ deadlineTimer *time.Timer // Used for waits with deadlines.
+
+ // If this waiter is waiting on a CV associated with a Mu, cvMu is a
+ // pointer to that Mu, otherwise nil.
+ cvMu *Mu
+
+ // non-zero <=> the waiter is waiting (read and written atomically)
+ waiting uint32
+}
+
+var freeWaiters dll // freeWaiters is a doubly-linked list of free waiter structs.
+var freeWaitersMu uint32 // spinlock protects freeWaiters
+
+// newWaiter() returns a pointer to an unused waiter struct.
+// Ensures that the enclosed timer is stopped and its channel drained.
+func newWaiter() (w *waiter) {
+ spinTestAndSet(&freeWaitersMu, 1, 1)
+ if freeWaiters.next == nil { // first time through, initialize the free list.
+ freeWaiters.MakeEmpty()
+ }
+ if !freeWaiters.IsEmpty() { // If free list is non-empty, dequeue an item.
+ var q *dll = freeWaiters.next
+ q.Remove()
+ w = q.elem
+ }
+ atomic.StoreUint32(&freeWaitersMu, 0) // release store
+ if w == nil { // If free list was empty, allocate an item.
+ w = new(waiter)
+ w.sem.Init()
+ w.deadlineTimer = time.NewTimer(time.Duration(math.MaxInt64))
+ w.deadlineTimer.Stop()
+ w.q.elem = w
+ }
+ return w
+}
+
+// freeWaiter() returns an unused waiter struct *w to the free pool.
+func freeWaiter(w *waiter) {
+ spinTestAndSet(&freeWaitersMu, 1, 1)
+ w.q.InsertAfter(&freeWaiters)
+ atomic.StoreUint32(&freeWaitersMu, 0) // release store
+}