TBR: x.ref/lib/pubsub: debug/fix test breakage on jenkins/linux.

Change-Id: Ibc933af49ab82c13749d5e80eb6dd558f4b89271
diff --git a/lib/pubsub/config_test.go b/lib/pubsub/config_test.go
index fcad24b..b354437 100644
--- a/lib/pubsub/config_test.go
+++ b/lib/pubsub/config_test.go
@@ -200,9 +200,15 @@
 	}
 }
 
-func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, waiter *sync.WaitGroup) {
+func consumer(t *testing.T, pub *pubsub.Publisher, limit, bufsize int, errch chan error, starter, waiter *sync.WaitGroup) {
+	defer close(errch)
 	ch := make(chan pubsub.Setting, bufsize)
-	st, _ := pub.ForkStream("net", ch)
+	st, err := pub.ForkStream("net", ch)
+	if err != nil {
+		errch <- err
+		return
+	}
+	starter.Done()
 	i, i2 := 0, 0
 	if st.Latest["i"] != nil {
 		i = int(st.Latest["i"].Value().(int))
@@ -218,25 +224,28 @@
 		switch v := s.Value().(type) {
 		case int:
 			if i%2 != 0 {
-				t.Errorf("expected a float, got an int")
-				break
+				errch <- fmt.Errorf("expected a float, got an int")
+				return
 			}
 			if v != i {
-				t.Errorf("got %d, want %d", v, i)
+				errch <- fmt.Errorf("got %d, want %d", v, i)
+				return
 			}
 		case float64:
 			if i%2 != 1 {
-				t.Errorf("expected an int, got a float")
-				break
+				errch <- fmt.Errorf("expected an int, got a float")
+				return
 			}
 			if v != float64(i) {
-				t.Errorf("got %f, want %f", v, float64(i))
+				errch <- fmt.Errorf("got %f, want %f", v, float64(i))
+				return
 			}
 		}
 		i++
 	}
 	if i < limit {
-		t.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+		errch <- fmt.Errorf("didn't read enough settings: got %d, want >= %d", i, limit)
+		return
 	}
 	waiter.Done()
 }
@@ -244,7 +253,10 @@
 func testStream(t *testing.T, consumerBufSize int) {
 	in := make(chan pubsub.Setting)
 	pub := pubsub.NewPublisher()
-	stop, _ := pub.CreateStream("net", "network settings", in)
+	stop, err := pub.CreateStream("net", "network settings", in)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	rand.Seed(time.Now().UnixNano())
 	limit := rand.Intn(5000)
@@ -262,16 +274,34 @@
 	i := <-progress
 	t.Logf("limit/2 = %d", i)
 
-	// We use a lot of buffering in this unittest to ensure that
-	// we never miss any settings.
-	go consumer(t, pub, limit, consumerBufSize, &waiter)
-	go consumer(t, pub, limit, consumerBufSize, &waiter)
+	err1 := make(chan error, 1)
+	err2 := make(chan error, 1)
+	var starter sync.WaitGroup
+	starter.Add(2)
+	go consumer(t, pub, limit, consumerBufSize, err1, &starter, &waiter)
+	go consumer(t, pub, limit, consumerBufSize, err2, &starter, &waiter)
 
 	reached := <-progress
+	// Give the consumers a chance to get going before shutting down
+	// the producer.
+	starter.Wait()
+	time.Sleep(100 * time.Millisecond)
 	pub.Shutdown()
 	shutdown := <-progress
 	t.Logf("reached %d, shut down at %d", reached, shutdown)
 
+	// This is a little annoying, we check for the presence of errors on the error
+	// channels once everything has run its course  since it's not allowed to call
+	// t.Fatal/Errorf from a separate goroutine. We wait until here so that we
+	// don't block waiting for errors that don't occur when the tests all work.
+	err = <-err1
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = <-err2
+	if err != nil {
+		t.Fatal(err)
+	}
 	// Wait for all goroutines to finish.
 	waiter.Wait()
 }