ref/runtime/internal/flow/conn: Ensure that we don't leak goroutines.

Also add a library to check for leaking goroutines.

Change-Id: Iccb56dcd28813b80535b885b68383c6751534163
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index eff876d..afccf37 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -23,7 +23,8 @@
 	if err != nil {
 		return err
 	}
-	c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
+	c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
+		c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
 	if err = c.readRemoteAuth(ctx, binding); err != nil {
 		return err
 	}
@@ -59,7 +60,8 @@
 	if err != nil {
 		return err
 	}
-	c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), false)
+	c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
+		c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), false)
 	signedBinding, err := v23.GetPrincipal(ctx).Sign(binding)
 	if err != nil {
 		return err
@@ -171,7 +173,7 @@
 	byBKey  map[uint64]*Blessings
 }
 
-func newBlessingsFlow(ctx *context.T, f flow.Flow, dialed bool) *blessingsFlow {
+func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f flow.Flow, dialed bool) *blessingsFlow {
 	b := &blessingsFlow{
 		enc:     vom.NewEncoder(f),
 		dec:     vom.NewDecoder(f),
@@ -183,7 +185,8 @@
 	if !dialed {
 		b.nextKey++
 	}
-	go b.readLoop(ctx)
+	loopWG.Add(1)
+	go b.readLoop(ctx, loopWG)
 	return b
 }
 
@@ -233,7 +236,8 @@
 	return security.Blessings{}, nil, NewErrBlessingsFlowClosed(ctx)
 }
 
