veyron/runtimes/google/lib/sync: WaitGroup allows concurrent calls to Add and Wait.
Go's sync.WaitGroup requires all calls to Add to be made before Wait, and is
therefore difficult to use when the number of goroutines to wait for is not
known in advance.
WaitGroup does not require all calls to Add to be made before Wait; instead,
calls to TryAdd made after Wait has been called will fail.
New users of WaitGroup:
- in vif.go, WaitGroup replaces the internal waitGroup.
- in watcher.go, WaitGroup replaces sync.WaitGroup.
- in eval.go, WaitGroup replaces sync.WaitGroup.
-- abortAndWait replaces wait, and guarantees that TryAdd fails only after
the query has been aborted.
Change-Id: Ia818c5feae52b26485d324857c1d598b21c24a6e
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 56586d6..15ea933 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -21,6 +21,7 @@
"veyron/runtimes/google/lib/bqueue/drrqueue"
"veyron/runtimes/google/lib/iobuf"
"veyron/runtimes/google/lib/pcqueue"
+ vsync "veyron/runtimes/google/lib/sync"
"veyron/runtimes/google/lib/upcqueue"
"veyron2/ipc/stream"
@@ -39,7 +40,7 @@
localEP naming.Endpoint
vcMap *vcMap
- wpending, rpending *waitGroup
+ wpending, rpending vsync.WaitGroup
muListen sync.Mutex
acceptor *upcqueue.T // GUARDED_BY(muListen)
@@ -160,8 +161,6 @@
conn: conn,
pool: iobuf.NewPool(0),
vcMap: newVCMap(),
- wpending: newWaitGroup(),
- rpending: newWaitGroup(),
acceptor: acceptor,
listenerOpts: listenerOpts,
localEP: ep,
@@ -731,50 +730,3 @@
b.Release()
}
}
-
-// waitGroup implements a sync.WaitGroup like structure that does not require
-// all calls to Add to be made before Wait, instead calls to Add after Wait
-// will fail.
-type waitGroup struct {
- n int
- wait bool
- mu sync.Mutex
- cond *sync.Cond
-}
-
-func newWaitGroup() *waitGroup {
- w := &waitGroup{}
- w.cond = sync.NewCond(&w.mu)
- return w
-}
-
-func (w *waitGroup) TryAdd() bool {
- w.mu.Lock()
- defer w.mu.Unlock()
- if !w.wait {
- w.n++
- return true
- }
- return false
-}
-
-func (w *waitGroup) Done() {
- w.mu.Lock()
- defer w.mu.Unlock()
- w.n--
- if w.n < 0 {
- panic(fmt.Sprintf("more calls to Done than Add"))
- }
- if w.n == 0 {
- w.cond.Broadcast()
- }
-}
-
-func (w *waitGroup) Wait() {
- w.mu.Lock()
- w.wait = true
- for w.n > 0 {
- w.cond.Wait()
- }
- w.mu.Unlock()
-}
diff --git a/runtimes/google/lib/sync/doc.go b/runtimes/google/lib/sync/doc.go
new file mode 100644
index 0000000..3e184b9
--- /dev/null
+++ b/runtimes/google/lib/sync/doc.go
@@ -0,0 +1,2 @@
+// Package sync provides synchronization primitives.
+package sync
diff --git a/runtimes/google/lib/sync/semaphore.go b/runtimes/google/lib/sync/semaphore.go
index 1a4cc18..b6ea778 100644
--- a/runtimes/google/lib/sync/semaphore.go
+++ b/runtimes/google/lib/sync/semaphore.go
@@ -1,4 +1,3 @@
-// Package sync provides synchronization primitives.
package sync
import (
diff --git a/runtimes/google/lib/sync/wait_group.go b/runtimes/google/lib/sync/wait_group.go
new file mode 100644
index 0000000..3ee2ec1
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group.go
@@ -0,0 +1,57 @@
+package sync
+
+import "sync"
+
+ // WaitGroup implements a sync.WaitGroup-like structure that does not require
+ // all calls to Add to be made before Wait; instead, calls to TryAdd made after
+ // Wait has been called will fail.
+ //
+ // As a result, WaitGroup cannot be "re-used" in the way that sync.WaitGroup
+ // can. In the following example using sync.WaitGroup, Add, Done and Wait behave
+ // in the same way in rounds 1 and 2.
+ //
+ // var wg sync.WaitGroup
+ //
+ // Round #1.
+ // wg.Add(1)
+ // go wg.Done()
+ // wg.Wait()
+ //
+ // Round #2.
+ // wg.Add(1)
+ // go wg.Done()
+ // wg.Wait()
+ //
+ // However, in an equivalent program using WaitGroup, the second call to TryAdd
+ // would fail and return false.
+ type WaitGroup struct {
+ mu sync.Mutex // Guards waiting.
+ waiting bool // Set by Wait; once true, TryAdd fails.
+ pending sync.WaitGroup // Counts successful TryAdd calls not yet matched by Done.
+ }
+
+ // TryAdd attempts to increment the counter. It returns true if the counter was
+ // incremented. If Wait has already been called, TryAdd leaves the counter
+ // unchanged and returns false.
+ func (w *WaitGroup) TryAdd() (added bool) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.waiting {
+ return false
+ }
+ w.pending.Add(1) // Done under mu, so no increment can follow Wait setting waiting.
+ return true
+ }
+
+ // Done decrements the counter. If the counter goes negative, Done panics.
+ func (w *WaitGroup) Done() {
+ w.pending.Done() // Releases goroutines blocked in Wait once the counter reaches zero.
+ }
+
+ // Wait blocks until the counter is zero. Once Wait is called, TryAdd fails.
+ func (w *WaitGroup) Wait() {
+ w.mu.Lock()
+ w.waiting = true // From here on TryAdd returns false.
+ w.mu.Unlock()
+ w.pending.Wait() // mu is released first, so Done and TryAdd never block behind a waiter.
+ }
diff --git a/runtimes/google/lib/sync/wait_group_test.go b/runtimes/google/lib/sync/wait_group_test.go
new file mode 100644
index 0000000..7a6bc92
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group_test.go
@@ -0,0 +1,118 @@
+package sync
+
+import (
+ "testing"
+ "veyron/lib/testutil"
+)
+
+ // TestRandom tests Wait after a random sequence of TryAdd and Done calls that
+ // leaves the counter at 0.
+ func TestRandom(t *testing.T) {
+ var w WaitGroup
+ N := 100
+
+ count := 0 // Number of TryAdd calls not yet matched by a Done.
+ for n := 0; n < N; n++ {
+ if count == 0 || testutil.Rand.Intn(2) == 0 { // Always add when nothing is outstanding, so Done is never called on a zero counter.
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+ count++
+ continue
+ }
+ w.Done()
+ count--
+ }
+ for d := 0; d < count; d++ { // Drain what is still outstanding so Wait can return.
+ w.Done()
+ }
+
+ w.Wait()
+ }
+
+ // TestConcurrentWait checks that a concurrent Wait does not return before the final Done.
+ func TestConcurrentWait(t *testing.T) {
+ for r := 0; r < 100; r++ {
+ var w WaitGroup
+ done := make(chan struct{}, 1)
+ exited := make(chan struct{}) // Closed when the waiter goroutine finishes.
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+ go func() {
+ defer close(exited)
+ w.Wait()
+ // Wait must not return before the last Done sets the counter to 0.
+ // This check is not deterministic, as the order in which Wait and the
+ // last Done return is unknown; bugs should be revealed by repetition.
+ select {
+ case <-done:
+ default:
+ // Error rather than Fatal: FailNow may only be called from the test goroutine.
+ t.Error("Wait returned before Done.")
+ }
+ }()
+
+ for w.TryAdd() {
+ w.Done()
+ }
+ close(done)
+ w.Done()
+ <-exited // Join the waiter so it cannot report after the test has returned.
+ }
+ }
+
+ func TestTryAddFailsAfterWait(t *testing.T) {
+ var w WaitGroup
+
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+
+ go w.Wait()
+
+ // At some point, the goroutine above will have entered w.Wait() and
+ // w.TryAdd() should fail. If this never happens, the test never terminates.
+ for w.TryAdd() {
+ w.Done()
+ }
+ w.Done() // Balances the initial TryAdd so the background Wait can return.
+ }
+
+ // TestIdempotentWait checks that Wait is idempotent: each of two concurrent calls blocks until the final Done.
+ func TestIdempotentWait(t *testing.T) {
+ var w WaitGroup
+ done := make(chan struct{}, 1)
+ exited := make(chan struct{}, 2) // One slot per waiter goroutine.
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+ for i := 0; i < 2; i++ {
+ go func() {
+ defer func() { exited <- struct{}{} }()
+ w.Wait()
+ select {
+ case <-done:
+ default:
+ t.Error("Wait returned before Done.") // Not Fatal: FailNow is only legal on the test goroutine.
+ }
+ }()
+ }
+ for w.TryAdd() {
+ w.Done()
+ }
+ close(done)
+ w.Done()
+ <-exited // Join both waiters so neither can report after the test has returned.
+ <-exited
+ }
+
+ func TestDoneFailsBeforeAdd(t *testing.T) {
+ var w WaitGroup
+ defer func() {
+ if r := recover(); r == nil { // A nil recover value means Done returned without panicking.
+ t.Fatal("Done succeeded before Add.")
+ }
+ }()
+ w.Done() // No TryAdd preceded this call, so Done is expected to panic.
+ }
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 3b65a30..d8a78d8 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -10,6 +10,7 @@
"sync"
"veyron2/vlog"
+ vsync "veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore/state"
"veyron/services/store/service"
@@ -46,7 +47,7 @@
// to this stack. The top of the stack is the end of the slice.
results []nestedChannel
// cleanup is used for testing to ensure that no goroutines are leaked.
- cleanup sync.WaitGroup
+ cleanup vsync.WaitGroup
// maxNesting is the largest value used for nestedChannel.nesting. It
// is the maximum nesting over the duration of the query while len(results)
// is just the instantaneous nesting.
@@ -207,7 +208,10 @@
results: []nestedChannel{nestedChannel{0, out}},
abort: make(chan struct{}),
}
- it.cleanup.Add(1)
+ if !it.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ return it
+ }
go evaluator.eval(&context{
sn: sn,
suffix: name.String(),
@@ -218,6 +222,7 @@
abort: it.abort,
cleanup: &it.cleanup,
})
+
return it
}
@@ -246,7 +251,7 @@
abort chan struct{}
// cleanup is used for testing to ensure that no goroutines are leaked.
// evaluator.eval implementations should call Done when finished processing.
- cleanup *sync.WaitGroup
+ cleanup *vsync.WaitGroup
}
// emit sends result on c.out. It is careful to watch for aborts. result can be
@@ -392,7 +397,11 @@
abort: c.abort,
cleanup: c.cleanup,
}
- c.cleanup.Add(1)
+ if !c.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ close(srcOut)
+ return srcOut
+ }
go src.eval(&srcContext)
return srcOut
}
@@ -543,7 +552,10 @@
abort: c.abort,
cleanup: c.cleanup,
}
- c.cleanup.Add(1)
+ if !c.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ return false
+ }
go a.evaluator.eval(ctxt)
// If the subpipeline would produce a single result, use that single result
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 5956110..4cf8562 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -5,9 +5,9 @@
"encoding/binary"
"errors"
"io"
- "sync"
"time"
+ "veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore"
"veyron/services/store/raw"
"veyron/services/store/service"
@@ -103,7 +103,9 @@
done := make(chan error, 1)
- w.pending.Add(1)
+ if !w.pending.TryAdd() {
+ return ErrWatchClosed
+ }
// This goroutine does not leak because processRequest is always terminated.
go func() {
defer w.pending.Done()