ref/runtime/internal/flow/conn: Ensure that we don't leak goroutines.
Also add a library to check for leaking goroutines.
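Conn now carries a sync.WaitGroup (loopWG) that counts its readLoop and
the blessingsFlow read loop; Close waits on it before signaling c.closed,
so Closed() firing means the conn is fully torn down. To keep shutdown
from wedging on flow control, workers are now woken with a non-blocking
notify() on a buffered channel, and workers carry names so a stuck worker
is easier to spot in logs.

The test/goroutines library snapshots the running goroutines at the start
of a test and fails the test if extra ones remain at the end. A minimal
sketch of the usage added to the conn tests (leakWaitTime is the constant
defined in conn_test.go):

    func TestFoo(t *testing.T) {
        // Snapshot now; the returned func, when deferred, fails the
        // test if goroutines beyond the initial set are still running,
        // after waiting up to leakWaitTime for them to exit.
        defer goroutines.NoLeaks(t, leakWaitTime)()

        // ... test body; everything it spawns must terminate ...
    }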
Change-Id: Iccb56dcd28813b80535b885b68383c6751534163
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index eff876d..afccf37 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -23,7 +23,8 @@
if err != nil {
return err
}
- c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
+ c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
+ c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
if err = c.readRemoteAuth(ctx, binding); err != nil {
return err
}
@@ -59,7 +60,8 @@
if err != nil {
return err
}
- c.blessingsFlow = newBlessingsFlow(ctx, c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), false)
+ c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
+ c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), false)
signedBinding, err := v23.GetPrincipal(ctx).Sign(binding)
if err != nil {
return err
@@ -171,7 +173,7 @@
byBKey map[uint64]*Blessings
}
-func newBlessingsFlow(ctx *context.T, f flow.Flow, dialed bool) *blessingsFlow {
+func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f flow.Flow, dialed bool) *blessingsFlow {
b := &blessingsFlow{
enc: vom.NewEncoder(f),
dec: vom.NewDecoder(f),
@@ -183,7 +185,8 @@
if !dialed {
b.nextKey++
}
- go b.readLoop(ctx)
+ loopWG.Add(1)
+ go b.readLoop(ctx, loopWG)
return b
}
@@ -233,7 +236,8 @@
return security.Blessings{}, nil, NewErrBlessingsFlowClosed(ctx)
}
-func (b *blessingsFlow) readLoop(ctx *context.T) {
+func (b *blessingsFlow) readLoop(ctx *context.T, loopWG *sync.WaitGroup) {
+ defer loopWG.Done()
for {
var received Blessings
err := b.dec.Decode(&received)
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index df8fee6..8ebd51f 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -13,6 +13,7 @@
"v.io/v23/security"
"v.io/v23/verror"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test/goroutines"
"v.io/x/ref/test/testutil"
)
@@ -49,6 +50,8 @@
}
func TestUnidirectional(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
dctx, shutdown := v23.Init()
defer shutdown()
actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
@@ -57,6 +60,8 @@
}
aflows := make(chan flow.Flow, 2)
dc, ac, _ := setupConns(t, dctx, actx, nil, aflows)
+ defer dc.Close(dctx, nil)
+ defer ac.Close(actx, nil)
df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
af1 := <-aflows
@@ -82,6 +87,8 @@
}
func TestBidirectional(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
dctx, shutdown := v23.Init()
defer shutdown()
actx, err := v23.WithPrincipal(dctx, testutil.NewPrincipal("acceptor"))
@@ -91,6 +98,8 @@
dflows := make(chan flow.Flow, 2)
aflows := make(chan flow.Flow, 2)
dc, ac, _ := setupConns(t, dctx, actx, dflows, aflows)
+ defer dc.Close(dctx, nil)
+ defer ac.Close(actx, nil)
df1 := dialFlow(t, dctx, dc, v23.GetPrincipal(dctx).BlessingStore().Default())
af1 := <-aflows
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index fd73c50..4128297 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -12,9 +12,12 @@
"v.io/v23"
"v.io/v23/context"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test/goroutines"
)
func TestRemoteDialerClose(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -27,6 +30,8 @@
}
func TestRemoteAcceptorClose(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -39,6 +44,8 @@
}
func TestUnderlyingConnectionClosed(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
d, a, w := setupConns(t, ctx, ctx, nil, nil)
@@ -48,6 +55,8 @@
}
func TestDialAfterConnClose(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
d, a, _ := setupConns(t, ctx, ctx, nil, nil)
@@ -64,10 +73,12 @@
}
func TestReadWriteAfterConnClose(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
for _, dialerDials := range []bool{true, false} {
- df, flows := setupFlow(t, ctx, ctx, dialerDials)
+ df, flows, cl := setupFlow(t, ctx, ctx, dialerDials)
if _, err := df.WriteMsg([]byte("hello")); err != nil {
t.Fatalf("write failed: %v", err)
}
@@ -93,14 +104,18 @@
if _, err := af.ReadMsg(); err == nil {
t.Fatalf("nil error for read after close.")
}
+ cl()
}
}
func TestFlowCancelOnWrite(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
dctx, cancel := context.WithCancel(ctx)
- df, accept := setupFlow(t, dctx, ctx, true)
+ df, accept, cl := setupFlow(t, dctx, ctx, true)
+ defer cl()
done := make(chan struct{})
go func() {
if _, err := df.WriteMsg([]byte("hello")); err != nil {
@@ -122,10 +137,13 @@
}
func TestFlowCancelOnRead(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
ctx, shutdown := v23.Init()
defer shutdown()
dctx, cancel := context.WithCancel(ctx)
- df, accept := setupFlow(t, dctx, ctx, true)
+ df, accept, cl := setupFlow(t, dctx, ctx, true)
+ defer cl()
done := make(chan struct{})
go func() {
if _, err := df.WriteMsg([]byte("hello")); err != nil {
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 321abde..f9ac79c 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -49,7 +49,6 @@
}
// Conns are multiplexing, encrypted channels that can host Flows.
-// TODO(mattr): track and clean up all spawned goroutines.
type Conn struct {
fc *flowcontrol.FlowController
mp *messagePipe
@@ -60,6 +59,7 @@
local, remote naming.Endpoint
closed chan struct{}
blessingsFlow *blessingsFlow
+ loopWG sync.WaitGroup
mu sync.Mutex
nextFid flowID
@@ -91,6 +91,7 @@
c.Close(ctx, err)
return nil, err
}
+ c.loopWG.Add(1)
go c.readLoop(ctx)
return c, nil
}
@@ -116,6 +117,7 @@
c.Close(ctx, err)
return nil, err
}
+ c.loopWG.Add(1)
go c.readLoop(ctx)
return c, nil
}
@@ -154,23 +156,28 @@
// with an error and no more flows will be sent to the FlowHandler.
func (c *Conn) Closed() <-chan struct{} { return c.closed }
-// Close shuts down a conn. This will cause the read loop
-// to exit.
+// Close shuts down a conn.
func (c *Conn) Close(ctx *context.T, err error) {
c.mu.Lock()
var flows map[flowID]*flw
flows, c.flows = c.flows, nil
c.mu.Unlock()
+
if flows == nil {
- // We've already torn this conn down.
+ // This conn is already being torn down.
+ <-c.closed
return
}
+ c.internalClose(ctx, err, flows)
+}
+
+func (c *Conn) internalClose(ctx *context.T, err error, flows map[flowID]*flw) {
if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
message := ""
if err != nil {
message = err.Error()
}
- cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) {
return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
})
if cerr != nil {
@@ -183,8 +190,7 @@
if cerr := c.mp.close(); cerr != nil {
ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
}
-
- // TODO(mattr): ensure the readLoop is finished before closing this.
+ c.loopWG.Wait()
close(c.closed)
}
@@ -201,7 +207,7 @@
return
}
- err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) {
err := c.mp.writeMsg(ctx, &release{
counters: counts,
})
@@ -290,5 +296,14 @@
break
}
}
- c.Close(ctx, err)
+
+ c.mu.Lock()
+ var flows map[flowID]*flw
+ flows, c.flows = c.flows, nil
+ c.mu.Unlock()
+
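+	// Drop this loop's count before internalClose: internalClose waits on
+	// loopWG, so signaling afterwards would deadlock the readLoop on itself.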
+ c.loopWG.Done()
+ if flows != nil {
+ c.internalClose(ctx, err, flows)
+ }
}
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 49bd59e..6851c79 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -9,14 +9,16 @@
"crypto/rand"
"io"
"testing"
+ "time"
"v.io/v23"
- "v.io/v23/context"
- "v.io/v23/flow"
_ "v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/test"
+ "v.io/x/ref/test/goroutines"
)
+const leakWaitTime = 100 * time.Millisecond
+
var randData []byte
func init() {
@@ -27,7 +29,15 @@
}
}
-func testWrite(t *testing.T, ctx *context.T, want []byte, df flow.Flow, flows <-chan flow.Flow) {
+func TestLargeWrite(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
+ ctx, shutdown := v23.Init()
+ df, flows, cl := setupFlow(t, ctx, ctx, true)
+ defer cl()
+ defer shutdown()
+
+ want := randData
finished := make(chan struct{})
go func(x []byte) {
mid := len(x) / 2
@@ -66,10 +76,3 @@
<-df.Closed()
<-af.Closed()
}
-
-func TestLargeWrite(t *testing.T) {
- ctx, shutdown := v23.Init()
- defer shutdown()
- df, flows := setupFlow(t, ctx, ctx, true)
- testWrite(t, ctx, randData, df, flows)
-}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index a8f832f..fd301bc 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -5,6 +5,8 @@
package conn
import (
+ "strconv"
+
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/security"
@@ -32,7 +34,7 @@
id: id,
dialed: dialed,
conn: c,
- worker: c.fc.NewWorker(flowPriority),
+ worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
q: newReadQ(),
bkey: bkey,
dkey: dkey,
diff --git a/runtime/internal/flow/conn/readq_test.go b/runtime/internal/flow/conn/readq_test.go
index 865b246..baea620 100644
--- a/runtime/internal/flow/conn/readq_test.go
+++ b/runtime/internal/flow/conn/readq_test.go
@@ -9,6 +9,7 @@
"testing"
"v.io/v23"
+ "v.io/x/ref/test/goroutines"
)
func mkBufs(in ...string) [][]byte {
@@ -20,6 +21,8 @@
}
func TestReadqRead(t *testing.T) {
+ defer goroutines.NoLeaks(t, 0)()
+
ctx, shutdown := v23.Init()
defer shutdown()
@@ -45,6 +48,8 @@
}
func TestReadqGet(t *testing.T) {
+ defer goroutines.NoLeaks(t, 0)()
+
ctx, shutdown := v23.Init()
defer shutdown()
@@ -69,6 +74,8 @@
}
func TestReadqMixed(t *testing.T) {
+ defer goroutines.NoLeaks(t, 0)()
+
ctx, shutdown := v23.Init()
defer shutdown()
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 140515a..8cf9968 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -142,26 +142,19 @@
return <-dch, <-ach, w
}
-func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow) {
- d, accepted := setupFlows(t, dctx, actx, dialFromDialer, 1)
- return d[0], accepted
-}
-
-func setupFlows(t *testing.T, dctx, actx *context.T, dialFromDialer bool, n int) (dialed []flow.Flow, accepted <-chan flow.Flow) {
- dflows, aflows := make(chan flow.Flow, n), make(chan flow.Flow, n)
+func setupFlow(t *testing.T, dctx, actx *context.T, dialFromDialer bool) (dialed flow.Flow, accepted <-chan flow.Flow, close func()) {
+ dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
d, a, _ := setupConns(t, dctx, actx, dflows, aflows)
if !dialFromDialer {
d, a = a, d
+ dctx, actx = actx, dctx
aflows, dflows = dflows, aflows
}
- dialed = make([]flow.Flow, n)
- for i := 0; i < n; i++ {
- var err error
- if dialed[i], err = d.Dial(dctx, testBFP); err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
+ df, err := d.Dial(dctx, testBFP)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
}
- return dialed, aflows
+ return df, aflows, func() { d.Close(dctx, nil); a.Close(actx, nil) }
}
func testBFP(
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index 8221895..a27d48f 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -41,6 +41,7 @@
// producers do not overwhelm consumers. Only one Worker
// will be executing at a time.
type Worker struct {
+ name string
fc *FlowController
priority int
work chan struct{}
@@ -51,6 +52,10 @@
next, prev *Worker // Used as a list when in an active queue.
}
+func (w *Worker) String() string {
+ return fmt.Sprintf("%s(%p)", w.name, w)
+}
+
// Run runs r potentially multiple times.
// Only one worker's r function will run at a time for a given FlowController.
// A single worker's Run function should not be called concurrently from multiple
@@ -71,7 +76,7 @@
for {
next := w.fc.nextWorkerLocked()
if w.fc.writing == w {
- // We're already schedule to write, but we should bail
+ // We're already scheduled to write, but we should bail
// out if we're canceled.
select {
case <-ctx.Done():
@@ -82,8 +87,9 @@
for w.fc.writing != w && err == nil {
w.fc.mu.Unlock()
if next != nil {
- next.work <- struct{}{}
+ next.notify()
}
+ ctx.VI(4).Infof("worker waiting: %s\nfc: %s", w, w.fc)
select {
case <-ctx.Done():
err = ctx.Err()
@@ -92,6 +98,7 @@
w.fc.mu.Lock()
}
if err != nil {
+ w.fc.writing = nil
break
}
@@ -138,7 +145,7 @@
next := w.fc.nextWorkerLocked()
w.fc.mu.Unlock()
if next != nil {
- next.work <- struct{}{}
+ next.notify()
}
return err
}
@@ -169,7 +176,7 @@
next := w.fc.nextWorkerLocked()
w.fc.mu.Unlock()
if next != nil {
- next.work <- struct{}{}
+ next.notify()
}
}
@@ -180,6 +187,13 @@
return w.counters.released > 0 || (!w.counters.everReleased && w.fc.shared > 0)
}
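+// notify wakes the worker without blocking: the work channel has capacity
+// 1, so a wakeup is queued when none is pending and coalesced with one
+// that already is.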
+func (w *Worker) notify() {
+ select {
+ case w.work <- struct{}{}:
+ default:
+ }
+}
+
// FlowController manages multiple Workers to ensure only one runs at a time.
// The workers also obey counters so that producers don't overwhelm consumers.
type FlowController struct {
@@ -204,11 +218,12 @@
// execute is controlled by priority. Higher priority
// workers that are ready will run before any lower priority
// workers.
-func (fc *FlowController) NewWorker(priority int) *Worker {
+func (fc *FlowController) NewWorker(name string, priority int) *Worker {
w := &Worker{
+ name: name,
fc: fc,
priority: priority,
- work: make(chan struct{}),
+ work: make(chan struct{}, 1),
counters: &counterState{},
}
w.next, w.prev = w, w
@@ -233,7 +248,7 @@
next := fc.nextWorkerLocked()
fc.mu.Unlock()
if next != nil {
- next.work <- struct{}{}
+ next.notify()
}
return nil
}
@@ -241,11 +256,12 @@
// Run runs the given runner on a non-flow controlled Worker. This
// worker does not wait for any flow control tokens and is limited
// only by the MTU.
-func (fc *FlowController) Run(ctx *context.T, p int, r Runner) error {
+func (fc *FlowController) Run(ctx *context.T, name string, p int, r Runner) error {
w := &Worker{
+ name: name,
fc: fc,
priority: p,
- work: make(chan struct{}),
+ work: make(chan struct{}, 1),
}
w.next, w.prev = w, w
return w.Run(ctx, r)
@@ -313,13 +329,15 @@
fmt.Fprintf(buf, "FlowController %p: \n", fc)
fc.mu.Lock()
- fmt.Fprintf(buf, "writing: %p\n", fc.writing)
+ if fc.writing != nil {
+ fmt.Fprintf(buf, "writing: %s\n", fc.writing)
+ }
fmt.Fprintln(buf, "active:")
for p, head := range fc.active {
- fmt.Fprintf(buf, " %v: %p", p, head)
+ fmt.Fprintf(buf, " %v: %s", p, head)
if head != nil {
for cur := head.next; cur != head; cur = cur.next {
- fmt.Fprintf(buf, " %p", cur)
+ fmt.Fprintf(buf, " %s", cur)
}
}
fmt.Fprintln(buf, "")
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol_test.go b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
index ce42db5..3d9a297 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol_test.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol_test.go
@@ -45,7 +45,7 @@
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(idx int) {
- el := fc.NewWorker(0)
+ el := fc.NewWorker(fmt.Sprintf("%d", idx), 0)
go el.Release(ctx, messages*5) // Try to make races happen
j := 0
el.Run(ctx, func(tokens int) (used int, done bool, err error) {
@@ -86,7 +86,7 @@
work := make(chan interface{})
worker := func(p int) *Worker {
- w := fc.NewWorker(p)
+ w := fc.NewWorker(fmt.Sprintf("%d", p), p)
go w.Run(ctx, func(t int) (int, bool, error) {
work <- w
return t, false, nil
@@ -122,7 +122,7 @@
work := make(chan interface{})
worker := func(p int) *Worker {
- w := fc.NewWorker(p)
+ w := fc.NewWorker(fmt.Sprintf("%d", p), p)
go w.Run(ctx, func(t int) (int, bool, error) {
work <- w
return t, false, nil
@@ -152,7 +152,7 @@
fc := New(mtu, mtu)
ready, wait := make(chan struct{}), make(chan struct{})
- w := fc.NewWorker(0)
+ w := fc.NewWorker("", 0)
go w.Run(ctx, func(t int) (int, bool, error) {
close(ready)
<-wait
@@ -174,22 +174,22 @@
work := make(chan interface{})
ready, wait := make(chan struct{}), make(chan struct{})
// Start one worker running
- go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+ go fc.Run(ctx, "0", 0, func(t int) (int, bool, error) {
close(ready)
<-wait
return t, true, nil
})
<-ready
// Now queue up several workers and make sure they execute in order.
- go fc.Run(ctx, 2, func(t int) (int, bool, error) {
+ go fc.Run(ctx, "2", 2, func(t int) (int, bool, error) {
work <- "c"
return t, true, nil
})
- go fc.Run(ctx, 1, func(t int) (int, bool, error) {
+ go fc.Run(ctx, "1", 1, func(t int) (int, bool, error) {
work <- "b"
return t, true, nil
})
- go fc.Run(ctx, 0, func(t int) (int, bool, error) {
+ go fc.Run(ctx, "0", 0, func(t int) (int, bool, error) {
work <- "a"
return t, true, nil
})
@@ -249,7 +249,7 @@
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(idx int) {
- w := fc.NewWorker(0)
+ w := fc.NewWorker(fmt.Sprintf("%d", idx), 0)
w.Release(ctx, len(testdata))
t := testdata
err := w.Run(ctx, func(tokens int) (used int, done bool, err error) {
diff --git a/test/goroutines/goroutines.go b/test/goroutines/goroutines.go
new file mode 100644
index 0000000..fd914e5
--- /dev/null
+++ b/test/goroutines/goroutines.go
@@ -0,0 +1,229 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package goroutines
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "regexp"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var goroutineHeaderRE = regexp.MustCompile(`^goroutine (\d+) \[([^\]]+)\]:$`)
+var stackFileRE = regexp.MustCompile(`^\s+([^:]+):(\d+)(?: \+0x([0-9A-Fa-f]+))?$`)
+
+type Goroutine struct {
+ ID int
+ State string
+ Stack []*Frame
+ Creator *Frame
+}
+
+// Get gets a set of currently running goroutines and parses them into a
+// structured representation.
+func Get() ([]*Goroutine, error) {
+ bufsize, read := 1<<20, 0
+ buf := make([]byte, bufsize)
+ for {
+ read = runtime.Stack(buf, true)
+ if read < bufsize {
+ buf = buf[:read]
+ break
+ }
+ bufsize *= 2
+ buf = make([]byte, bufsize)
+ }
+ return Parse(buf)
+}
+
+// Parse parses a stack trace into a structured representation.
+func Parse(buf []byte) ([]*Goroutine, error) {
+ scanner := bufio.NewScanner(bytes.NewReader(buf))
+ var out []*Goroutine
+ for scanner.Scan() {
+ if len(scanner.Bytes()) == 0 {
+ continue
+ }
+ g, err := parseGoroutine(scanner)
+ if err != nil {
+ return out, fmt.Errorf("Error %v parsing trace:\n%s", err, string(buf))
+ }
+ out = append(out, g)
+ }
+ return out, scanner.Err()
+}
+
+func parseGoroutine(scanner *bufio.Scanner) (*Goroutine, error) {
+ g := &Goroutine{}
+ matches := goroutineHeaderRE.FindSubmatch(scanner.Bytes())
+ if len(matches) != 3 {
+ return nil, fmt.Errorf("Could not parse goroutine header from: %s", scanner.Text())
+ }
+ id, err := strconv.ParseInt(string(matches[1]), 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ g.ID = int(id)
+ g.State = string(matches[2])
+
+ for scanner.Scan() {
+ if len(scanner.Bytes()) == 0 {
+ break
+ }
+ frame, err := parseFrame(scanner)
+ if err != nil {
+ return nil, err
+ }
+ if strings.HasPrefix(frame.Call, "created by ") {
+ frame.Call = frame.Call[len("created by "):]
+ g.Creator = frame
+ break
+ }
+ g.Stack = append(g.Stack, frame)
+ }
+ return g, nil
+}
+
+func (g *Goroutine) writeTo(w io.Writer) {
+ fmt.Fprintf(w, "goroutine %d [%s]:\n", g.ID, g.State)
+ for _, f := range g.Stack {
+ f.writeTo(w)
+ }
+ if g.Creator != nil {
+ fmt.Fprint(w, "created by ")
+ g.Creator.writeTo(w)
+ }
+}
+
+// Frame represents a single stack frame.
+type Frame struct {
+ Call string
+ File string
+ Line int
+ Offset int
+}
+
+func parseFrame(scanner *bufio.Scanner) (*Frame, error) {
+ f := &Frame{Call: scanner.Text()}
+ if !scanner.Scan() {
+ return nil, fmt.Errorf("Frame lacked a second line %s", f.Call)
+ }
+ matches := stackFileRE.FindSubmatch(scanner.Bytes())
+ if len(matches) < 4 {
+ return nil, fmt.Errorf("Could not parse file reference from %s", scanner.Text())
+ }
+ f.File = string(matches[1])
+ line, err := strconv.ParseInt(string(matches[2]), 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ f.Line = int(line)
+ if len(matches[3]) > 0 {
+ offset, err := strconv.ParseInt(string(matches[3]), 16, 64)
+ if err != nil {
+ return nil, err
+ }
+ f.Offset = int(offset)
+ }
+ return f, nil
+}
+
+func (f *Frame) writeTo(w io.Writer) {
+ fmt.Fprintln(w, f.Call)
+ if f.Offset != 0 {
+ fmt.Fprintf(w, "\t%s:%d +0x%x\n", f.File, f.Line, f.Offset)
+ } else {
+ fmt.Fprintf(w, "\t%s:%d\n", f.File, f.Line)
+ }
+}
+
+// Format formats Goroutines back into the normal string representation.
+func Format(gs ...*Goroutine) []byte {
+ var buf bytes.Buffer
+ for i, g := range gs {
+ if i != 0 {
+ buf.WriteRune('\n')
+ }
+ g.writeTo(&buf)
+ }
+ return buf.Bytes()
+}
+
+// ErrorReporter is used by NoLeaks to report errors. testing.T implements
+// this interface and is normally passed.
+type ErrorReporter interface {
+ Errorf(format string, args ...interface{})
+}
+
+// NoLeaks helps verify that a test doesn't leave extra goroutines running after it finishes.
+//
+// The normal way to use it is:
+// func TestFoo(t *testing.T) {
+// defer goroutines.NoLeaks(t, time.Second)()
+//
+// ... Normal test code here ...
+//
+// }
+//
+// The test will fail if there are goroutines running at the end of the test
+// that weren't running at the beginning.
+// Since checking that goroutines have finished can be racy, the detector
+// waits up to the specified duration for the set of goroutines to return
+// to the initial set.
+func NoLeaks(t ErrorReporter, wait time.Duration) func() {
+ gs, err := Get()
+ if err != nil {
+ return func() {} // If we can't parse correctly, we let the test pass.
+ }
+ bycreator := map[string]int{}
+ for _, g := range gs {
+ key := ""
+ if g.Creator != nil {
+ key = g.Creator.Call
+ }
+ bycreator[key]++
+ }
+ return func() {
+ var left []*Goroutine
+ backoff := 10 * time.Millisecond
+ start := time.Now()
+ until := start.Add(wait)
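+	// Poll until the goroutine set shrinks back to the baseline, backing
+	// off exponentially (capped at one second) until the deadline passes.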
+ for {
+ cgs, err := Get()
+ if err != nil {
+ return // If we can't parse correctly, we let the test pass.
+ }
+ left = left[:0]
+ cbycreator := map[string]int{}
+ for _, g := range cgs {
+ key := ""
+ if g.Creator != nil {
+ key = g.Creator.Call
+ }
+ cbycreator[key]++
+ if cbycreator[key] > bycreator[key] {
+ left = append(left, g)
+ }
+ }
+ if len(left) == 0 {
+ return
+ }
+ if time.Now().After(until) {
+ t.Errorf("%d extra Goroutines outstanding:\n %s", len(left),
+ string(Format(left...)))
+ return
+ }
+ time.Sleep(backoff)
+ if backoff = backoff * 2; backoff > time.Second {
+ backoff = time.Second
+ }
+ }
+ }
+}
diff --git a/test/goroutines/goroutines_test.go b/test/goroutines/goroutines_test.go
new file mode 100644
index 0000000..64e0079
--- /dev/null
+++ b/test/goroutines/goroutines_test.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package goroutines
+
+import (
+ "bytes"
+ "runtime"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+func wrappedWaitForIt(wg *sync.WaitGroup, wait chan struct{}, n int64) {
+ if n == 0 {
+ waitForIt(wg, wait)
+ } else {
+ wrappedWaitForIt(wg, wait, n-1)
+ }
+}
+
+func waitForIt(wg *sync.WaitGroup, wait chan struct{}) {
+ wg.Done()
+ <-wait
+}
+
+func runGoA(wg *sync.WaitGroup, wait chan struct{}) {
+ go waitForIt(wg, wait)
+}
+
+func runGoB(wg *sync.WaitGroup, wait chan struct{}) {
+ go wrappedWaitForIt(wg, wait, 3)
+}
+
+func runGoC(wg *sync.WaitGroup, wait chan struct{}) {
+ go func() {
+ wg.Done()
+ <-wait
+ }()
+}
+
+func TestGet(t *testing.T) {
+ var wg sync.WaitGroup
+ wg.Add(3)
+ wait := make(chan struct{})
+ runGoA(&wg, wait)
+ runGoB(&wg, wait)
+ runGoC(&wg, wait)
+ wg.Wait()
+ gs, err := Get()
+ if err != nil {
+ t.Fatal(err)
+ }
+ close(wait)
+
+ if len(gs) < 4 {
+ t.Errorf("Got %d goroutines, expected at least 4", len(gs))
+ }
+ bycreator := map[string]*Goroutine{}
+ for _, g := range gs {
+ key := ""
+ if g.Creator != nil {
+ key = g.Creator.Call
+ }
+ bycreator[key] = g
+ }
+ a := bycreator["v.io/x/ref/test/goroutines.runGoA"]
+ if a == nil {
+ t.Errorf("runGoA is missing")
+ } else if len(a.Stack) != 1 {
+ t.Errorf("got %d expected 1: %#v", len(a.Stack), a.Stack)
+ } else if !strings.HasPrefix(a.Stack[0].Call, "v.io/x/ref/test/goroutines.waitForIt") {
+ t.Errorf("got %s, wanted it to start with v.io/x/ref/test/goroutines.waitForIt",
+ a.Stack[0].Call)
+ }
+ b := bycreator["v.io/x/ref/test/goroutines.runGoB"]
+ if b == nil {
+ t.Errorf("runGoB is missing")
+ } else if len(b.Stack) != 5 {
+ t.Errorf("got %d expected 1: %#v", len(b.Stack), b.Stack)
+ }
+ c := bycreator["v.io/x/ref/test/goroutines.runGoC"]
+ if c == nil {
+ t.Errorf("runGoC is missing")
+ } else if len(c.Stack) != 1 {
+ t.Errorf("got %d expected 1: %#v", len(c.Stack), c.Stack)
+ } else if !strings.HasPrefix(c.Stack[0].Call, "v.io/x/ref/test/goroutines.func·") {
+ t.Errorf("got %s, wanted it to start with v.io/x/ref/test/goroutines.func·",
+ c.Stack[0].Call)
+ }
+}
+
+func TestFormat(t *testing.T) {
+ var wg sync.WaitGroup
+ wg.Add(3)
+ wait := make(chan struct{})
+ runGoA(&wg, wait)
+ runGoB(&wg, wait)
+ runGoC(&wg, wait)
+ wg.Wait()
+
+ buf := make([]byte, 1<<20)
+ buf = buf[:runtime.Stack(buf, true)]
+ close(wait)
+
+ gs, err := Parse(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if formatted := Format(gs...); !bytes.Equal(buf, formatted) {
+ t.Errorf("got:\n%s\nwanted:\n%s\n", string(formatted), string(buf))
+ }
+}
+
+type fakeErrorReporter struct {
+ calls int
+ extra int
+ formatted string
+}
+
+func (f *fakeErrorReporter) Errorf(format string, args ...interface{}) {
+ f.calls++
+ f.extra = args[0].(int)
+ f.formatted = args[1].(string)
+}
+
+func TestNoLeaks(t *testing.T) {
+ er := &fakeErrorReporter{}
+ f := NoLeaks(er, 100*time.Millisecond)
+
+ var wg sync.WaitGroup
+ wg.Add(3)
+ wait := make(chan struct{})
+ runGoA(&wg, wait)
+ runGoB(&wg, wait)
+ runGoC(&wg, wait)
+ wg.Wait()
+
+ f()
+ if er.calls != 1 {
+ t.Errorf("got %d, wanted 1: %s", er.calls, er.formatted)
+ }
+ if er.extra != 3 {
+ t.Errorf("got %d, wanted 3: %s", er.extra, er.formatted)
+ }
+ close(wait)
+
+ *er = fakeErrorReporter{}
+ f()
+ if er.calls != 0 {
+ t.Errorf("got %d, wanted 0: %s", er.calls, er.formatted)
+ }
+ if er.extra != 0 {
+ t.Errorf("got %d, wanted 0: %s", er.extra, er.formatted)
+ }
+}