-func (b *blessingsFlow) readLoop(ctx *context.T) {
+func (b *blessingsFlow) readLoop(ctx *context.T, loopWG *sync.WaitGroup) {
+	defer loopWG.Done()
 	for {
 		var received Blessings
 		err := b.dec.Decode(&received)
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index df8fee6..8ebd51f 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -13,6 +13,7 @@
 	"v.io/v23/security"
 	"v.io/v23/verror"
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test/goroutines"
 	"v.io/x/ref/test/testutil"
 )
 
@@ -49,6 +50,8 @@
 }
 
 func TestUnidirectional(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	dctx, shutdown := v23.Init()
 	defer shutdown()
 	actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
@@ -57,6 +60,8 @@
 	}
 	aflows := make(chan flow.Flow, 2)
 	dc, ac, _ := setupConns(t, dctx, actx, nil, aflows)
+	defer dc.Close(dctx, nil)
+	defer ac.Close(actx, nil)
 
 	df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
 	af1 := <-aflows
@@ -82,6 +87,8 @@
 }
 
 func TestBidirectional(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	dctx, shutdown := v23.Init()
 	defer shutdown()
 	actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
@@ -91,6 +98,8 @@
 	dflows := make(chan flow.Flow, 2)
 	aflows := make(chan flow.Flow, 2)
 	dc, ac, _ := setupConns(t, dctx, actx, dflows, aflows)
+	defer dc.Close(dctx, nil)
+	defer ac.Close(actx, nil)
 
 	df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
 	af1 := <-aflows
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index fd73c50..4128297 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -12,9 +12,12 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/test/goroutines"
 )
 
 func TestRemoteDialerClose(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -27,6 +30,8 @@
 }
 
 func TestRemoteAcceptorClose(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -39,6 +44,8 @@
 }
 
 func TestUnderlyingConnectionClosed(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -48,6 +55,8 @@
 }
 
 func TestDialAfterConnClose(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	d, a, _ := setupConns(t, ctx, ctx, nil, nil)
@@ -64,10 +73,12 @@
 }
 
 func TestReadWriteAfterConnClose(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	for _, dialerDials := range []bool{true, false} {
-		df, flows := setupFlow(t, ctx, ctx, dialerDials)
+		df, flows, cl := setupFlow(t, ctx, ctx, dialerDials)
 		if _, err := df.WriteMsg([]byte("hello")); err != nil {
 			t.Fatalf("write failed: %v", err)
 		}
@@ -93,14 +104,18 @@
 		if _, err := af.ReadMsg(); err == nil {
 			t.Fatalf("nil error for read after close.")
 		}
+		cl()
 	}
 }
 
 func TestFlowCancelOnWrite(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	dctx, cancel := context.WithCancel(ctx)
-	df, accept := setupFlow(t, dctx, ctx, true)
+	df, accept, cl := setupFlow(t, dctx, ctx, true)
+	defer cl()
 	done := make(chan struct{})
 	go func() {
 		if _, err := df.WriteMsg([]byte("hello")); err != nil {
@@ -122,10 +137,13 @@
 }
 
 func TestFlowCancelOnRead(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	dctx, cancel := context.WithCancel(ctx)
-	df, accept := setupFlow(t, dctx, ctx, true)
+	df, accept, cl := setupFlow(t, dctx, ctx, true)
+	defer cl()
 	done := make(chan struct{})
 	go func() {
 		if _, err := df.WriteMsg([]byte("hello")); err != nil {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 321abde..f9ac79c 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -49,7 +49,6 @@
 }
 
 // Conns are a multiplexing encrypted channels that can host Flows.
-// TODO(mattr): track and clean up all spawned goroutines.
 type Conn struct {
 	fc                       *flowcontrol.FlowController
 	mp                       *messagePipe
@@ -60,6 +59,7 @@
 	local, remote            naming.Endpoint
 	closed                   chan struct{}
 	blessingsFlow            *blessingsFlow
+	loopWG                   sync.WaitGroup
 
 	mu      sync.Mutex
 	nextFid flowID
@@ -91,6 +91,7 @@
 		c.Close(ctx, err)
 		return nil, err
 	}
+	c.loopWG.Add(1)
 	go c.readLoop(ctx)
 	return c, nil
 }
@@ -116,6 +117,7 @@
 		c.Close(ctx, err)
 		return nil, err
 	}
+	c.loopWG.Add(1)
 	go c.readLoop(ctx)
 	return c, nil
 }
@@ -154,23 +156,28 @@
 // with an error and no more flows will be sent to the FlowHandler.
 func (c *Conn) Closed() <-chan struct{} { return c.closed }
 
-// Close shuts down a conn.  This will cause the read loop
-// to exit.
+// Close shuts down a conn.
 func (c *Conn) Close(ctx *context.T, err error) {
 	c.mu.Lock()
 	var flows map[flowID]*flw
 	flows, c.flows = c.flows, nil
 	c.mu.Unlock()
+
 	if flows == nil {
-		// We've already torn this conn down.
+		// This conn is already being torn down.
+		<-c.closed
 		return
 	}
+	c.internalClose(ctx, err, flows)
+}
+
+func (c *Conn) internalClose(ctx *context.T, err error, flows map[flowID]*flw) {
 	if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
 		message := ""
 		if err != nil {
 			message = err.Error()
 		}
-		cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+		cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) {
 			return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
 		})
 		if cerr != nil {
@@ -183,8 +190,7 @@
 	if cerr := c.mp.close(); cerr != nil {
 		ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
 	}
-
-	// TODO(mattr): ensure the readLoop is finished before closing this.
+	c.loopWG.Wait()
 	close(c.closed)
 }
 
@@ -201,7 +207,7 @@
 		return
 	}
 
-	err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+	err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) {
 		err := c.mp.writeMsg(ctx, &release{
 			counters: counts,
 		})
@@ -290,5 +296,14 @@
 			break
 		}
 	}
-	c.Close(ctx, err)
+
+	c.mu.Lock()
+	var flows map[flowID]*flw
+	flows, c.flows = c.flows, nil
+	c.mu.Unlock()
+
+	c.loopWG.Done()
+	if flows != nil {
+		c.internalClose(ctx, err, flows)
+	}
 }
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 49bd59e..6851c79 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -9,14 +9,16 @@
 	"crypto/rand"
 	"io"
 	"testing"
