Merge "Make generated java files build"
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 56586d6..15ea933 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -21,6 +21,7 @@
"veyron/runtimes/google/lib/bqueue/drrqueue"
"veyron/runtimes/google/lib/iobuf"
"veyron/runtimes/google/lib/pcqueue"
+ vsync "veyron/runtimes/google/lib/sync"
"veyron/runtimes/google/lib/upcqueue"
"veyron2/ipc/stream"
@@ -39,7 +40,7 @@
localEP naming.Endpoint
vcMap *vcMap
- wpending, rpending *waitGroup
+ wpending, rpending vsync.WaitGroup
muListen sync.Mutex
acceptor *upcqueue.T // GUARDED_BY(muListen)
@@ -160,8 +161,6 @@
conn: conn,
pool: iobuf.NewPool(0),
vcMap: newVCMap(),
- wpending: newWaitGroup(),
- rpending: newWaitGroup(),
acceptor: acceptor,
listenerOpts: listenerOpts,
localEP: ep,
@@ -731,50 +730,3 @@
b.Release()
}
}
-
-// waitGroup implements a sync.WaitGroup like structure that does not require
-// all calls to Add to be made before Wait, instead calls to Add after Wait
-// will fail.
-type waitGroup struct {
- n int
- wait bool
- mu sync.Mutex
- cond *sync.Cond
-}
-
-func newWaitGroup() *waitGroup {
- w := &waitGroup{}
- w.cond = sync.NewCond(&w.mu)
- return w
-}
-
-func (w *waitGroup) TryAdd() bool {
- w.mu.Lock()
- defer w.mu.Unlock()
- if !w.wait {
- w.n++
- return true
- }
- return false
-}
-
-func (w *waitGroup) Done() {
- w.mu.Lock()
- defer w.mu.Unlock()
- w.n--
- if w.n < 0 {
- panic(fmt.Sprintf("more calls to Done than Add"))
- }
- if w.n == 0 {
- w.cond.Broadcast()
- }
-}
-
-func (w *waitGroup) Wait() {
- w.mu.Lock()
- w.wait = true
- for w.n > 0 {
- w.cond.Wait()
- }
- w.mu.Unlock()
-}
diff --git a/runtimes/google/lib/sync/doc.go b/runtimes/google/lib/sync/doc.go
new file mode 100644
index 0000000..3e184b9
--- /dev/null
+++ b/runtimes/google/lib/sync/doc.go
@@ -0,0 +1,2 @@
+// Package sync provides synchronization primitives.
+package sync
diff --git a/runtimes/google/lib/sync/semaphore.go b/runtimes/google/lib/sync/semaphore.go
index 1a4cc18..b6ea778 100644
--- a/runtimes/google/lib/sync/semaphore.go
+++ b/runtimes/google/lib/sync/semaphore.go
@@ -1,4 +1,3 @@
-// Package sync provides synchronization primitives.
package sync
import (
diff --git a/runtimes/google/lib/sync/wait_group.go b/runtimes/google/lib/sync/wait_group.go
new file mode 100644
index 0000000..3ee2ec1
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group.go
@@ -0,0 +1,57 @@
+package sync
+
+import "sync"
+
+// WaitGroup implements a sync.WaitGroup-like structure that does not require
+// all calls to TryAdd to be made before Wait; instead, calls to TryAdd made
+// after Wait has been called will fail.
+//
+// As a result, WaitGroup cannot be "re-used" in the way that sync.WaitGroup
+// can. In the following example using sync.WaitGroup, Add, Done and Wait behave
+// in the same way in rounds 1 and 2.
+//
+// var wg sync.WaitGroup
+//
+// Round #1.
+// wg.Add(1)
+// go wg.Done()
+// wg.Wait()
+//
+// Round #2.
+// wg.Add(1)
+// go wg.Done()
+// wg.Wait()
+//
+// However, in an equivalent program using WaitGroup, the second call to
+// TryAdd would fail (return false) because Wait has already been called.
+type WaitGroup struct {
+ mu sync.Mutex
+ waiting bool
+ pending sync.WaitGroup
+}
+
+// TryAdd attempts to increment the counter. If Wait has already been called,
+// TryAdd fails to increment the counter and returns false.
+// Goroutines blocked on Wait are released once Done brings the counter to zero.
+func (w *WaitGroup) TryAdd() (added bool) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.waiting {
+ return false
+ }
+ w.pending.Add(1)
+ return true
+}
+
+// Done decrements the counter. If the counter goes negative, Done panics.
+func (w *WaitGroup) Done() {
+ w.pending.Done()
+}
+
+// Wait blocks until the counter is zero.
+func (w *WaitGroup) Wait() {
+ w.mu.Lock()
+ w.waiting = true
+ w.mu.Unlock()
+ w.pending.Wait()
+}
diff --git a/runtimes/google/lib/sync/wait_group_test.go b/runtimes/google/lib/sync/wait_group_test.go
new file mode 100644
index 0000000..7a6bc92
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group_test.go
@@ -0,0 +1,118 @@
+package sync
+
+import (
+ "testing"
+ "veyron/lib/testutil"
+)
+
+// TestRandom tests Wait after a random sequence of TryAdd and Done calls that
+// leaves the counter at 0.
+func TestRandom(t *testing.T) {
+ var w WaitGroup
+ N := 100
+
+ count := 0
+ for n := 0; n < N; n++ {
+ if count == 0 || testutil.Rand.Intn(2) == 0 {
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+ count++
+ continue
+ }
+ w.Done()
+ count--
+ }
+ for d := 0; d < count; d++ {
+ w.Done()
+ }
+
+ w.Wait()
+}
+
+func TestConcurrentWait(t *testing.T) {
+ for r := 0; r < 100; r++ {
+ var w WaitGroup
+
+ done := make(chan struct{}, 1)
+
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+
+ go func() {
+ w.Wait()
+ // w.Wait() should not return before w.Done() sets the counter
+ // to 0.
+ // This test is not deterministic as we cannot infer the order
+ // in which Wait() and the last Done() return. Hopefully, bugs
+ // will be revealed by repeating the test.
+ select {
+ case <-done:
+ default:
+ t.Fatal("Wait returned before Done.")
+ }
+ }()
+
+ for w.TryAdd() {
+ w.Done()
+ }
+ close(done)
+ w.Done()
+ }
+}
+
+func TestTryAddFailsAfterWait(t *testing.T) {
+ var w WaitGroup
+
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+
+ go w.Wait()
+
+ // At some point, another goroutine will be in w.Wait() and w.TryAdd()
+ // should fail. If this doesn't happen, the test will never terminate.
+ for w.TryAdd() {
+ w.Done()
+ }
+ w.Done()
+}
+
+func TestIdempotentWait(t *testing.T) {
+ var w WaitGroup
+
+ done := make(chan struct{}, 1)
+
+ if !w.TryAdd() {
+ t.Fatal("TryAdd failed")
+ }
+
+ // w.Wait() should be idempotent.
+ for i := 0; i < 2; i++ {
+ go func() {
+ w.Wait()
+ select {
+ case <-done:
+ default:
+ t.Fatal("Wait returned before Done.")
+ }
+ }()
+ }
+
+ for w.TryAdd() {
+ w.Done()
+ }
+ close(done)
+ w.Done()
+}
+
+func TestDoneFailsBeforeAdd(t *testing.T) {
+ var w WaitGroup
+ defer func() {
+ if r := recover(); r == nil {
+ t.Fatal("Done succeeded before Add.")
+ }
+ }()
+ w.Done()
+}
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 3b65a30..d8a78d8 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -10,6 +10,7 @@
"sync"
"veyron2/vlog"
+ vsync "veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore/state"
"veyron/services/store/service"
@@ -46,7 +47,7 @@
// to this stack. The top of the stack is the end of the slice.
results []nestedChannel
// cleanup is used for testing to ensure that no goroutines are leaked.
- cleanup sync.WaitGroup
+ cleanup vsync.WaitGroup
// maxNesting is the largest value used for nestedChannel.nesting. It
// is the maximum nesting over the duration of the query while len(results)
// is just the instantaneous nesting.
@@ -207,7 +208,10 @@
results: []nestedChannel{nestedChannel{0, out}},
abort: make(chan struct{}),
}
- it.cleanup.Add(1)
+ if !it.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ return it
+ }
go evaluator.eval(&context{
sn: sn,
suffix: name.String(),
@@ -218,6 +222,7 @@
abort: it.abort,
cleanup: &it.cleanup,
})
+
return it
}
@@ -246,7 +251,7 @@
abort chan struct{}
// cleanup is used for testing to ensure that no goroutines are leaked.
// evaluator.eval implementations should call Done when finished processing.
- cleanup *sync.WaitGroup
+ cleanup *vsync.WaitGroup
}
// emit sends result on c.out. It is careful to watch for aborts. result can be
@@ -392,7 +397,11 @@
abort: c.abort,
cleanup: c.cleanup,
}
- c.cleanup.Add(1)
+ if !c.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ close(srcOut)
+ return srcOut
+ }
go src.eval(&srcContext)
return srcOut
}
@@ -543,7 +552,10 @@
abort: c.abort,
cleanup: c.cleanup,
}
- c.cleanup.Add(1)
+ if !c.cleanup.TryAdd() {
+ // The query has been aborted by a call to Cancel.
+ return false
+ }
go a.evaluator.eval(ctxt)
// If the subpipeline would produce a single result, use that single result
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 5956110..4cf8562 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -5,9 +5,9 @@
"encoding/binary"
"errors"
"io"
- "sync"
"time"
+ "veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore"
"veyron/services/store/raw"
"veyron/services/store/service"
@@ -103,7 +103,9 @@
done := make(chan error, 1)
- w.pending.Add(1)
+ if !w.pending.TryAdd() {
+ return ErrWatchClosed
+ }
// This goroutine does not leak because processRequest is always terminated.
go func() {
defer w.pending.Done()