Merge "Make generated java files build"
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 56586d6..15ea933 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -21,6 +21,7 @@
 	"veyron/runtimes/google/lib/bqueue/drrqueue"
 	"veyron/runtimes/google/lib/iobuf"
 	"veyron/runtimes/google/lib/pcqueue"
+	vsync "veyron/runtimes/google/lib/sync"
 	"veyron/runtimes/google/lib/upcqueue"
 
 	"veyron2/ipc/stream"
@@ -39,7 +40,7 @@
 	localEP naming.Endpoint
 
 	vcMap              *vcMap
-	wpending, rpending *waitGroup
+	wpending, rpending vsync.WaitGroup
 
 	muListen     sync.Mutex
 	acceptor     *upcqueue.T          // GUARDED_BY(muListen)
@@ -160,8 +161,6 @@
 		conn:         conn,
 		pool:         iobuf.NewPool(0),
 		vcMap:        newVCMap(),
-		wpending:     newWaitGroup(),
-		rpending:     newWaitGroup(),
 		acceptor:     acceptor,
 		listenerOpts: listenerOpts,
 		localEP:      ep,
@@ -731,50 +730,3 @@
 		b.Release()
 	}
 }
-
-// waitGroup implements a sync.WaitGroup like structure that does not require
-// all calls to Add to be made before Wait, instead calls to Add after Wait
-// will fail.
-type waitGroup struct {
-	n    int
-	wait bool
-	mu   sync.Mutex
-	cond *sync.Cond
-}
-
-func newWaitGroup() *waitGroup {
-	w := &waitGroup{}
-	w.cond = sync.NewCond(&w.mu)
-	return w
-}
-
-func (w *waitGroup) TryAdd() bool {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if !w.wait {
-		w.n++
-		return true
-	}
-	return false
-}
-
-func (w *waitGroup) Done() {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	w.n--
-	if w.n < 0 {
-		panic(fmt.Sprintf("more calls to Done than Add"))
-	}
-	if w.n == 0 {
-		w.cond.Broadcast()
-	}
-}
-
-func (w *waitGroup) Wait() {
-	w.mu.Lock()
-	w.wait = true
-	for w.n > 0 {
-		w.cond.Wait()
-	}
-	w.mu.Unlock()
-}
diff --git a/runtimes/google/lib/sync/doc.go b/runtimes/google/lib/sync/doc.go
new file mode 100644
index 0000000..3e184b9
--- /dev/null
+++ b/runtimes/google/lib/sync/doc.go
@@ -0,0 +1,2 @@
+// Package sync provides synchronization primitives.
+package sync
diff --git a/runtimes/google/lib/sync/semaphore.go b/runtimes/google/lib/sync/semaphore.go
index 1a4cc18..b6ea778 100644
--- a/runtimes/google/lib/sync/semaphore.go
+++ b/runtimes/google/lib/sync/semaphore.go
@@ -1,4 +1,3 @@
-// Package sync provides synchronization primitives.
 package sync
 
 import (
diff --git a/runtimes/google/lib/sync/wait_group.go b/runtimes/google/lib/sync/wait_group.go
new file mode 100644
index 0000000..3ee2ec1
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group.go
@@ -0,0 +1,57 @@
+package sync
+
+import "sync"
+
+// WaitGroup implements a sync.WaitGroup-like structure that does not require
+// all calls to TryAdd to be made before Wait; instead, calls to TryAdd made
+// after Wait will fail.
+//
+// As a result, WaitGroup cannot be "re-used" in the way that sync.WaitGroup
+// can. In the following example using sync.WaitGroup, Add, Done and Wait behave
+// in the same way in rounds 1 and 2.
+//
+//	var wg sync.WaitGroup
+//
+//	// Round #1.
+//	wg.Add(1)
+//	go wg.Done()
+//	wg.Wait()
+//
+//	// Round #2.
+//	wg.Add(1)
+//	go wg.Done()
+//	wg.Wait()
+//
+// However, in an equivalent program using WaitGroup, the second call to TryAdd
+// would fail and return false.
+type WaitGroup struct {
+	mu      sync.Mutex
+	waiting bool
+	pending sync.WaitGroup
+}
+
+// TryAdd attempts to increment the counter. If Wait has already been called,
+// TryAdd fails to increment the counter and returns false.
+// A successful TryAdd must be balanced by exactly one later call to Done.
+func (w *WaitGroup) TryAdd() (added bool) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.waiting {
+		return false
+	}
+	w.pending.Add(1)
+	return true
+}
+
+// Done decrements the counter. If the counter goes negative, Done panics.
+func (w *WaitGroup) Done() {
+	w.pending.Done()
+}
+
+// Wait blocks until the counter is zero. Once Wait is called, TryAdd fails.
+func (w *WaitGroup) Wait() {
+	w.mu.Lock()
+	w.waiting = true
+	w.mu.Unlock()
+	w.pending.Wait()
+}
diff --git a/runtimes/google/lib/sync/wait_group_test.go b/runtimes/google/lib/sync/wait_group_test.go
new file mode 100644
index 0000000..7a6bc92
--- /dev/null
+++ b/runtimes/google/lib/sync/wait_group_test.go
@@ -0,0 +1,118 @@
+package sync
+
+import (
+	"testing"
+	"veyron/lib/testutil"
+)
+
+// TestRandom tests Wait after a random sequence of TryAdd's and Done's that
+// leaves the counter at 0.
+func TestRandom(t *testing.T) {
+	var w WaitGroup
+	N := 100
+
+	count := 0
+	for n := 0; n < N; n++ {
+		if count == 0 || testutil.Rand.Intn(2) == 0 {
+			if !w.TryAdd() {
+				t.Fatal("TryAdd failed")
+			}
+			count++
+			continue
+		}
+		w.Done()
+		count--
+	}
+	for d := 0; d < count; d++ {
+		w.Done()
+	}
+
+	w.Wait()
+}
+
+func TestConcurrentWait(t *testing.T) {
+	for r := 0; r < 100; r++ {
+		var w WaitGroup
+
+		done := make(chan struct{}, 1)
+
+		if !w.TryAdd() {
+			t.Fatal("TryAdd failed")
+		}
+
+		go func() {
+			w.Wait()
+			// w.Wait() should not return before w.Done() sets the counter
+			// to 0.
+			// This test is not deterministic as we cannot infer the order
+			// in which Wait() and the last Done() return. Hopefully, bugs
+			// will be revealed by repeating the test.
+			select {
+			case <-done:
+			default:
+				t.Fatal("Wait returned before Done.")
+			}
+		}()
+
+		for w.TryAdd() {
+			w.Done()
+		}
+		close(done)
+		w.Done()
+	}
+}
+
+func TestTryAddFailsAfterWait(t *testing.T) {
+	var w WaitGroup
+
+	if !w.TryAdd() {
+		t.Fatal("TryAdd failed")
+	}
+
+	go w.Wait()
+
+	// At some point, another goroutine will be in w.Wait() and w.TryAdd()
+	// should fail. If this doesn't happen, the test will never terminate.
+	for w.TryAdd() {
+		w.Done()
+	}
+	w.Done()
+}
+
+func TestIdempotentWait(t *testing.T) {
+	var w WaitGroup
+
+	done := make(chan struct{}, 1)
+
+	if !w.TryAdd() {
+		t.Fatal("TryAdd failed")
+	}
+
+	// w.Wait() should be idempotent.
+	for i := 0; i < 2; i++ {
+		go func() {
+			w.Wait()
+			select {
+			case <-done:
+			default:
+				t.Fatal("Wait returned before Done.")
+			}
+		}()
+	}
+
+	for w.TryAdd() {
+		w.Done()
+	}
+	close(done)
+	w.Done()
+}
+
+func TestDoneFailsBeforeAdd(t *testing.T) {
+	var w WaitGroup
+	defer func() {
+		if r := recover(); r == nil {
+			t.Fatal("Done succeeded before Add.")
+		}
+	}()
+	w.Done()
+}
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 3b65a30..d8a78d8 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -10,6 +10,7 @@
 	"sync"
 	"veyron2/vlog"
 
