TBR: x.ref/lib/pubsub: debug/fix test breakage on jenkins/linux.
Change-Id: Ibc933af49ab82c13749d5e80eb6dd558f4b89271
diff --git a/lib/pubsub/config_test.go b/lib/pubsub/config_test.go
index fcad24b..b354437 100644
--- a/lib/pubsub/config_test.go
+++ b/lib/pubsub/config_test.go
@@ -200,9 +200,15 @@
}
}
-func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, errch chan error, starter, waiter *sync.WaitGroup) {
+ defer close(errch)
ch := make(chan pubsub.Setting, bufsize)
- st, _ := pub.ForkStream("net", ch)
+ st, err := pub.ForkStream("net", ch)
+ if err != nil {
+ errch <- err
+ return
+ }
+ starter.Done()
i, i2 := 0, 0
if st.Latest["i"] != nil {
i = int(st.Latest["i"].Value().(int))
@@ -218,25 +224,28 @@
switch v := s.Value().(type) {
case int:
if i%2 != 0 {
- t.Errorf("expected a float, got an int")
- break
+ errch <- fmt.Errorf("expected a float, got an int")
+ return
}
if v != i {
- t.Errorf("got %d, want %d", v, i)
+ errch <- fmt.Errorf("got %d, want %d", v, i)
+ return
}
case float64:
if i%2 != 1 {
- t.Errorf("expected an int, got a float")
- break
+ errch <- fmt.Errorf("expected an int, got a float")
+ return
}
if v != float64(i) {
- t.Errorf("got %f, want %f", v, float64(i))
+ errch <- fmt.Errorf("got %f, want %f", v, float64(i))
+ return
}
}
i++
}
if i < limit {
- t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+ errch <- fmt.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+ return
}
waiter.Done()
}
@@ -244,7 +253,10 @@
func testStream(t *testing.T, consumerBufSize int) {
in := make(chan pubsub.Setting)
pub := pubsub.NewPublisher()
- stop, _ := pub.CreateStream("net", "network settings", in)
+ stop, err := pub.CreateStream("net", "network settings", in)
+ if err != nil {
+ t.Fatal(err)
+ }
rand.Seed(time.Now().UnixNano())
limit := rand.Intn(5000)
@@ -262,16 +274,34 @@
i := <-progress
t.Logf("limit/2 = %d", i)
- // We use a lot of buffering in this unittest to ensure that
- // we never miss any settings.
- go consumer(t, pub, limit, consumerBufSize, &waiter)
- go consumer(t, pub, limit, consumerBufSize, &waiter)
+ err1 := make(chan error, 1)
+ err2 := make(chan error, 1)
+ var starter sync.WaitGroup
+ starter.Add(2)
+ go consumer(t, pub, limit, consumerBufSize, err1, &starter, &waiter)
+ go consumer(t, pub, limit, consumerBufSize, err2, &starter, &waiter)
reached := <-progress
+ // Give the consumers a chance to get going before shutting down
+ // the producer.
+ starter.Wait()
+ time.Sleep(100 * time.Millisecond)
pub.Shutdown()
shutdown := <-progress
t.Logf("reached %d, shut down at %d", reached, shutdown)
+ // This is a little annoying, we check for the presence of errors on the error
+ // channels once everything has run its course since it's not allowed to call
+ // t.Fatal/Errorf from a separate goroutine. We wait until here so that we
+ // don't block waiting for errors that don't occur when the tests all work.
+ err = <-err1
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = <-err2
+ if err != nil {
+ t.Fatal(err)
+ }
// Wait for all goroutines to finish.
waiter.Wait()
}