+	"time"
 
 	"v.io/v23"
-	"v.io/v23/context"
-	"v.io/v23/flow"
 	_ "v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/test"
+	"v.io/x/ref/test/goroutines"
 )
 
+const leakWaitTime = 100 * time.Millisecond
+
 var randData []byte
 
 func init() {
@@ -27,7 +29,15 @@
 	}
 }
 
-func testWrite(t *testing.T, ctx *context.T, want []byte, df flow.Flow, flows <-chan flow.Flow) {
+func TestLargeWrite(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
+	ctx, shutdown := v23.Init()
+	df, flows, cl := setupFlow(t, ctx, ctx, true)
+	defer cl()
+	defer shutdown()
+
+	want := randData
 	finished := make(chan struct{})
 	go func(x []byte) {
 		mid := len(x) / 2
@@ -66,10 +76,3 @@
 	<-df.Closed()
 	<-af.Closed()
 }
-
-func TestLargeWrite(t *testing.T) {
-	ctx, shutdown := v23.Init()
-	defer shutdown()
-	df, flows := setupFlow(t, ctx, ctx, true)
-	testWrite(t, ctx, randData, df, flows)
-}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index a8f832f..fd301bc 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -5,6 +5,8 @@
 package conn
 
 import (
+	"strconv"
+
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/security"
@@ -32,7 +34,7 @@
 		id:     id,
 		dialed: dialed,
 		conn:   c,
-		worker: c.fc.NewWorker(flowPriority),
+		worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
 		q:      newReadQ(),
 		bkey:   bkey,
 		dkey:   dkey,
diff --git a/runtime/internal/flow/conn/readq_test.go b/runtime/internal/flow/conn/readq_test.go
index 865b246..baea620 100644
--- a/runtime/internal/flow/conn/readq_test.go
+++ b/runtime/internal/flow/conn/readq_test.go
@@ -9,6 +9,7 @@
 	"testing"
 
 	"v.io/v23"
+	"v.io/x/ref/test/goroutines"
 )
 
 func mkBufs(in ...string) [][]byte {
@@ -20,6 +21,8 @@
 }
 
 func TestReadqRead(t *testing.T) {
+	defer goroutines.NoLeaks(t, 0)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
@@ -45,6 +48,8 @@
 }
 
 func TestReadqGet(t *testing.T) {
+	defer goroutines.NoLeaks(t, 0)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
@@ -69,6 +74,8 @@
 }
 
 func TestReadqMixed(t *testing.T) {
+	defer goroutines.NoLeaks(t, 0)()
+
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 140515a..8cf9968 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -142,26 +142,19 @@
 	return <-dch, <-ach, w
 }
 
-func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
-	d, accepted := setupFlows(t, dctx, actx, dialFromDialer, 1)
-	return d[0], accepted
-}
-
-func setupFlows(t *testing.T, dctx, actx *context.T, dialFromDialer bool, n int) (dialed []flow.Flow, accepted <-chan flow.Flow) {
-	dflows, aflows := make(chan flow.Flow, n), make(chan flow.Flow, n)
+func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow, close func()) {
+	dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
 	d, a, _ := setupConns(t, dctx, actx, dflows, aflows)
 	if !dialFromDialer {
 		d, a = a, d
+		dctx, actx = actx, dctx
 		aflows, dflows = dflows, aflows
 	}
-	dialed = make([]flow.Flow, n)
-	for i := 0; i < n; i++ {
-		var err error
-		if dialed[i], err = d.Dial(dctx, testBFP); err != nil {
-			t.Fatalf("Unexpected error: %v", err)
-		}
+	df, err := d.Dial(dctx, testBFP)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
 	}
-	return dialed, aflows
+	return df, aflows, func() { d.Close(dctx, nil); a.Close(actx, nil) }
 }
 
 func testBFP(
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index 8221895..a27d48f 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -41,6 +41,7 @@
 // producers do not overwhelm consumers.  Only one Worker
 // will be executing at a time.
 type Worker struct {
+	name     string
 	fc       *FlowController
 	priority int
 	work     chan struct{}
@@ -51,6 +52,10 @@
 	next, prev *Worker // Used as a list when in an active queue.
 }
 
+func (w *Worker) String() string {
+	return fmt.Sprintf("%s(%p)", w.name, w)
+}
+
 // Run runs r potentially multiple times.
 // Only one worker's r function will run at a time for a given FlowController.
 // A single worker's Run function should not be called concurrently from multiple
@@ -71,7 +76,7 @@
 	for {
 		next := w.fc.nextWorkerLocked()
 		if w.fc.writing == w {
-			// We're already schedule to write, but we should bail
+			// We're already scheduled to write, but we should bail
 			// out if we're canceled.
 			select {
 			case <-ctx.Done():
@@ -82,8 +87,9 @@
 		for w.fc.writing != w && err == nil {
 			w.fc.mu.Unlock()
 			if next != nil {
-				next.work <- struct{}{}
+				next.notify()
 			}
+			ctx.VI(4).Infof("worker waiting: %s\nfc: %s", w, w.fc)
 			select {
 			case <-ctx.Done():
 				err = ctx.Err()
@@ -92,6 +98,7 @@
 			w.fc.mu.Lock()
 		}
 		if err != nil {
+			w.fc.writing = nil
 			break
 		}
 
@@ -138,7 +145,7 @@
 	next := w.fc.nextWorkerLocked()
 	w.fc.mu.Unlock()
 	if next != nil {
-		next.work <- struct{}{}
+		next.notify()
 	}
 	return err
 }
@@ -169,7 +176,7 @@
 	next := w.fc.nextWorkerLocked()
 	w.fc.mu.Unlock()
 	if next != nil {
-		next.work <- struct{}{}
+		next.notify()
 	}
 }
 
@@ -180,6 +187,13 @@
 	return w.counters.released > 0 || (!w.counters.everReleased && w.fc.shared > 0)
 }
 
+func (w *Worker) notify() {
+	select {
+	case w.work <- struct{}{}:
+	default:
+	}
+}
+
 // FlowController manages multiple Workers to ensure only one runs at a time.
 // The workers also obey counters so that producers don't overwhelm consumers.
 type FlowController struct {
@@ -204,11 +218,12 @@
 // execute is controlled by priority.  Higher priority
 // workers that are ready will run before any lower priority
 // workers.
-func (fc *FlowController) NewWorker(priority int) *Worker {
+func (fc *FlowController) NewWorker(name string, priority int) *Worker {
 	w := &Worker{
+		name:     name,
 		fc:       fc,
 		priority: priority,
-		work:     make(chan struct{}),
+		work:     make(chan struct{}, 1),
 		counters: &counterState{},
 	}
 	w.next, w.prev = w, w
@@ -233,7 +248,7 @@
 	next := fc.nextWorkerLocked()
 	fc.mu.Unlock()
 	if next != nil {
-		next.work <- struct{}{}
+		next.notify()
 	}
 	return nil
 }
@@ -241,11 +256,12 @@
 // Run runs the given runner on a non-flow controlled Worker.  This
 // worker does not wait for any flow control tokens and is limited
 // only by the MTU.
-func (fc *FlowController) Run(ctx *context.T, p int, r Runner) error {
+func (fc *FlowController) Run(ctx *context.T, name string, p int, r Runner) error {
 	w := &Worker{
+		name:     name,
 		fc:       fc,
 		priority: p,
-		work:     make(chan struct{}),
+		work:     make(chan struct{}, 1),
 	}
 	w.next, w.prev = w, w
 	return w.Run(ctx, r)
@@ -313,13 +329,15 @@
 	fmt.Fprintf(buf, "FlowController %p: \n", fc)
 
 	fc.mu.Lock()
-	fmt.Fprintf(buf, "writing: %p\n", fc.writing)
+	if fc.writing != nil {
+		fmt.Fprintf(buf, "writing: %s\n", fc.writing)
+	}
 	fmt.Fprintln(buf, "active:")
 	for p, head := range fc.active {
-		fmt.Fprintf(buf, "  %v: %p", p, head)
+		fmt.Fprintf(buf, "  %v: %s", p, head)
 		if head != nil {
 			for cur := head.next; cur != head; cur = cur.next {
-				fmt.Fprintf(buf, " %p", cur)
+				fmt.Fprintf(buf, " %s", cur)
 			}
 		}
 		fmt.Fprintln(buf, "")
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol_test.go b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
index ce42db5..3d9a297 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol_test.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
@@ -45,7 +45,7 @@
 	wg.Add(workers)
 	for i := 0; i < workers; i++ {
 		go func(idx int) {
-			el := fc.NewWorker(0)
+			el := fc.NewWorker(fmt.Sprintf("%d", idx), 0)
 			go el.Release(ctx, messages*5) // Try to make races happen
 			j := 0
 			el.Run(ctx, func(tokens int) (used int, done bool, err error) {
@@ -86,7 +86,7 @@
 
 	work := make(chan interface{})
 	worker := func(p int) *Worker {
-		w := fc.NewWorker(p)
+		w := fc.NewWorker(fmt.Sprintf("%d", p), p)
 		go w.Run(ctx, func(t int) (int, bool, error) {
 			work <- w
 			return t, false, nil
@@ -122,7 +122,7 @@
 	work := make(chan interface{})
 
 	worker := func(p int) *Worker {
-		w := fc.NewWorker(p)
+		w := fc.NewWorker(fmt.Sprintf("%d", p), p)
 		go w.Run(ctx, func(t int) (int, bool, error) {
 			work <- w
 			return t, false, nil
@@ -152,7 +152,7 @@
 	fc := New(mtu, mtu)
 
 	ready, wait := make(chan struct{}), make(chan struct{})
-	w := fc.NewWorker(0)
+	w := fc.NewWorker("", 0)
 	go w.Run(ctx, func(t int) (int, bool, error) {
 		close(ready)
 		<-wait
@@ -174,22 +174,22 @@
 	work := make(chan interface{})
 	ready, wait := make(chan struct{}), make(chan struct{})
 	// Start one worker running
-	go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+	go fc.Run(ctx, "0", 0, func(t int) (int, bool, error) {
 		close(ready)
 		<-wait
 		return t, true, nil
 	})
 	<-ready
 	// Now queue up sever workers and make sure they execute in order.
-	go fc.Run(ctx, 2, func(t int) (int, bool, error) {
+	go fc.Run(ctx, "2", 2, func(t int) (int, bool, error) {
 		work <- "c"
 		return t, true, nil
 	})
-	go fc.Run(ctx, 1, func(t int) (int, bool, error) {
+	go fc.Run(ctx, "1", 1, func(t int) (int, bool, error) {
 		work <- "b"
 		return t, true, nil
 	})
-	go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+	go fc.Run(ctx, "0", 0, func(t int) (int, bool, error) {
 		work <- "a"
 		return t, true, nil
 	})
@@ -249,7 +249,7 @@
 		wg.Add(workers)
 		for i := 0; i < workers; i++ {
 			go func(idx int) {
-				w := fc.NewWorker(0)
+				w := fc.NewWorker(fmt.Sprintf("%d", idx), 0)
 				w.Release(ctx, len(testdata))
 				t := testdata
 				err := w.Run(ctx, func(tokens int) (used int, done bool, err error) {
diff --git a/test/goroutines/goroutines.go b/test/goroutines/goroutines.go
new file mode 100644
index 0000000..fd914e5
--- /dev/null
+++ b/test/goroutines/goroutines.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package goroutines
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"regexp"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+)
+
+var goroutineHeaderRE = regexp.MustCompile(`^goroutine (\d+) \[([^\]]+)\]:$`)
+var stackFileRE = regexp.MustCompile(`^\s+([^:]+):(\d+)(?: \+0x([0-9A-Fa-f]+))?$`)
+
+type Goroutine struct {
+	ID      int
+	State   string
+	Stack   []*Frame
+	Creator *Frame
+}
+
+// Get gets a set of currently running goroutines and parses them into a
+// structured representation.
+func Get() ([]*Goroutine, error) {
+	bufsize, read := 1<<20, 0
+	buf := make([]byte, bufsize)
+	for {
+		read = runtime.Stack(buf, true)
+		if read < bufsize {
+			buf = buf[:read]
+			break
+		}
+		bufsize *= 2
+		buf = make([]byte, bufsize)
+	}
+	return Parse(buf)
+}
+
+// Parse parses a stack trace into a structure representation.
+func Parse(buf []byte) ([]*Goroutine, error) {
+	scanner := bufio.NewScanner(bytes.NewReader(buf))
+	var out []*Goroutine
+	for scanner.Scan() {
+		if len(scanner.Bytes()) == 0 {
+			continue
+		}
+		g, err := parseGoroutine(scanner)
+		if err != nil {
+			return out, fmt.Errorf("Error %v parsing trace:\n%s", err, string(buf))
+		}
+		out = append(out, g)
+	}
+	return out, scanner.Err()
+}
+
+func parseGoroutine(scanner *bufio.Scanner) (*Goroutine, error) {
+	g := &Goroutine{}
+	matches := goroutineHeaderRE.FindSubmatch(scanner.Bytes())
+	if len(matches) != 3 {
+		return nil, fmt.Errorf("Could not parse goroutine header from: %s", scanner.Text())
+	}
+	id, err := strconv.ParseInt(string(matches[1]), 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	g.ID = int(id)
+	g.State = string(matches[2])
+
+	for scanner.Scan() {
+		if len(scanner.Bytes()) == 0 {
+			break
+		}
+		frame, err := parseFrame(scanner)
+		if err != nil {
+			return nil, err
+		}
+		if strings.HasPrefix(frame.Call, "created by ") {
+			frame.Call = frame.Call[len("created by "):]
+			g.Creator = frame
+			break
+		}
+		g.Stack = append(g.Stack, frame)
+	}
+	return g, nil
+}
+
+func (g *Goroutine) writeTo(w io.Writer) {
+	fmt.Fprintf(w, "goroutine %d [%s]:\n", g.ID, g.State)
+	for _, f := range g.Stack {
+		f.writeTo(w)
+	}
+	if g.Creator != nil {
+		fmt.Fprint(w, "created by ")
+		g.Creator.writeTo(w)
+	}
+}
+
+// Frame represents a single stack frame.
+type Frame struct {
+	Call   string
+	File   string
+	Line   int
+	Offset int
+}
+
+func parseFrame(scanner *bufio.Scanner) (*Frame, error) {
+	f := &Frame{Call: scanner.Text()}
+	if !scanner.Scan() {
+		return nil, fmt.Errorf("Frame lacked a second line %s", f.Call)
+	}
+	matches := stackFileRE.FindSubmatch(scanner.Bytes())
+	if len(matches) < 4 {
+		return nil, fmt.Errorf("Could not parse file reference from %s", scanner.Text())
+	}
+	f.File = string(matches[1])
+	line, err := strconv.ParseInt(string(matches[2]), 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	f.Line = int(line)
+	if len(matches[3]) > 0 {
+		offset, err := strconv.ParseInt(string(matches[3]), 16, 64)
+		if err != nil {
+			return nil, err
+		}
+		f.Offset = int(offset)
+	}
+	return f, nil
+}
+
+func (f *Frame) writeTo(w io.Writer) {
+	fmt.Fprintln(w, f.Call)
+	if f.Offset != 0 {
+		fmt.Fprintf(w, "\t%s:%d +0x%x\n", f.File, f.Line, f.Offset)
+	} else {
+		fmt.Fprintf(w, "\t%s:%d\n", f.File, f.Line)
+	}
+}
+
+// Format formats Goroutines back into the normal string representation.
+func Format(gs ...*Goroutine) []byte {
+	var buf bytes.Buffer
+	for i, g := range gs {
+		if i != 0 {
+			buf.WriteRune('\n')
+		}
+		g.writeTo(&buf)
+	}
+	return buf.Bytes()
+}
+
+// ErrorReporter is used by NoLeaks to report errors.  testing.T implements
+// this interface and is normally passed.
+type ErrorReporter interface {
+	Errorf(format string, args ...interface{})
+}
+
+// NoLeaks helps test that a test isn't leaving extra goroutines after it finishes.
+//
+// The normal way to use it is:
+// func TestFoo(t *testing.T) {
+//   defer goroutines.NoLeaks(t, time.Second)()
+//
+//   ... Normal test code here ...
+//
+// }
+//
+// The test will fail if there are goroutines running at the end of the test
+// that weren't running at the beginning.
+// Since testing for goroutines being finished can be racy, the detector
+// can wait the specified duration for the set of goroutines to return to the
+// initial set.
+func NoLeaks(t ErrorReporter, wait time.Duration) func() {
+	gs, err := Get()
+	if err != nil {
+		return func() {} // If we can't parse correctly we let the test pass.
+	}
+	bycreator := map[string]int{}
+	for _, g := range gs {
+		key := ""
+		if g.Creator != nil {
+			key = g.Creator.Call
+		}
+		bycreator[key]++
+	}
+	return func() {
+		var left []*Goroutine
+		backoff := 10 * time.Millisecond
+		start := time.Now()
+		until := start.Add(wait)
+		for {
+			cgs, err := Get()
+			if err != nil {
+				return // If we can't parse correctly we let the test pass.
+			}
+			left = left[:0]
+			cbycreator := map[string]int{}
+			for _, g := range cgs {
+				key := ""
+				if g.Creator != nil {
+					key = g.Creator.Call
+				}
+				cbycreator[key]++
+				if cbycreator[key] > bycreator[key] {
+					left = append(left, g)
+				}
+			}
+			if len(left) == 0 {
+				return
+			}
+			if time.Now().After(until) {
+				t.Errorf("%d extra Goroutines outstanding:\n %s", len(left),
+					string(Format(left...)))
+				return
+			}
+			time.Sleep(backoff)
+			if backoff = backoff * 2; backoff > time.Second {
+				backoff = time.Second
+			}
+		}
+	}
+}
diff --git a/test/goroutines/goroutines_test.go b/test/goroutines/goroutines_test.go
new file mode 100644
index 0000000..64e0079
--- /dev/null
+++ b/test/goroutines/goroutines_test.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package goroutines
+
+import (
+	"bytes"
+	"runtime"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+func wrappedWaitForIt(wg *sync.WaitGroup, wait chan struct{}, n int64) {
+	if n == 0 {
+		waitForIt(wg, wait)
+	} else {
+		wrappedWaitForIt(wg, wait, n-1)
+	}
+}
+
+func waitForIt(wg *sync.WaitGroup, wait chan struct{}) {
+	wg.Done()
+	<-wait
+}
+
+func runGoA(wg *sync.WaitGroup, wait chan struct{}) {
+	go waitForIt(wg, wait)
+}
+
+func runGoB(wg *sync.WaitGroup, wait chan struct{}) {
+	go wrappedWaitForIt(wg, wait, 3)
+}
+
+func runGoC(wg *sync.WaitGroup, wait chan struct{}) {
+	go func() {
+		wg.Done()
+		<-wait
+	}()
+}
+
+func TestGet(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(3)
+	wait := make(chan struct{})
+	runGoA(&wg, wait)
+	runGoB(&wg, wait)
+	runGoC(&wg, wait)
+	wg.Wait()
+	gs, err := Get()
+	if err != nil {
+		t.Fatal(err)
+	}
+	close(wait)
+
+	if len(gs) < 4 {
+		t.Errorf("Got %d goroutines, expected at least 4", len(gs))
+	}
+	bycreator := map[string]*Goroutine{}
+	for _, g := range gs {
+		key := ""
+		if g.Creator != nil {
+			key = g.Creator.Call
+		}
+		bycreator[key] = g
+	}
+	a := bycreator["v.io/x/ref/test/goroutines.runGoA"]
+	if a == nil {
+		t.Errorf("runGoA is missing")
+	} else if len(a.Stack) != 1 {
+		t.Errorf("got %d expected 1: %#v", len(a.Stack), a.Stack)
+	} else if !strings.HasPrefix(a.Stack[0].Call, "v.io/x/ref/test/goroutines.waitForIt") {
+		t.Errorf("got %s, wanted it to start with v.io/x/ref/test/goroutines.waitForIt",
+			a.Stack[0].Call)
+	}
+	b := bycreator["v.io/x/ref/test/goroutines.runGoB"]
+	if b == nil {
+		t.Errorf("runGoB is missing")
+	} else if len(b.Stack) != 5 {
+		t.Errorf("got %d expected 1: %#v", len(b.Stack), b.Stack)
+	}
+	c := bycreator["v.io/x/ref/test/goroutines.runGoC"]
+	if c == nil {
+		t.Errorf("runGoC is missing")
+	} else if len(c.Stack) != 1 {
+		t.Errorf("got %d expected 1: %#v", len(c.Stack), c.Stack)
+	} else if !strings.HasPrefix(c.Stack[0].Call, "v.io/x/ref/test/goroutines.funcĀ·") {
+		t.Errorf("got %s, wanted it to start with v.io/x/ref/test/goroutines.funcĀ·",
+			c.Stack[0].Call)
+	}
+}
+
+func TestFormat(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(3)
+	wait := make(chan struct{})
+	runGoA(&wg, wait)
+	runGoB(&wg, wait)
+	runGoC(&wg, wait)
+	wg.Wait()
+
+	buf := make([]byte, 1<<20)
+	buf = buf[:runtime.Stack(buf, true)]
+	close(wait)
+
+	gs, err := Parse(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if formatted := Format(gs...); !bytes.Equal(buf, formatted) {
+		t.Errorf("got:\n%s\nwanted:\n%s\n", string(formatted), string(buf))
+	}
+}
+
+type fakeErrorReporter struct {
+	calls     int
+	extra     int
+	formatted string
+}
+
+func (f *fakeErrorReporter) Errorf(format string, args ...interface{}) {
+	f.calls++
+	f.extra = args[0].(int)
+	f.formatted = args[1].(string)
+}
+
+func TestNoLeaks(t *testing.T) {
+	er := &fakeErrorReporter{}
+	f := NoLeaks(er, 100*time.Millisecond)
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+	wait := make(chan struct{})
+	runGoA(&wg, wait)
+	runGoB(&wg, wait)
+	runGoC(&wg, wait)
+	wg.Wait()
+
+	f()
+	if er.calls != 1 {
+		t.Errorf("got %d, wanted 1: %s", er.calls, er.formatted)
+	}
+	if er.extra != 3 {
+		t.Errorf("got %d, wanted 3: %s", er.extra, er.formatted)
+	}
+	close(wait)
+
+	*er = fakeErrorReporter{}
+	f()
+	if er.calls != 0 {
+		t.Errorf("got %d, wanted 0: %s", er.calls, er.formatted)
+	}
+	if er.extra != 0 {
+		t.Errorf("got %d, wanted 0: %s", er.extra, er.formatted)
+	}
+}