v.io/x/lib/nsync: An implementation of mutex and condition variables

The nsync package provides a mutex Mu and a Mesa-style condition
variable CV.

The nsync primitives differ from those in sync in that nsync provides
timed wait on CV and try-lock on Mu, CV's wait primitives take the mutex
as an explicit argument to remind the reader that they have a side
effect on the mutex, the zero value CV can be used without further
initialization, and Mu forbids a lock acquired by one thread to be
released by another.

As well as Mu and CV being usable with one another, an nsync.Mu can be
used with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.

Change-Id: Id6a14ebcf007bd4cb33dcc7b138d35f61db9546a
diff --git a/nsync/.api b/nsync/.api
new file mode 100644
index 0000000..9d10a53
--- /dev/null
+++ b/nsync/.api
@@ -0,0 +1,13 @@
+pkg nsync, const Cancelled ideal-int
+pkg nsync, const Expired ideal-int
+pkg nsync, const OK ideal-int
+pkg nsync, method (*CV) Broadcast()
+pkg nsync, method (*CV) Signal()
+pkg nsync, method (*CV) Wait(sync.Locker)
+pkg nsync, method (*CV) WaitWithDeadline(sync.Locker, time.Time, <-chan struct{}) int
+pkg nsync, method (*Mu) Lock()
+pkg nsync, method (*Mu) TryLock() bool
+pkg nsync, method (*Mu) Unlock()
+pkg nsync, type CV struct
+pkg nsync, type Mu struct
+pkg nsync, var NoDeadline time.Time
diff --git a/nsync/binary_semaphore.go b/nsync/binary_semaphore.go
new file mode 100644
index 0000000..15d055b
--- /dev/null
+++ b/nsync/binary_semaphore.go
@@ -0,0 +1,58 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "time"
+
+// A binarySemaphore is a binary semaphore; it can have values 0 and 1.
+type binarySemaphore struct {
+	ch chan struct{}
+}
+
+// Init() initializes binarySemaphore *s; the initial value is 0.
+func (s *binarySemaphore) Init() {
+	s.ch = make(chan struct{}, 1)
+}
+
+// P() waits until the count of semaphore *s is 1 and decrements the
+// count to 0.
+func (s *binarySemaphore) P() {
+	<-s.ch
+}
+
+// PWithDeadline() waits until one of:
+// the count of semaphore *s is 1, in which case the semaphore is decremented to 0, then OK is returned;
+// or deadlineTimer!=nil and *deadlineTimer expires, then Expired is returned;
+// or cancelChan != nil and cancelChan becomes readable or closed, then Cancelled is returned.
+// The channel "v.io/v23/context".T.Done() is a suitable cancelChan.
+func (s *binarySemaphore) PWithDeadline(deadlineTimer *time.Timer, cancelChan <-chan struct{}) (res int) {
+	var deadlineChan <-chan time.Time
+	if deadlineTimer != nil {
+		deadlineChan = deadlineTimer.C
+	}
+	// Avoid select if possible---it's slow.
+	if deadlineTimer != nil || cancelChan != nil {
+		select {
+		case <-s.ch:
+			res = OK
+		case <-deadlineChan:
+			res = Expired
+		case <-cancelChan:
+			res = Cancelled
+		}
+	} else {
+		<-s.ch
+		res = OK
+	}
+	return res
+}
+
+// V() ensures that the semaphore count of *s is 1.
+func (s *binarySemaphore) V() {
+	select {
+	case s.ch <- struct{}{}:
+	default: // Don't block if the semaphore count is already 1.
+	}
+}
diff --git a/nsync/common.go b/nsync/common.go
new file mode 100644
index 0000000..251ba94
--- /dev/null
+++ b/nsync/common.go
@@ -0,0 +1,49 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "math"
+import "runtime"
+import "sync/atomic"
+import "time"
+
+// NoDeadline represents a time in the far future---a deadline that will not expire.
+var NoDeadline time.Time
+
+// init() initializes the variable NoDeadline.
+// If done inline, the godoc output is even more ugly.
+func init() {
+	NoDeadline = time.Now().Add(time.Duration(math.MaxInt64)).Add(time.Duration(math.MaxInt64))
+}
+
+// spinDelay() is used in spinloops to delay resumption of the loop.
+// Usage:
+//     var attempts uint
+//     for try_something {
+//        attempts = spinDelay(attempts)
+//     }
+func spinDelay(attempts uint) uint {
+	if attempts < 7 {
+		for i := 0; i != 1<<attempts; i++ {
+		}
+		attempts++
+	} else {
+		runtime.Gosched()
+	}
+	return attempts
+}
+
+// spinTestAndSet() spins until (*w & test) == 0.  It then atomically performs
+// *w |= set and returns the previous value of *w.  It performs an acquire
+// barrier.
+func spinTestAndSet(w *uint32, test uint32, set uint32) uint32 {
+	var attempts uint // cvSpinlock retry count
+	var old uint32 = atomic.LoadUint32(w)
+	for (old&test) != 0 || !atomic.CompareAndSwapUint32(w, old, old|set) { // acquire CAS
+		attempts = spinDelay(attempts)
+		old = atomic.LoadUint32(w)
+	}
+	return old
+}
diff --git a/nsync/cv.go b/nsync/cv.go
new file mode 100644
index 0000000..10de132
--- /dev/null
+++ b/nsync/cv.go
@@ -0,0 +1,328 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "sync"
+import "sync/atomic"
+import "time"
+
+// See also the implementation notes at the top of mu.go.
+
+// A CV is a condition variable in the style of Mesa, Java, POSIX, and Go's sync.Cond.
+// It allows a thread to wait for a condition on state protected by a mutex,
+// and to proceed with the mutex held and the condition true.
+//
+// When compared with sync.Cond:  (a) CV adds WaitWithDeadline() which allows
+// timeouts and cancellation, (b) the mutex is an explicit argument of the wait
+// calls to remind the reader that they have a side-effect on the mutex, and
+// (c) (as a result of (b)), a zero-valued CV is a valid CV with no enqueued
+// waiters, so there is no need of a call to construct a CV.
+//
+// Usage:
+//
+// After making the desired predicate true, call:
+//     cv.Signal() // If at most one thread can make use of the predicate becoming true.
+// or
+//     cv.Broadcast() // If multiple threads can make use of the predicate becoming true.
+//
+// To wait for a predicate with no deadline (assuming cv.Broadcast() is called
+// whenever the predicate becomes true):
+//      mu.Lock()
+//      for !some_predicate_protected_by_mu { // the for-loop is required.
+//              cv.Wait(&mu)
+//      }
+//      // predicate is now true
+//      mu.Unlock()
+//
+// To wait for a predicate with a deadline (assuming cv.Broadcast() is called
+// whenever the predicate becomes true):
+//      mu.Lock()
+//      for !some_predicate_protected_by_mu && cv.WaitWithDeadline(&mu, absDeadline, cancelChan) == nsync.OK {
+//      }
+//      if some_predicate_protected_by_mu { // predicate is true
+//      } else { // predicate is false, and deadline expired, or cancelChan was closed.
+//      }
+//      mu.Unlock()
+// or, if the predicate is complex and you wish to write it just once and
+// inline, you could use the following instead of the for-loop above:
+//      mu.Lock()
+//      var predIsTrue bool
+//      for outcome := OK; ; outcome = cv.WaitWithDeadline(&mu, absDeadline, cancelChan) {
+//              if predIsTrue = some_predicate_protected_by_mu; predIsTrue || outcome != nsync.OK {
+//                      break
+//              }
+//      }
+//      if predIsTrue { // predicate is true
+//      } else { // predicate is false, and deadline expired, or cancelChan was closed.
+//      }
+//      mu.Unlock()
+//
+// As the examples show, Mesa-style condition variables require that waits use
+// a loop that tests the predicate anew after each wait.  It may be surprising
+// that these are preferred over the precise wakeups offered by the condition
+// variables in Hoare monitors.  Imprecise wakeups make more efficient use of
+// the critical section, because threads can enter it while a woken thread is
+// still emerging from the scheduler, which may take thousands of cycles.
+// Further, they make the programme easier to read and debug by making the
+// predicate explicit locally at the wait, where the predicate is about to be
+// assumed; the reader does not have to infer the predicate by examining all
+// the places where wakeups may occur.
+type CV struct {
+	word    uint32 // see bits below; read and written atomically
+	waiters dll    // Head of a doubly-linked list of enqueued waiters; protected by cvSpinlock in word.
+}
+
+// Bits in CV.word
+const (
+	cvSpinlock = 1 << iota // protects waiters
+	cvNonEmpty = 1 << iota // waiters list is non-empty
+)
+
+// Values returned by CV.WaitWithDeadline().
+const (
+	OK        = iota // Neither expired nor cancelled.
+	Expired   = iota // absDeadline expired.
+	Cancelled = iota // cancelChan was closed.
+)
+
+// WaitWithDeadline() atomically releases "mu" and blocks the calling thread on
+// *cv.  It then waits until awakened by a call to Signal() or Broadcast() (or
+// a spurious wakeup), or by the time reaching absDeadline, or by cancelChan
+// being closed.  In all cases, it reacquires "mu", and returns the reason
+// the call returned (OK, Expired, or Cancelled).  Use
+// absDeadline==nsync.NoDeadline for no deadline, and cancelChan==nil for no
+// cancellation.  WaitWithDeadline() should be used in a loop, as with all
+// Mesa-style condition variables.  See examples above.
+//
+// There are two reasons for using an absolute deadline, rather than a relative
+// timeout---these are why pthread_cond_timedwait() also uses an absolute
+// deadline.  First, condition variable waits have to be used in a loop; with
+// an absolute time, the deadline does not have to be recomputed on each
+// iteration.  Second, in most real programmes, for some activity (such as an RPC
+// to a server, or when guaranteeing response time in a UI), there is a
+// deadline imposed by the specification or the caller/user; relative delays
+// can shift arbitrarily with scheduling delays, and so after multiple waits
+// might extend beyond the expected deadline.  Relative delays tend to be more
+// convenient mostly in tests and trivial examples than they are in real
+// programmes.
+func (cv *CV) WaitWithDeadline(mu sync.Locker, absDeadline time.Time, cancelChan <-chan struct{}) (outcome int) {
+	var w *waiter = newWaiter()
+	atomic.StoreUint32(&w.waiting, 1)
+	cvMu, _ := mu.(*Mu)
+	w.cvMu = cvMu // If the Locker is an nsync.Mu, record its address, else record nil.
+
+	oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock|cvNonEmpty) // acquire spinlock, set non-empty
+	if (oldWord & cvNonEmpty) == 0 {
+		cv.waiters.MakeEmpty() // initialize the waiter queue if it was empty.
+	}
+	w.q.InsertAfter(&cv.waiters)
+	// Release the spin lock.
+	atomic.StoreUint32(&cv.word, oldWord|cvNonEmpty) // release store
+
+	mu.Unlock() // Release *mu.
+
+	// Prepare a time.Timer for the deadline, if any.  We use a time.Timer
+	// pre-allocated for the waiter, to avoid allocating and garbage
+	// collecting one on each wait.
+	var deadlineTimer *time.Timer
+	if absDeadline != NoDeadline {
+		deadlineTimer = w.deadlineTimer
+		if deadlineTimer.Reset(absDeadline.Sub(time.Now())) {
+			// w.deadlineTimer is guaranteed inactive and drained;
+			// see "Stop any active timer" code below.
+			panic("deadlineTimer was active")
+		}
+	}
+
+	// Wait until awoken or a timeout.
+	semOutcome := OK
+	var attempts uint
+	for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+		if semOutcome == OK {
+			semOutcome = w.sem.PWithDeadline(deadlineTimer, cancelChan)
+		}
+		if semOutcome != OK && atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+			// A timeout or cancellation occurred, and no wakeup.  Acquire the spinlock, and confirm.
+			oldWord = spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock)
+			// Check that w wasn't removed from the queue after we
+			// checked above, but before we acquired the spinlock.
+			// The call to IsInList() confirms that the waiter *w is still governed
+			// by *cv's spinlock; otherwise, some other thread is about to set w.waiting==0.
+			if atomic.LoadUint32(&w.waiting) != 0 && w.q.IsInList(&cv.waiters) { // still in waiter queue
+				// Not woken, so remove ourselves from queue, and declare a timeout or cancellation.
+				outcome = semOutcome
+				w.q.Remove()
+				atomic.StoreUint32(&w.waiting, 0) // release store
+				if cv.waiters.IsEmpty() {
+					oldWord &^= cvNonEmpty
+				}
+			}
+			// Release spinlock.
+			atomic.StoreUint32(&cv.word, oldWord) // release store
+			if atomic.LoadUint32(&w.waiting) != 0 {
+				attempts = spinDelay(attempts) // so we will ultimately yield to scheduler.
+			}
+		}
+	}
+
+	// Stop any active timer, and drain its channel.
+	if deadlineTimer != nil && semOutcome != Expired && !deadlineTimer.Stop() /*expired*/ {
+		// This receive is synchronous because time.Timer's expire+send
+		// is not atomic:  it may send after Stop() returns false!  The
+		// "semOutcome != Expired" ensures that the value wasn't
+		// consumed by the PWithDeadline() above.
+		<-deadlineTimer.C
+	}
+
+	if cvMu != nil && w.cvMu == nil { // waiter was transferred to mu's queue, and woken.
+		// Requeue mu using existing waiter struct; current thread is the designated waker.
+		cvMu.lockSlow(w, muDesigWaker)
+	} else {
+		// Traditional case: We've woken from the CV, and need to reacquire mu.
+		freeWaiter(w)
+		mu.Lock()
+	}
+	return outcome
+}
+
+// Signal() wakes at least one thread currently enqueued on *cv.
+func (cv *CV) Signal() {
+	if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
+		var toWakeList *waiter                                      // waiters that we will wake
+		oldWord := spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
+		if !cv.waiters.IsEmpty() {
+			// Point to first waiter that enqueued itself, and detach it from all others.
+			toWakeList = cv.waiters.prev.elem
+			toWakeList.q.Remove()
+			toWakeList.q.MakeEmpty()
+			if cv.waiters.IsEmpty() {
+				oldWord &^= cvNonEmpty
+			}
+		}
+		// Release spinlock.
+		atomic.StoreUint32(&cv.word, oldWord) // release store
+		if toWakeList != nil {
+			wakeWaiters(toWakeList)
+		}
+	}
+}
+
+// Broadcast() wakes all threads currently enqueued on *cv.
+func (cv *CV) Broadcast() {
+	if (atomic.LoadUint32(&cv.word) & cvNonEmpty) != 0 { // acquire load
+		var toWakeList *waiter                           // waiters that we will wake
+		spinTestAndSet(&cv.word, cvSpinlock, cvSpinlock) // acquire spinlock
+		if !cv.waiters.IsEmpty() {
+			// Point to last waiter that enqueued itself, still attached to all other waiters.
+			toWakeList = cv.waiters.next.elem
+			cv.waiters.Remove()
+			cv.waiters.MakeEmpty()
+		}
+		// Release spinlock and mark queue empty.
+		atomic.StoreUint32(&cv.word, 0) // release store
+		if toWakeList != nil {
+			wakeWaiters(toWakeList)
+		}
+	}
+}
+
+// Wait() atomically releases "mu" and blocks the caller on *cv.  It waits
+// until it is awakened by a call to Signal() or Broadcast(), or a spurious
+// wakeup.  It then reacquires "mu", and returns.  It is equivalent to a call
+// to WaitWithDeadline() with absDeadline==NoDeadline, and a nil cancelChan.
+// It should be used in a loop, as with all standard Mesa-style condition
+// variables.  See examples above.
+func (cv *CV) Wait(mu sync.Locker) {
+	cv.WaitWithDeadline(mu, NoDeadline, nil)
+}
+
+// ------------------------------------------
+
+// wakeWaiters() wakes the CV waiters in the circular list pointed to by toWakeList,
+// which may not be nil.  If the waiter is associated with an nsync.Mu (as
+// opposed to another implementation of sync.Locker), the "wakeup" may consist
+// of transferring the waiters to the nsync.Mu's queue.  Requires:
+// - Every element of the list pointed to by toWakeList is a waiter---there is
+//   no head/sentinel.
+// - Every waiter is associated with the same mutex.
+func wakeWaiters(toWakeList *waiter) {
+	var firstWaiter *waiter = toWakeList.q.prev.elem
+	var mu *Mu = firstWaiter.cvMu
+	if mu != nil { // waiter is associated with the nsync.Mu *mu.
+		// We will transfer elements of toWakeList to *mu if all of:
+		//  - mu's spinlock is not held, and
+		//  - either mu is locked, or there's more than one thread on toWakeList, and
+		//  - we acquire the spinlock on the first try.
+		// The spinlock acquisition also marks mu as having waiters.
+		var oldMuWord uint32 = atomic.LoadUint32(&mu.word)
+		var locked bool = (oldMuWord & muLock) != 0
+		var setDesigWaker uint32 // set to muDesigWaker if a thread is to be woken rather than transferred
+		if !locked {
+			setDesigWaker = muDesigWaker
+		}
+		if (oldMuWord&muSpinlock) == 0 &&
+			(locked || firstWaiter != toWakeList) &&
+			atomic.CompareAndSwapUint32(&mu.word, oldMuWord, (oldMuWord|muSpinlock|muWaiting|setDesigWaker)) { // acquire CAS
+
+			// Choose which waiters to transfer, and which to wake.
+			toTransferList := toWakeList
+			if locked { // *mu is held; all the threads get transferred.
+				toWakeList = nil
+			} else { // *mu is not held; we transfer all but the first thread, which will be woken.
+				toWakeList = firstWaiter
+				toWakeList.q.Remove()
+				toWakeList.q.MakeEmpty()
+			}
+
+			// Transfer the waiters on toTransferList to *mu's
+			// waiter queue.  We've acquired *mu's spinlock.  Queue
+			// the threads there instead of waking them.
+			for toTransferList != nil {
+				var toTransfer *waiter = toTransferList.q.prev.elem
+				if toTransfer == toTransferList { // *toTransferList was singleton; *toTransfer is last waiter
+					toTransferList = nil
+				} else {
+					toTransfer.q.Remove()
+				}
+				if toTransfer.cvMu != mu {
+					panic("multiple mutexes used with condition variable")
+				}
+				toTransfer.cvMu = nil // tell WaitWithDeadline() that we moved the waiter to *mu's queue.
+				// toTransfer.waiting is already 1, from being on CV's waiter queue.
+				if (oldMuWord & muWaiting) == 0 { // if there were previously no waiters, initialize.
+					mu.waiters.MakeEmpty()
+					oldMuWord |= muWaiting // so next iteration won't initialize again.
+				}
+				toTransfer.q.InsertAfter(&mu.waiters)
+			}
+
+			// release *mu's spinlock  (muWaiting was set by CAS above)
+			oldMuWord = atomic.LoadUint32(&mu.word)
+			for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord&^muSpinlock) { // release CAS
+				oldMuWord = atomic.LoadUint32(&mu.word)
+			}
+		} else { // Set muDesigWaker because at least one thread is to be woken.
+			oldMuWord = atomic.LoadUint32(&mu.word)
+			for !atomic.CompareAndSwapUint32(&mu.word, oldMuWord, oldMuWord|muDesigWaker) {
+				oldMuWord = atomic.LoadUint32(&mu.word)
+			}
+		}
+	}
+
+	// Wake any waiters we didn't manage to enqueue on the Mu.
+	for toWakeList != nil {
+		// Take one waiter from the toWakeList.
+		var toWake *waiter = toWakeList.q.prev.elem
+		if toWake == toWakeList { // *toWakeList was a singleton; *toWake is the last waiter
+			toWakeList = nil // tell the loop to exit
+		} else {
+			toWake.q.Remove() // get the waiter out of the list
+		}
+
+		// Wake the waiter.
+		atomic.StoreUint32(&toWake.waiting, 0) // release store
+		toWake.sem.V()
+	}
+}
diff --git a/nsync/cv_test.go b/nsync/cv_test.go
new file mode 100644
index 0000000..325f861
--- /dev/null
+++ b/nsync/cv_test.go
@@ -0,0 +1,256 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "testing"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// ---------------------------
+
+// A queue represents a FIFO queue with up to Limit elements.
+// The storage for the queue expands as necessary up to Limit.
+type queue struct {
+	Limit    int           // max value of count---should not be changed after initialization
+	nonEmpty nsync.CV      // signalled when count transitions from zero to non-zero
+	nonFull  nsync.CV      // signalled when count transitions from Limit to less than Limit
+	mu       nsync.Mu      // protects fields below
+	data     []interface{} // in use elements are data[pos, ..., (pos+count-1)%len(data)]
+	pos      int           // index of first in-use element
+	count    int           // number of elements in use
+}
+
+// Put() adds v to the end of the FIFO *q and returns true, or if the FIFO already
+// has Limit elements and continues to do so until absDeadline, do nothing and
+// return false.
+func (q *queue) Put(v interface{}, absDeadline time.Time) (added bool) {
+	q.mu.Lock()
+	for q.count == q.Limit && q.nonFull.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+	}
+	if q.count != q.Limit {
+		length := len(q.data)
+		i := q.pos + q.count
+		if q.count == length {
+			newLength := length * 2
+			if newLength == 0 {
+				newLength = 16
+			}
+			if q.Limit < newLength {
+				newLength = q.Limit
+			}
+			newData := make([]interface{}, newLength)
+			if i <= length {
+				copy(newData[:], q.data[q.pos:i])
+			} else {
+				n := copy(newData[:], q.data[q.pos:length])
+				copy(newData[n:], q.data[:i-length])
+			}
+			q.pos = 0
+			i = q.count
+			q.data = newData
+			length = newLength
+		}
+		if length <= i {
+			i -= length
+		}
+		q.data[i] = v
+		if q.count == 0 {
+			q.nonEmpty.Broadcast()
+		}
+		q.count++
+		added = true
+	}
+	q.mu.Unlock()
+	return added
+}
+
+// Get() removes the first value from the front of the FIFO *q and returns it
+// and true, or if the FIFO is empty and continues to be so until absDeadline,
+// do nothing and return nil and false.
+func (q *queue) Get(absDeadline time.Time) (v interface{}, ok bool) {
+	q.mu.Lock()
+	for q.count == 0 && q.nonEmpty.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+	}
+	if q.count != 0 {
+		v = q.data[q.pos]
+		q.data[q.pos] = nil
+		if q.count == q.Limit {
+			q.nonFull.Broadcast()
+		}
+		q.pos++
+		q.count--
+		if q.pos == len(q.data) {
+			q.pos = 0
+		}
+		ok = true
+	}
+	q.mu.Unlock()
+	return v, ok
+}
+
+// ---------------------------
+
+// producerN() Put()s count integers on *q, in the sequence start*3, (start+1)*3, (start+2)*3, ....
+func producerN(t *testing.T, q *queue, start int, count int) {
+	for i := 0; i != count; i++ {
+		if !q.Put((start+i)*3, nsync.NoDeadline) {
+			t.Fatalf("queue.Put() returned false with no deadline")
+		}
+	}
+}
+
+// consumerN() Get()s count integers from *q, and checks that they are in the
+// sequence start*3, (start+1)*3, (start+2)*3, ....
+func consumerN(t *testing.T, q *queue, start int, count int) {
+	for i := 0; i != count; i++ {
+		v, ok := q.Get(nsync.NoDeadline)
+		if !ok {
+			t.Fatalf("queue.Get() returned false with no deadline")
+		}
+		x, isInt := v.(int)
+		if !isInt {
+			t.Fatalf("queue.Get() returned non integer value; wanted int %d, got %#v", (start+i)*3, v)
+		}
+		if x != (start+i)*3 {
+			t.Fatalf("queue.Get() returned bad value; want %d, got %d", (start+i)*3, x)
+		}
+	}
+}
+
+// producerConsumerN is the number of elements passed from producer to consumer in the
+// TestCVProducerConsumerX() tests below.
+const producerConsumerN = 300000
+
+// TestCVProducerConsumer0() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**0.
+func TestCVProducerConsumer0(t *testing.T) {
+	q := queue{Limit: 1}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer1() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**1.
+func TestCVProducerConsumer1(t *testing.T) {
+	q := queue{Limit: 10}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer2() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**2.
+func TestCVProducerConsumer2(t *testing.T) {
+	q := queue{Limit: 100}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer3() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**3.
+func TestCVProducerConsumer3(t *testing.T) {
+	q := queue{Limit: 1000}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer4() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**4.
+func TestCVProducerConsumer4(t *testing.T) {
+	q := queue{Limit: 10000}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer5() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**5.
+func TestCVProducerConsumer5(t *testing.T) {
+	q := queue{Limit: 100000}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVProducerConsumer6() sends a stream of integers from a producer thread to
+// a consumer thread via a queue with Limit 10**6.
+func TestCVProducerConsumer6(t *testing.T) {
+	q := queue{Limit: 1000000}
+	go producerN(t, &q, 0, producerConsumerN)
+	consumerN(t, &q, 0, producerConsumerN)
+}
+
+// TestCVDeadline() checks timeouts on a CV WaitWithDeadline().
+func TestCVDeadline(t *testing.T) {
+	var mu nsync.Mu
+	var cv nsync.CV
+
+	// The following two values control how aggressively we police the timeout.
+	var tooEarly time.Duration = 1 * time.Millisecond
+	var tooLate time.Duration = 35 * time.Millisecond // longer, to accommodate scheduling delays
+
+	mu.Lock()
+	for i := 0; i != 50; i++ {
+		startTime := time.Now()
+		expectedEndTime := startTime.Add(87 * time.Millisecond)
+		if cv.WaitWithDeadline(&mu, expectedEndTime, nil) != nsync.Expired {
+			t.Fatalf("cv.Wait() returns non-Expired for a timeout")
+		}
+		endTime := time.Now()
+		if endTime.Before(expectedEndTime.Add(-tooEarly)) {
+			t.Errorf("cvWait() returned %v too early", expectedEndTime.Sub(endTime))
+		}
+		if endTime.After(expectedEndTime.Add(tooLate)) {
+			t.Errorf("cvWait() returned %v too late", endTime.Sub(expectedEndTime))
+		}
+	}
+	mu.Unlock()
+}
+
+// TestCVCancel() checks cancellations on a CV WaitWithDeadline().
+func TestCVCancel(t *testing.T) {
+	var mu nsync.Mu
+	var cv nsync.CV
+
+	// The loops below cancel after 87 milliseconds, like the timeout tests above.
+
+	// The following two values control how aggressively we police the timeout.
+	var tooEarly time.Duration = 1 * time.Millisecond
+	var tooLate time.Duration = 35 * time.Millisecond // longer, to accommodate scheduling delays
+
+	var futureTime time.Time = time.Now().Add(1 * time.Hour) // a future time, to test cancels with pending timeout
+
+	mu.Lock()
+	for i := 0; i != 50; i++ {
+		startTime := time.Now()
+		expectedEndTime := startTime.Add(87 * time.Millisecond)
+
+		cancel := make(chan struct{})
+		time.AfterFunc(87*time.Millisecond, func() { close(cancel) })
+
+		if cv.WaitWithDeadline(&mu, futureTime, cancel) != nsync.Cancelled {
+			t.Fatalf("cv.Wait() return non-Cancelled for a cancellation")
+		}
+		endTime := time.Now()
+		if endTime.Before(expectedEndTime.Add(-tooEarly)) {
+			t.Errorf("cvWait() returned %v too early", expectedEndTime.Sub(endTime))
+		}
+		if endTime.After(expectedEndTime.Add(tooLate)) {
+			t.Errorf("cvWait() returned %v too late", endTime.Sub(expectedEndTime))
+		}
+
+		// Check that an already cancelled wait returns immediately.
+		startTime = time.Now()
+		if cv.WaitWithDeadline(&mu, nsync.NoDeadline, cancel) != nsync.Cancelled {
+			t.Fatalf("cv.Wait() returns non-Cancelled for a cancellation")
+		}
+		endTime = time.Now()
+		if endTime.Before(startTime) {
+			t.Errorf("cvWait() returned %v too early", endTime.Sub(startTime))
+		}
+		if endTime.After(startTime.Add(tooLate)) {
+			t.Errorf("cvWait() returned %v too late", endTime.Sub(startTime))
+		}
+	}
+	mu.Unlock()
+}
diff --git a/nsync/cv_wait_example_test.go b/nsync/cv_wait_example_test.go
new file mode 100644
index 0000000..005871f
--- /dev/null
+++ b/nsync/cv_wait_example_test.go
@@ -0,0 +1,114 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Example use of CV.WaitWithDeadline():  A priority queue of strings whose
+// RemoveWithDeadline() operation has a deadline.
+
+package nsync_test
+
+import "container/heap"
+import "fmt"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// ---------------------------------------
+
+// A priQueue implements heap.Interface and holds strings.
+type priQueue []string
+
+func (pq priQueue) Len() int               { return len(pq) }
+func (pq priQueue) Less(i int, j int) bool { return pq[i] < pq[j] }
+func (pq priQueue) Swap(i int, j int)      { pq[i], pq[j] = pq[j], pq[i] }
+func (pq *priQueue) Push(x interface{})    { *pq = append(*pq, x.(string)) }
+func (pq *priQueue) Pop() interface{} {
+	old := *pq
+	n := len(old)
+	s := old[n-1]
+	*pq = old[0 : n-1]
+	return s
+}
+
+// ---------------------------------------
+
+// A StringPriorityQueue is a priority queue of strings, which emits the
+// lexicographically least string available.
+type StringPriorityQueue struct {
+	nonEmpty nsync.CV // signalled when heap becomes non-empty
+	mu       nsync.Mu // protects priQueue
+	heap     priQueue
+}
+
+// Add() adds "s" to the queue *q.
+func (q *StringPriorityQueue) Add(s string) {
+	q.mu.Lock()
+	if q.heap.Len() == 0 {
+		q.nonEmpty.Broadcast()
+	}
+	heap.Push(&q.heap, s)
+	q.mu.Unlock()
+}
+
+// RemoveWithDeadline() waits until queue *q is non-empty, then removes a string from its
+// beginning, and returns it with true; or if absDeadline is reached before the
+// queue becomes non-empty, returns the empty string and false.
+func (q *StringPriorityQueue) RemoveWithDeadline(absDeadline time.Time) (s string, ok bool) {
+	q.mu.Lock()
+	for q.heap.Len() == 0 && q.nonEmpty.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
+	}
+	if q.heap.Len() != 0 {
+		s = heap.Pop(&q.heap).(string)
+		ok = true
+	}
+	q.mu.Unlock()
+	return s, ok
+}
+
+// ---------------------------------------
+
+// addAndWait() adds strings s[0, ...] to *q, with the specified delay between additions.
+func addAndWait(q *StringPriorityQueue, delay time.Duration, s ...string) {
+	for i := range s {
+		q.Add(s[i])
+		time.Sleep(delay)
+	}
+}
+
+// removeAndPrint() removes the first item from *q and outputs it on stdout,
+// or outputs "timeout <delay>" if no value can be found before "delay" elapses.
+func removeAndPrint(q *StringPriorityQueue, delay time.Duration) {
+	if s, ok := q.RemoveWithDeadline(time.Now().Add(delay)); ok {
+		fmt.Printf("%s\n", s)
+	} else {
+		fmt.Printf("timeout %v\n", delay)
+	}
+}
+
+// ExampleCVWait() demonstrates the use of nsync.CV's WaitWithDeadline() via a priority queue of strings.
+// See the routine RemoveWithDeadline(), above.
+func ExampleCVWait() {
+	var q StringPriorityQueue
+
+	go addAndWait(&q, 500*time.Millisecond, "one", "two", "three", "four", "five")
+
+	time.Sleep(1100 * time.Millisecond) // delay while "one", "two" and "three" are queued, but not yet "four"
+
+	removeAndPrint(&q, 1*time.Second)        // should get "one"
+	removeAndPrint(&q, 1*time.Second)        // should get "three" (it's lexicographically less than "two")
+	removeAndPrint(&q, 1*time.Second)        // should get "two"
+	removeAndPrint(&q, 100*time.Millisecond) // should time out because 1.1 < 0.5*3
+	removeAndPrint(&q, 1*time.Second)        // should get "four"
+	removeAndPrint(&q, 100*time.Millisecond) // should time out because 0.1 < 0.5
+	removeAndPrint(&q, 1*time.Second)        // should get "five"
+	removeAndPrint(&q, 1*time.Second)        // should time out because there's no more to fetch
+	// Output:
+	// one
+	// three
+	// two
+	// timeout 100ms
+	// four
+	// timeout 100ms
+	// five
+	// timeout 1s
+}
diff --git a/nsync/mu.go b/nsync/mu.go
new file mode 100644
index 0000000..6f419c1
--- /dev/null
+++ b/nsync/mu.go
@@ -0,0 +1,215 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The nsync package provides a mutex Mu and a Mesa-style condition variable CV.
+//
+// The nsync primitives differ from those in sync in that nsync provides
+// timed/cancellable wait on CV, and try-lock on Mu; CV's wait primitives take
+// the mutex as an explicit argument to remind the reader that they have a side
+// effect on the mutex; the zero value CV can be used without further
+// initialization; and Mu forbids a lock acquired by one thread to be released
+// by another.
+//
+// As well as Mu and CV being usable with one another, an nsync.Mu can be used
+// with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.
+package nsync
+
+import "sync/atomic"
+
+// Implementation notes
+//
+// The implementations of Mu and CV both use spinlocks to protect their waiter
+// queues.  The spinlocks are implemented with atomic operations and a delay
+// loop found in common.go.  They could use sync.Mutex, but I wished to have an
+// implementation independent of the sync package (except for the trivial use
+// of the sync.Locker interface).
+//
+// Mu and CV use the same type of doubly-linked list of waiters (see
+// waiter.go).  This allows waiters to be transferred from the CV queue to the
+// Mu queue when a thread is logically woken from the CV but would immediately
+// go to sleep on the Mu.  See the wakeWaiters() call in cv.go.
+//
+// In Mu, the "designated waker" is a thread that was waiting on Mu, has been
+// woken up, but as yet has neither acquired nor gone back to waiting.  The
+// presence of such a thread is indicated by the muDesigWaker bit in the Mu
+// word.  This bit allows the Unlock() code to avoid waking a second waiter
+// when there's already one that will wake the next thread when the time comes.
+// This speeds things up when the lock is heavily contended, and the critical
+// sections are small.
+//
+// The weasel words "with high probability" in the specification of TryLock()
+// prevent clients from believing that they can determine with certainty
+// whether another thread has given up a lock yet.  This, together with the
+// requirement that a thread that acquired a mutex must release it (rather than
+// it being released by another thread), prohibits clients from using Mu as a
+// sort of semaphore.  The intent is that it be used only for traditional
+// mutual exclusion.  This leaves room for certain future optimizations, and
+// makes it easier to apply detection of potential races via candidate lock-set
+// algorithms, should that ever be desired.
+//
+// The condition variable has a wait with an absolute rather than a relative
+// timeout.  This is less error prone, as described in the comment on
+// CV.WaitWithDeadline().  Alas, relative timeouts are seductive in trivial
+// examples (such as tests).  These are the first things that people try, so
+// they are likely to be requested.  If enough people complain we could give
+// them that particular piece of rope.
+//
+// The condition variable uses an internal binary semaphore that is passed a
+// *time.Timer stored in a *waiter for its expirations.  It is an invariant
+// that the timer is inactive and its channel drained when the waiter is on the
+// waiter free list.
+
+// A Mu is a mutex.  Its zero value is valid, and unlocked.
+// It is similar to sync.Mutex, but implements TryLock().
+//
+// A Mu can be "free", or held by a single thread (aka goroutine).  A thread
+// that acquires it should eventually release it.  It is not legal to acquire a
+// Mu in one thread and release it in another.
+//
+// Example usage, where p.mu is an nsync.Mu protecting the invariant p.a+p.b==0
+//      p.mu.Lock()
+//      // The current thread now has exclusive access to p.a and p.b; invariant assumed true.
+//      p.a++
+//      p.b-- // restore invariant p.a+p.b==0 before releasing p.mu
+//      p.mu.Unlock()
+type Mu struct {
+	word    uint32 // bits:  muLock, muSpinlock, muWaiting, muDesigWaker; see below
+	waiters dll    // Head of a doubly-linked list of enqueued waiters; protected by the muSpinlock bit of word.
+}
+
+// Bits in Mu.word
+const (
+	muLock       = 1 << iota // lock is held.
+	muSpinlock   = 1 << iota // spinlock is held (protects waiters).
+	muWaiting    = 1 << iota // waiter list is non-empty.
+	muDesigWaker = 1 << iota // a former waiter has been woken, and has not yet acquired or slept once more.
+)
+
+// TryLock() attempts to acquire *mu without blocking, and returns whether it is successful.
+// It returns true with high probability if *mu was free on entry.
+func (mu *Mu) TryLock() bool {
+	if atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
+		return true
+	}
+	oldWord := atomic.LoadUint32(&mu.word)
+	return (oldWord&muLock) == 0 && atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) // acquire CAS
+}
+
+// Lock() blocks until *mu is free and then acquires it.
+func (mu *Mu) Lock() {
+	if !atomic.CompareAndSwapUint32(&mu.word, 0, muLock) { // acquire CAS
+		oldWord := atomic.LoadUint32(&mu.word)
+		if (oldWord&muLock) != 0 || !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muLock) { // acquire CAS
+			mu.lockSlow(newWaiter(), 0)
+		}
+	}
+}
+
+// lockSlow() locks *mu, waiting on *w if it needs to wait.
+// "clear" should be zero if the thread has not previously slept on *mu, and
+// muDesigWaker if it has; this represents bits that lockSlow() must clear when
+// it either acquires or sleeps on *mu.
+func (mu *Mu) lockSlow(w *waiter, clear uint32) {
+	var attempts uint // attempt count; used for spinloop backoff
+	w.cvMu = nil      // not a CV wait
+	for {
+		oldWord := atomic.LoadUint32(&mu.word)
+		if (oldWord & muLock) == 0 {
+			// lock is not held; try to acquire, possibly clearing muDesigWaker
+			if atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muLock)&^clear) { // acquire CAS
+				freeWaiter(w)
+				return
+			}
+		} else if (oldWord&muSpinlock) == 0 &&
+			atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muSpinlock|muWaiting)&^clear) { // acquire CAS
+
+			// Spinlock is now held, and lock is held by someone
+			// else; muWaiting has also been set; queue ourselves.
+			atomic.StoreUint32(&w.waiting, 1)
+			if (oldWord & muWaiting) == 0 { // init the waiter list if it's empty.
+				mu.waiters.MakeEmpty()
+			}
+			w.q.InsertAfter(&mu.waiters)
+
+			// Release spinlock.  Cannot use a store here, because
+			// the current thread does not hold the mutex.  If
+			// another thread were a designated waker, the mutex
+			// holder could be concurrently unlocking, even though
+			// we hold the spinlock.
+			oldWord = atomic.LoadUint32(&mu.word)
+			for !atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord&^muSpinlock) { // release CAS
+				oldWord = atomic.LoadUint32(&mu.word)
+			}
+
+			// Wait until awoken.
+			for atomic.LoadUint32(&w.waiting) != 0 { // acquire load
+				w.sem.P()
+			}
+
+			attempts = 0
+			clear = muDesigWaker
+		}
+		attempts = spinDelay(attempts)
+	}
+}
+
+// Unlock() unlocks *mu, and wakes a waiter if there is one.
+func (mu *Mu) Unlock() {
+	// Go is a garbage-collected language, so it's legal (and slightly
+	// faster on x86) to release with an atomic add before checking for
+	// waiters.  Without GC, this would not work because another thread
+	// could acquire, decrement a reference count and deallocate the mutex
+	// before the current thread touched the mutex word again to wake
+	// waiters.
+	newWord := atomic.AddUint32(&mu.word, ^uint32(muLock-1))
+	if (newWord&(muLock|muWaiting)) == 0 || (newWord&(muLock|muDesigWaker)) == muDesigWaker {
+		return
+	}
+
+	if (newWord & muLock) != 0 {
+		panic("attempt to Unlock a free nsync.Mu")
+	}
+
+	var attempts uint // attempt count; used for backoff
+	for {
+		oldWord := atomic.LoadUint32(&mu.word)
+		if (oldWord&muWaiting) == 0 || (oldWord&muDesigWaker) == muDesigWaker {
+			return // no one to wake, or there's already a designated waker waking up.
+		} else if (oldWord&muSpinlock) == 0 &&
+			atomic.CompareAndSwapUint32(&mu.word, oldWord, oldWord|muSpinlock|muDesigWaker) { // acquire CAS
+			// The spinlock is now held, and we've set the designated wake flag, since
+			// we're likely to wake a thread that will become that designated waker.
+
+			if mu.waiters.elem != nil {
+				panic("non-nil mu.waiters.dll.elem")
+			}
+
+			// Remove a waiter from the queue, if possible.
+			var wake *waiter = mu.waiters.prev.elem
+			var clearOnRelease uint32 = muSpinlock
+			if wake != nil {
+				wake.q.Remove()
+			} else {
+				clearOnRelease |= muDesigWaker // not waking a waiter => no designated waker
+			}
+			if mu.waiters.IsEmpty() {
+				clearOnRelease |= muWaiting // no waiters left
+			}
+			// Unset the spinlock bit; set the designated wake
+			// bit---the thread we're waking is the new designated
+			// waker.  Cannot use a store here because we no longer
+			// hold the main lock.
+			oldWord = atomic.LoadUint32(&mu.word)
+			for !atomic.CompareAndSwapUint32(&mu.word, oldWord, (oldWord|muDesigWaker)&^clearOnRelease) { // release CAS
+				oldWord = atomic.LoadUint32(&mu.word)
+			}
+			if wake != nil { // Wake the waiter
+				atomic.StoreUint32(&wake.waiting, 0) // release store
+				wake.sem.V()
+			}
+			return
+		}
+		attempts = spinDelay(attempts)
+	}
+}
diff --git a/nsync/mu_test.go b/nsync/mu_test.go
new file mode 100644
index 0000000..d80b841
--- /dev/null
+++ b/nsync/mu_test.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "runtime"
+import "sync"
+import "testing"
+
+import "v.io/x/lib/nsync"
+
+// A testData is the state shared between the threads in each of the tests below.
+type testData struct {
+	nThreads  int // Number of test threads; constant after init.
+	loopCount int // Iteration count for each test thread; constant after init.
+
+	mu nsync.Mu // Protects i, id, and finishedThreads.
+	i  int      // Counter incremented by test loops.
+	id int      // id of current lock-holding thread in some tests.
+
+	mutex sync.Mutex // Protects i and id when in countingLoopMutex.
+
+	done            nsync.CV // Signalled when finishedThreads==nThreads.
+	finishedThreads int      // Count of threads that have finished.
+}
+
+// threadFinished() indicates that a thread has finished its operations on testData
+// by incrementing td.finishedThreads, and signalling td.done when it reaches td.nThreads.
+// See waitForAllThreads().
+// We could use sync.WaitGroup here, but this code exercises nsync more.
+func (td *testData) threadFinished() {
+	td.mu.Lock()
+	td.finishedThreads++
+	if td.finishedThreads == td.nThreads {
+		td.done.Broadcast()
+	}
+	td.mu.Unlock()
+}
+
+// waitForAllThreads() waits until all td.nThreads have called threadFinished(),
+// and then returns.
+// We could use sync.WaitGroup here, but this code exercises nsync more.
+func (td *testData) waitForAllThreads() {
+	td.mu.Lock()
+	for td.finishedThreads != td.nThreads {
+		td.done.Wait(&td.mu)
+	}
+	td.mu.Unlock()
+}
+
+// ---------------------------------------
+
+// countingLoopMu() is the body of each thread executed by TestMuNThread().
+// *td represents the test data that the threads share, and id is an integer
+// unique to each test thread.
+func countingLoopMu(td *testData, id int) {
+	var n int = td.loopCount
+	for i := 0; i != n; i++ {
+		td.mu.Lock()
+		td.id = id
+		td.i++
+		if td.id != id {
+			panic("td.id != id")
+		}
+		td.mu.Unlock()
+	}
+	td.threadFinished()
+}
+
+// TestMuNThread creates a few threads, each of which increments an
+// integer a fixed number of times, using an nsync.Mu for mutual exclusion.
+// It checks that the integer is incremented the correct number of times.
+func TestMuNThread(t *testing.T) {
+	td := testData{nThreads: 5, loopCount: 1000000}
+	for i := 0; i != td.nThreads; i++ {
+		go countingLoopMu(&td, i)
+	}
+	td.waitForAllThreads()
+	if td.i != td.nThreads*td.loopCount {
+		t.Fatalf("TestMuNThread final count inconsistent: want %d, got %d",
+			td.nThreads*td.loopCount, td.i)
+	}
+}
+
+// ---------------------------------------
+
+// countingLoopMutex() is the body of each thread executed by TestMutexNThread().
+// *td represents the test data that the threads share, here protected by a
+// sync.Mutex, and id is an integer unique to each test thread.
+func countingLoopMutex(td *testData, id int) {
+	var n int = td.loopCount
+	for i := 0; i != n; i++ {
+		td.mutex.Lock()
+		td.id = id
+		td.i++
+		if td.id != id {
+			panic("td.id != id")
+		}
+		td.mutex.Unlock()
+	}
+	td.threadFinished()
+}
+
+// TestMutexNThread creates a few threads, each of which increments an
+// integer a fixed number of times, using a sync.Mutex for mutual exclusion.
+// It checks that the integer is incremented the correct number of times.
+func TestMutexNThread(t *testing.T) {
+	td := testData{nThreads: 5, loopCount: 1000000}
+	for i := 0; i != td.nThreads; i++ {
+		go countingLoopMutex(&td, i)
+	}
+	td.waitForAllThreads()
+	if td.i != td.nThreads*td.loopCount {
+		t.Fatalf("TestMutexNThread final count inconsistent: want %d, got %d",
+			td.nThreads*td.loopCount, td.i)
+	}
+}
+
+// ---------------------------------------
+
+// countingLoopTryMu() is the body of each thread executed by TestTryMuNThread().
+// *td represents the test data that the threads share, and id is an integer
+// unique to each test thread.
+func countingLoopTryMu(td *testData, id int) {
+	var n int = td.loopCount
+	for i := 0; i != n; i++ {
+		for !td.mu.TryLock() {
+			runtime.Gosched()
+		}
+		td.id = id
+		td.i++
+		if td.id != id {
+			panic("td.id != id")
+		}
+		td.mu.Unlock()
+	}
+	td.threadFinished()
+}
+
+// TestTryMuNThread() tests that acquiring an nsync.Mu with TryLock()
+// using several threads provides mutual exclusion.
+func TestTryMuNThread(t *testing.T) {
+	td := testData{nThreads: 5, loopCount: 100000}
+	for i := 0; i != td.nThreads; i++ {
+		go countingLoopTryMu(&td, i)
+	}
+	td.waitForAllThreads()
+	if td.i != td.nThreads*td.loopCount {
+		t.Fatalf("TestTryMuNThread final count inconsistent: want %d, got %d",
+			td.nThreads*td.loopCount, td.i)
+	}
+}
+
+// ---------------------------------------
+
+// BenchmarkMuUncontended() measures the performance of an uncontended nsync.Mu.
+func BenchmarkMuUncontended(b *testing.B) {
+	var mu nsync.Mu
+	for i := 0; i != b.N; i++ {
+		mu.Lock()
+		mu.Unlock()
+	}
+}
+
+// BenchmarkMutexUncontended() measures the performance of an uncontended sync.Mutex.
+func BenchmarkMutexUncontended(b *testing.B) {
+	var mu sync.Mutex
+	for i := 0; i != b.N; i++ {
+		mu.Lock()
+		mu.Unlock()
+	}
+}
diff --git a/nsync/pingpong_test.go b/nsync/pingpong_test.go
new file mode 100644
index 0000000..cfb314f
--- /dev/null
+++ b/nsync/pingpong_test.go
@@ -0,0 +1,150 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync_test
+
+import "sync"
+import "testing"
+import "time"
+
+import "v.io/x/lib/nsync"
+
+// The benchmarks in this file use various mechanisms to
+// ping-pong back and forth between two threads as they count i from
+// 0 to limit.
+// The data structure contains multiple synchronization primitives,
+// but each benchmark uses only those it needs.
+//
+// The setting of GOMAXPROCS, and the exact choices of the thread scheduler can
+// have great effect on the timings.
+type pingPong struct {
+	mu nsync.Mu
+	cv [2]nsync.CV
+
+	mutex sync.Mutex
+	cond  [2]*sync.Cond
+
+	i     int
+	limit int
+}
+
+// ---------------------------------------
+
+// mutexCVPingPong() is run by each thread in BenchmarkPingPongMutexCV().
+func (pp *pingPong) mutexCVPingPong(parity int) {
+	pp.mutex.Lock()
+	for pp.i < pp.limit {
+		for (pp.i & 1) == parity {
+			pp.cv[parity].Wait(&pp.mutex)
+		}
+		pp.i++
+		pp.cv[1-parity].Signal()
+	}
+	pp.mutex.Unlock()
+}
+
+// BenchmarkPingPongMutexCV() measures the wakeup speed of
+// sync.Mutex/nsync.CV used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMutexCV(b *testing.B) {
+	pp := pingPong{limit: b.N}
+	go pp.mutexCVPingPong(0)
+	pp.mutexCVPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCVPingPong() is run by each thread in BenchmarkPingPongMuCV().
+func (pp *pingPong) muCVPingPong(parity int) {
+	pp.mu.Lock()
+	for pp.i < pp.limit {
+		for (pp.i & 1) == parity {
+			pp.cv[parity].Wait(&pp.mu)
+		}
+		pp.i++
+		pp.cv[1-parity].Signal()
+	}
+	pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCV() measures the wakeup speed of nsync.Mu/nsync.CV used to
+// ping-pong back and forth between two threads.
+func BenchmarkPingPongMuCV(b *testing.B) {
+	pp := pingPong{limit: b.N}
+	go pp.muCVPingPong(0)
+	pp.muCVPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCVUnexpiredDeadlinePingPong() is run by each thread in BenchmarkPingPongMuCVUnexpiredDeadline().
+func (pp *pingPong) muCVUnexpiredDeadlinePingPong(parity int) {
+	var deadlineIn1Hour time.Time = time.Now().Add(1 * time.Hour)
+	pp.mu.Lock()
+	for pp.i < pp.limit {
+		for (pp.i & 1) == parity {
+			pp.cv[parity].WaitWithDeadline(&pp.mu, deadlineIn1Hour, nil)
+		}
+		pp.i++
+		pp.cv[1-parity].Signal()
+	}
+	pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCVUnexpiredDeadline() measures the wakeup speed of nsync.Mu/nsync.CV used to
+// ping-pong back and forth between two threads, waiting with a deadline one hour in the future.
+func BenchmarkPingPongMuCVUnexpiredDeadline(b *testing.B) {
+	pp := pingPong{limit: b.N}
+	go pp.muCVUnexpiredDeadlinePingPong(0)
+	pp.muCVUnexpiredDeadlinePingPong(1)
+}
+
+// ---------------------------------------
+
+// mutexCondPingPong() is run by each thread in BenchmarkPingPongMutexCond().
+func (pp *pingPong) mutexCondPingPong(parity int) {
+	pp.mutex.Lock()
+	for pp.i < pp.limit {
+		for (pp.i & 1) == parity {
+			pp.cond[parity].Wait()
+		}
+		pp.i++
+		pp.cond[1-parity].Signal()
+	}
+	pp.mutex.Unlock()
+}
+
+// BenchmarkPingPongMutexCond() measures the wakeup speed of
+// sync.Mutex/sync.Cond used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMutexCond(b *testing.B) {
+	pp := pingPong{limit: b.N}
+	pp.cond[0] = sync.NewCond(&pp.mutex)
+	pp.cond[1] = sync.NewCond(&pp.mutex)
+	go pp.mutexCondPingPong(0)
+	pp.mutexCondPingPong(1)
+}
+
+// ---------------------------------------
+
+// muCondPingPong() is run by each thread in BenchmarkPingPongMuCond().
+func (pp *pingPong) muCondPingPong(parity int) {
+	pp.mu.Lock()
+	for pp.i < pp.limit {
+		for (pp.i & 1) == parity {
+			pp.cond[parity].Wait()
+		}
+		pp.i++
+		pp.cond[1-parity].Signal()
+	}
+	pp.mu.Unlock()
+}
+
+// BenchmarkPingPongMuCond() measures the wakeup speed of nsync.Mu/sync.Cond
+// used to ping-pong back and forth between two threads.
+func BenchmarkPingPongMuCond(b *testing.B) {
+	pp := pingPong{limit: b.N}
+	pp.cond[0] = sync.NewCond(&pp.mu)
+	pp.cond[1] = sync.NewCond(&pp.mu)
+	go pp.muCondPingPong(0)
+	pp.muCondPingPong(1)
+}
diff --git a/nsync/waiter.go b/nsync/waiter.go
new file mode 100644
index 0000000..3625ecc
--- /dev/null
+++ b/nsync/waiter.go
@@ -0,0 +1,117 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nsync
+
+import "math"
+import "sync/atomic"
+import "time"
+
+// --------------------------------
+
+// A dll represents a doubly-linked list of waiters.
+type dll struct {
+	next *dll
+	prev *dll
+	elem *waiter // points to the waiter struct this dll struct is embedded in, or nil if none.
+}
+
+// MakeEmpty() makes list *l empty.
+// Requires that *l is currently not part of a non-empty list.
+func (l *dll) MakeEmpty() {
+	l.next = l
+	l.prev = l
+}
+
+// IsEmpty() returns whether list *l is empty.
+// Requires that *l is currently part of a list, or the zero dll element.
+func (l *dll) IsEmpty() bool {
+	return l.next == l
+}
+
+// InsertAfter() inserts element *e into the list after position *p.
+// Requires that *e is currently not part of a list and that *p is part of a list.
+func (e *dll) InsertAfter(p *dll) {
+	e.next = p.next
+	e.prev = p
+	e.next.prev = e
+	e.prev.next = e
+}
+
+// Remove() removes *e from the list it is currently in.
+// Requires that *e is currently part of a list.
+func (e *dll) Remove() {
+	e.next.prev = e.prev
+	e.prev.next = e.next
+}
+
+// IsInList() returns whether element e can be found in list l.
+func (e *dll) IsInList(l *dll) bool {
+	p := l.next
+	for p != e && p != l {
+		p = p.next
+	}
+	return p == e
+}
+
+// --------------------------------
+
+// A waiter represents a single waiter on a CV or a Mu.
+//
+// To wait:
+// Allocate a waiter struct *w with newWaiter(), set w.waiting=1, and
+// w.cvMu=nil or to the associated Mu if waiting on a condition variable, then
+// queue w.q on some queue, and then wait using:
+//    for atomic.LoadUint32(&w.waiting) != 0 { w.sem.P() }
+// Return *w to the freepool by calling freeWaiter(w).
+//
+// To wakeup:
+// Remove *w from the relevant queue then:
+//  atomic.StoreUint32(&w.waiting, 0)
+//  w.sem.V()
+type waiter struct {
+	q             dll             // Doubly-linked list element.
+	sem           binarySemaphore // Thread waits on this semaphore.
+	deadlineTimer *time.Timer     // Used for waits with deadlines.
+
+	// If this waiter is waiting on a CV associated with a Mu, cvMu is a
+	// pointer to that Mu, otherwise nil
+	cvMu *Mu
+
+	// non-zero <=> the waiter is waiting (read and written atomically)
+	waiting uint32
+}
+
+var freeWaiters dll      // freeWaiters is a doubly-linked list of free waiter structs.
+var freeWaitersMu uint32 // spinlock protects freeWaiters
+
+// newWaiter() returns a pointer to an unused waiter struct.
+// Ensures that the enclosed timer is stopped and its channel drained.
+func newWaiter() (w *waiter) {
+	spinTestAndSet(&freeWaitersMu, 1, 1)
+	if freeWaiters.next == nil { // first time through, initialize the free list.
+		freeWaiters.MakeEmpty()
+	}
+	if !freeWaiters.IsEmpty() { // If free list is non-empty, dequeue an item.
+		var q *dll = freeWaiters.next
+		q.Remove()
+		w = q.elem
+	}
+	atomic.StoreUint32(&freeWaitersMu, 0) // release store
+	if w == nil {                         // If free list was empty, allocate an item.
+		w = new(waiter)
+		w.sem.Init()
+		w.deadlineTimer = time.NewTimer(time.Duration(math.MaxInt64))
+		w.deadlineTimer.Stop()
+		w.q.elem = w
+	}
+	return w
+}
+
+// freeWaiter() returns an unused waiter struct *w to the free pool.
+func freeWaiter(w *waiter) {
+	spinTestAndSet(&freeWaitersMu, 1, 1)
+	w.q.InsertAfter(&freeWaiters)
+	atomic.StoreUint32(&freeWaitersMu, 0) // release store
+}