+	vsync "veyron/runtimes/google/lib/sync"
 	"veyron/services/store/memstore/state"
 	"veyron/services/store/service"
 
@@ -46,7 +47,7 @@
 	// to this stack.  The top of the stack is the end of the slice.
 	results []nestedChannel
 	// cleanup is used for testing to ensure that no goroutines are leaked.
-	cleanup sync.WaitGroup
+	cleanup vsync.WaitGroup
 	// maxNesting is the largest value used for nestedChannel.nesting.  It
 	// is the maximum nesting over the duration of the query while len(results)
 	// is just the instantaneous nesting.
@@ -207,7 +208,10 @@
 		results: []nestedChannel{nestedChannel{0, out}},
 		abort:   make(chan struct{}),
 	}
-	it.cleanup.Add(1)
+	if !it.cleanup.TryAdd() {
+		// The query has been aborted by a call to Cancel.
+		return it
+	}
 	go evaluator.eval(&context{
 		sn:       sn,
 		suffix:   name.String(),
@@ -218,6 +222,7 @@
 		abort:    it.abort,
 		cleanup:  &it.cleanup,
 	})
+
 	return it
 }
 
@@ -246,7 +251,7 @@
 	abort chan struct{}
 	// cleanup is used for testing to ensure that no goroutines are leaked.
 	// evaluator.eval implementations should call Done when finished processing.
-	cleanup *sync.WaitGroup
+	cleanup *vsync.WaitGroup
 }
 
 // emit sends result on c.out.  It is careful to watch for aborts.  result can be
@@ -392,7 +397,11 @@
 		abort:    c.abort,
 		cleanup:  c.cleanup,
 	}
-	c.cleanup.Add(1)
+	if !c.cleanup.TryAdd() {
+		// The query has been aborted by a call to Cancel.
+		close(srcOut)
+		return srcOut
+	}
 	go src.eval(&srcContext)
 	return srcOut
 }
@@ -543,7 +552,10 @@
 			abort:    c.abort,
 			cleanup:  c.cleanup,
 		}
-		c.cleanup.Add(1)
+		if !c.cleanup.TryAdd() {
+			// The query has been aborted by a call to Cancel.
+			return false
+		}
 		go a.evaluator.eval(ctxt)
 
 		// If the subpipeline would produce a single result, use that single result
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 5956110..4cf8562 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -5,9 +5,9 @@
 	"encoding/binary"
 	"errors"
 	"io"
-	"sync"
 	"time"
 
+	"veyron/runtimes/google/lib/sync"
 	"veyron/services/store/memstore"
 	"veyron/services/store/raw"
 	"veyron/services/store/service"
@@ -103,7 +103,9 @@
 
 	done := make(chan error, 1)
 
-	w.pending.Add(1)
+	if !w.pending.TryAdd() {
+		return ErrWatchClosed
+	}
 	// This goroutine does not leak because processRequest is always terminated.
 	go func() {
 		defer w.pending.Done()