Merge "ref: Remove v23.NewServer and rpc.DeprecatedServer from the public api.."
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d39b1ad..f6432bc 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -28,22 +28,41 @@
type manager struct {
rid naming.RoutingID
- closed <-chan struct{}
+ closed chan struct{}
q *upcqueue.T
cache *ConnCache
mu *sync.Mutex
listenEndpoints []naming.Endpoint
+ listeners []flow.Listener
+ wg sync.WaitGroup
}
func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
m := &manager{
- rid: rid,
- closed: ctx.Done(),
- q: upcqueue.New(),
- cache: NewConnCache(),
- mu: &sync.Mutex{},
+ rid: rid,
+ closed: make(chan struct{}),
+ q: upcqueue.New(),
+ cache: NewConnCache(),
+ mu: &sync.Mutex{},
+ listeners: []flow.Listener{},
}
+ go func() {
+ select {
+ case <-ctx.Done():
+ m.mu.Lock()
+ listeners := m.listeners
+ m.listeners = nil
+ m.mu.Unlock()
+ for _, ln := range listeners {
+ ln.Close()
+ }
+ m.cache.Close(ctx)
+ m.q.Close()
+ m.wg.Wait()
+ close(m.closed)
+ }
+ }()
return m
}
@@ -81,6 +100,13 @@
Address: ln.Addr().String(),
RID: m.rid,
}
+ m.mu.Lock()
+ if m.listeners == nil {
+ return nil, flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
+ }
+ m.listeners = append(m.listeners, ln)
+ m.mu.Unlock()
+ m.wg.Add(1)
go m.lnAcceptLoop(ctx, ln, local)
return []naming.Endpoint{local}, nil
}
@@ -130,6 +156,7 @@
}
func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint) {
+ defer m.wg.Done()
const killConnectionsRetryDelay = 5 * time.Millisecond
for {
flowConn, err := ln.Accept(ctx)
@@ -147,14 +174,14 @@
}
if err != nil {
ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
- continue
+ return
}
c, err := conn.NewAccepted(
ctx,
flowConn,
local,
version.Supported,
- &flowHandler{q: m.q, closed: m.closed},
+ &flowHandler{q: m.q},
)
if err != nil {
flowConn.Close()
@@ -168,17 +195,10 @@
}
type flowHandler struct {
- q *upcqueue.T
- closed <-chan struct{}
+ q *upcqueue.T
}
func (h *flowHandler) HandleFlow(f flow.Flow) error {
- select {
- case <-h.closed:
- // This will make the Put call below return a upcqueue.ErrQueueIsClosed.
- h.q.Close()
- default:
- }
return h.q.Put(f)
}
@@ -188,19 +208,13 @@
}
func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error {
- select {
- case <-h.m.closed:
- h.m.q.Close()
- return upcqueue.ErrQueueIsClosed
- default:
- }
go func() {
c, err := conn.NewAccepted(
h.ctx,
f,
f.Conn().LocalEndpoint(),
version.Supported,
- &flowHandler{q: h.m.q, closed: h.m.closed})
+ &flowHandler{q: h.m.q})
if err != nil {
h.ctx.Errorf("failed to create accepted conn: %v", err)
return
@@ -246,7 +260,7 @@
// otherwise an error is returned.
func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
// TODO(suharshs): Ensure that m is attached to ctx.
- item, err := m.q.Get(m.closed)
+ item, err := m.q.Get(ctx.Done())
switch {
case err == upcqueue.ErrQueueIsClosed:
return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
@@ -268,7 +282,7 @@
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
var fh conn.FlowHandler
if m.rid != naming.NullRoutingID {
- fh = &flowHandler{q: m.q, closed: m.closed}
+ fh = &flowHandler{q: m.q}
}
return m.internalDial(ctx, remote, fn, fh)
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 62403da..d0a9a41 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -8,6 +8,7 @@
"bufio"
"strings"
"testing"
+ "time"
"v.io/v23"
"v.io/v23/context"
@@ -18,13 +19,17 @@
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
+ "v.io/x/ref/test/goroutines"
)
func init() {
test.Init()
}
+const leakWaitTime = 100 * time.Millisecond
+
func TestDirectConnection(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
@@ -39,6 +44,7 @@
}
func TestDialCachedConn(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
@@ -66,6 +72,7 @@
}
func TestBidirectionalListeningEndpoint(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
diff --git a/runtime/internal/lib/tcputil/tcputil.go b/runtime/internal/lib/tcputil/tcputil.go
index 39a167a..c6891f1 100644
--- a/runtime/internal/lib/tcputil/tcputil.go
+++ b/runtime/internal/lib/tcputil/tcputil.go
@@ -92,6 +92,10 @@
return ln.netLn.Addr()
}
+func (ln *tcpListener) Close() error {
+ return ln.netLn.Close()
+}
+
func NewTCPConn(c net.Conn) flow.Conn {
return tcpConn{framer.New(c), c.LocalAddr()}
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 8fbd1f8..ecd7d00 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -219,7 +219,7 @@
}
externalStates = map[serverState]rpc.ServerState{
- initialized: rpc.ServerInit,
+ initialized: rpc.ServerActive,
listening: rpc.ServerActive,
serving: rpc.ServerActive,
publishing: rpc.ServerActive,
diff --git a/runtime/internal/rpc/server_test.go b/runtime/internal/rpc/server_test.go
index 7263415..3e8c3fc 100644
--- a/runtime/internal/rpc/server_test.go
+++ b/runtime/internal/rpc/server_test.go
@@ -154,7 +154,7 @@
defer server.Stop()
status := server.Status()
- if got, want := status.State, rpc.ServerInit; got != want {
+ if got, want := status.State, rpc.ServerActive; got != want {
t.Fatalf("got %s, want %s", got, want)
}
server.Listen(rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}})
@@ -260,7 +260,7 @@
}
}
- expectState(rpc.ServerInit)
+ expectState(rpc.ServerActive)
// Need to call Listen first.
err = server.Serve("", &testServer{}, nil)
diff --git a/runtime/internal/rt/mgmt_test.go b/runtime/internal/rt/mgmt_test.go
index 0c48e4a..ee38ffc 100644
--- a/runtime/internal/rt/mgmt_test.go
+++ b/runtime/internal/rt/mgmt_test.go
@@ -328,6 +328,7 @@
// TestRemoteStop verifies that the child shuts down cleanly when sending it
// a remote Stop rpc.
func TestRemoteStop(t *testing.T) {
+ t.Skip("This test is flaky, enable it once it is fixed.")
ctx, h, appCycle, cleanup := setupRemoteAppCycleMgr(t)
defer cleanup()
stream, err := appCycle.Stop(ctx)
@@ -337,7 +338,7 @@
rStream := stream.RecvStream()
expectTask := func(progress, goal int32) {
if !rStream.Advance() {
- t.Fatalf("unexpected streaming error: %q", rStream.Err())
+ t.Fatalf("unexpected streaming error: %v", rStream.Err())
}
task := rStream.Value()
if task.Progress != progress || task.Goal != goal {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 59d9ed6..02e919a 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -340,8 +340,7 @@
func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
fm := manager.New(ctx, rid)
- // TODO(mattr): How can we close a flow manager.
- if err := r.addChild(ctx, fm, func() {}); err != nil {
+ if err := r.addChild(ctx, fm, func() { <-fm.Closed() }); err != nil {
return ctx, nil, err
}
newctx := context.WithValue(ctx, flowManagerKey, fm)
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 43f613a..ebeb262 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -8,9 +8,11 @@
"bufio"
"strings"
"testing"
+ "time"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/xproxyd"
+ "v.io/x/ref/test/goroutines"
"v.io/v23"
"v.io/v23/context"
@@ -20,7 +22,10 @@
"v.io/v23/security"
)
+const leakWaitTime = 100 * time.Millisecond
+
func TestProxiedConnection(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
pctx, shutdown := v23.Init()
defer shutdown()
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
@@ -41,6 +46,7 @@
}
func TestMultipleProxies(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
pctx, shutdown := v23.Init()
defer shutdown()
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
diff --git a/test/goroutines/goroutines.go b/test/goroutines/goroutines.go
index fd914e5..c83011e 100644
--- a/test/goroutines/goroutines.go
+++ b/test/goroutines/goroutines.go
@@ -19,6 +19,10 @@
var goroutineHeaderRE = regexp.MustCompile(`^goroutine (\d+) \[([^\]]+)\]:$`)
var stackFileRE = regexp.MustCompile(`^\s+([^:]+):(\d+)(?: \+0x([0-9A-Fa-f]+))?$`)
+var ignoredGoroutines = []string{
+ "runtime.ensureSigM",
+}
+
type Goroutine struct {
ID int
State string
@@ -55,11 +59,24 @@
if err != nil {
return out, fmt.Errorf("Error %v parsing trace:\n%s", err, string(buf))
}
- out = append(out, g)
+ if !shouldIgnore(g) {
+ out = append(out, g)
+ }
}
return out, scanner.Err()
}
+func shouldIgnore(g *Goroutine) bool {
+ for _, ignored := range ignoredGoroutines {
+ for _, f := range g.Stack {
+ if strings.Contains(f.Call, ignored) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
func parseGoroutine(scanner *bufio.Scanner) (*Goroutine, error) {
g := &Goroutine{}
matches := goroutineHeaderRE.FindSubmatch(scanner.Bytes())