Merge "runtime/internal/flow/conn: Fix flow control accounting when flows close."
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16ac858..030ce8e 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -85,7 +85,10 @@
// Resolve of the discharge server name. The two resolve calls may be to
// the same mounttable.
c.loopWG.Add(1)
- go c.refreshDischarges(ctx)
+ go func() {
+ c.refreshDischarges(ctx)
+ c.loopWG.Done()
+ }()
return nil
}
@@ -105,7 +108,6 @@
lAuth := &message.Auth{
ChannelBinding: signedBinding,
}
- c.loopWG.Add(1)
if lAuth.BlessingsKey, lAuth.DischargeKey, err = c.refreshDischarges(ctx); err != nil {
return err
}
@@ -211,7 +213,6 @@
}
func (c *Conn) refreshDischarges(ctx *context.T) (bkey, dkey uint64, err error) {
- defer c.loopWG.Done()
dis := slib.PrepareDischarges(ctx, c.lBlessings,
security.DischargeImpetus{}, time.Minute)
// Schedule the next update.
@@ -221,6 +222,7 @@
c.loopWG.Add(1)
c.dischargeTimer = time.AfterFunc(dur, func() {
c.refreshDischarges(ctx)
+ c.loopWG.Done()
})
}
c.mu.Unlock()
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 80d0034..4806bef 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -340,7 +340,7 @@
}
c.status = Closing
- go func() {
+ go func(c *Conn) {
if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
msg := ""
if err != nil {
@@ -373,7 +373,7 @@
c.status = Closed
close(c.closed)
c.mu.Unlock()
- }()
+ }(c)
}
func (c *Conn) release(ctx *context.T, fid, count uint64) {
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index e4127a3..1cad267 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
import (
"io"
+ "sync"
"time"
"v.io/v23/context"
@@ -47,6 +48,9 @@
// borrowed indicates the number of tokens we have borrowed from the shared pool for
// sending on newly dialed flows.
borrowed uint64
+ // borrowCond is a condition variable that we can use to wait for shared
+ // counters to be released.
+ borrowCond *sync.Cond
// borrowing indicates whether this flow is using borrowed counters for a newly
// dialed flow. This will be set to false after we first receive a
// release from the remote end. This is always false for accepted flows.
@@ -60,14 +64,15 @@
func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, remote naming.Endpoint, dialed, preopen bool) *flw {
f := &flw{
- id: id,
- dialed: dialed,
- conn: c,
- q: newReadQ(c, id),
- bkey: bkey,
- dkey: dkey,
- opened: preopen,
- borrowing: dialed,
+ id: id,
+ dialed: dialed,
+ conn: c,
+ q: newReadQ(c, id),
+ bkey: bkey,
+ dkey: dkey,
+ opened: preopen,
+ borrowing: dialed,
+ borrowCond: sync.NewCond(&c.mu),
// It's important that this channel has a non-zero buffer. Sometimes this
// flow will be notifying itself, so if there's no buffer a deadlock will
// occur.
@@ -144,12 +149,16 @@
return int(max), func(used int) {
f.conn.lshared -= uint64(used)
f.borrowed += uint64(used)
+ f.ctx.VI(2).Infof("deducting %d borrowed tokens on flow %d(%p), total: %d", used, f.id, f, f.borrowed)
}
}
if f.released < max {
max = f.released
}
- return int(max), func(used int) { f.released -= uint64(used) }
+ return int(max), func(used int) {
+ f.released -= uint64(used)
+ f.ctx.VI(2).Infof("flow %d(%p) deducting %d tokens, %d left", f.id, f, used, f.released)
+ }
}
// releaseLocked releases some counters from a remote reader to the local
@@ -161,12 +170,16 @@
if f.borrowed < tokens {
n = f.borrowed
}
+ f.ctx.VI(2).Infof("Returning %d tokens borrowed by %d(%p)", f.borrowed, f.id, f)
tokens -= n
f.borrowed -= n
f.conn.lshared += n
+ f.borrowCond.Broadcast()
}
f.released += tokens
+ f.ctx.VI(2).Infof("Tokens release to %d(%p): %d => %d", f.id, f, tokens, f.released)
if f.writing {
+ f.ctx.VI(2).Infof("Activating writing flow %d(%p) now that we have tokens.", f.id, f)
f.conn.activateWriterLocked(f)
f.conn.notifyNextWriterLocked(nil)
}
@@ -176,6 +189,7 @@
if err = f.checkBlessings(); err != nil {
return 0, err
}
+ f.ctx.VI(2).Infof("starting write on flow %d(%p)", f.id, f)
select {
// Catch cancellations early. If we caught a cancel when waiting
// our turn below its possible that we were notified simultaneously.
@@ -212,6 +226,7 @@
if tokens == 0 {
// Oops, we really don't have data to send, probably because we've exhausted
// the remote buffer. deactivate ourselves but keep trying.
+ f.ctx.VI(2).Infof("Deactivating write on flow %d(%p) due to lack of tokens", f.id, f)
f.conn.deactivateWriterLocked(f)
continue
}
@@ -250,6 +265,7 @@
f.opened = true
}
f.writing = false
+ f.ctx.VI(2).Infof("finishing write on %d(%p): %v", f.id, f, err)
f.conn.deactivateWriterLocked(f)
f.conn.notifyNextWriterLocked(f)
f.conn.mu.Unlock()
@@ -394,8 +410,19 @@
}
func (f *flw) close(ctx *context.T, err error) {
+ closedRemotely := verror.ErrorID(err) == ErrFlowClosedRemotely.ID
+ f.conn.mu.Lock()
+ if closedRemotely {
+ // When the other side closes a flow, it implicitly releases all the
+ // counters used by that flow. That means we should release the shared
+ // counter to be used on other new flows.
+ f.conn.lshared += f.borrowed
+ f.borrowed = 0
+ }
+ f.borrowCond.Broadcast()
+ f.conn.mu.Unlock()
if f.q.close(ctx) {
- eid := verror.ErrorID(err)
+ f.ctx.VI(2).Infof("closing %d(%p): %v", f.id, f, err)
f.cancel()
// After cancel has been called no new writes will begin for this
// flow. There may be a write in progress, but it must finish
@@ -403,13 +430,12 @@
// can simply use sendMessageLocked to send the close flow
// message.
f.conn.mu.Lock()
- delete(f.conn.flows, f.id)
connClosing := f.conn.status == Closing
var serr error
if !f.opened {
// Closing a flow that was never opened.
f.conn.unopenedFlows.Done()
- } else if eid != ErrFlowClosedRemotely.ID && !connClosing {
+ } else if !closedRemotely && !connClosing {
// Note: If the conn is closing there is no point in trying to
// send the flow close message as it will fail. This is racy
// with the connection closing, but there are no ill-effects
@@ -419,6 +445,18 @@
Flags: message.CloseFlag,
})
}
+ if f.borrowed > 0 && f.conn.status < Closing {
+ f.conn.loopWG.Add(1)
+ go func() {
+ defer f.conn.loopWG.Done()
+ f.conn.mu.Lock()
+ for f.borrowed > 0 && f.conn.status < Closing {
+ f.borrowCond.Wait()
+ }
+ delete(f.conn.flows, f.id)
+ f.conn.mu.Unlock()
+ }()
+ }
f.conn.mu.Unlock()
if serr != nil {
ctx.Errorf("Could not send close flow message: %v", err)
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index ff1f83c..186a40e 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -18,9 +18,11 @@
"v.io/x/ref/runtime/internal/flow/flowtest"
_ "v.io/x/ref/runtime/internal/flow/protocols/local"
inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/test/goroutines"
)
func TestCache(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
@@ -31,7 +33,9 @@
RID: naming.FixedRoutingID(0x5555),
Blessings: []string{"A", "B", "C"},
}
- conn := makeConnAndFlow(t, ctx, remote).c
+ caf := makeConnAndFlow(t, ctx, remote)
+ defer caf.stop(ctx)
+ conn := caf.c
if err := c.Insert(conn, remote.Protocol, remote.Address); err != nil {
t.Fatal(err)
}
@@ -72,7 +76,9 @@
RID: naming.FixedRoutingID(0x1111),
Blessings: []string{"ridonly"},
}
- ridConn := makeConnAndFlow(t, ctx, ridEP).c
+ caf = makeConnAndFlow(t, ctx, ridEP)
+ defer caf.stop(ctx)
+ ridConn := caf.c
if err := c.InsertWithRoutingID(ridConn); err != nil {
t.Fatal(err)
}
@@ -90,7 +96,9 @@
RID: naming.FixedRoutingID(0x2222),
Blessings: []string{"other"},
}
- otherConn := makeConnAndFlow(t, ctx, otherEP).c
+ caf = makeConnAndFlow(t, ctx, otherEP)
+ defer caf.stop(ctx)
+ otherConn := caf.c
// Looking up a not yet inserted endpoint should fail.
if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings); err != nil || got != nil {
@@ -117,7 +125,9 @@
}
// Insert a duplicate conn to ensure that replaced conns still get closed.
- dupConn := makeConnAndFlow(t, ctx, remote).c
+ caf = makeConnAndFlow(t, ctx, remote)
+ defer caf.stop(ctx)
+ dupConn := caf.c
if err := c.Insert(dupConn, remote.Protocol, remote.Address); err != nil {
t.Fatal(err)
}
@@ -136,17 +146,20 @@
c.Close(ctx)
// Now the connections should be closed.
<-conn.Closed()
+ <-ridConn.Closed()
<-dupConn.Closed()
<-otherConn.Closed()
}
func TestLRU(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
// Ensure that the least recently created conns are killed by KillConnections.
c := NewConnCache()
- conns := nConnAndFlows(t, ctx, 10)
+ conns, stop := nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -178,7 +191,8 @@
// Ensure that writing to conns marks conns as more recently used.
c = NewConnCache()
- conns = nConnAndFlows(t, ctx, 10)
+ conns, stop = nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -213,7 +227,8 @@
// Ensure that reading from conns marks conns as more recently used.
c = NewConnCache()
- conns = nConnAndFlows(t, ctx, 10)
+ conns, stop = nConnAndFlows(t, ctx, 10)
+ defer stop()
for _, conn := range conns {
addr := conn.c.RemoteEndpoint().Addr()
if err := c.Insert(conn.c, addr.Network(), addr.String()); err != nil {
@@ -267,6 +282,7 @@
type connAndFlow struct {
c *connpackage.Conn
+ a *connpackage.Conn
f flow.Flow
}
@@ -284,7 +300,12 @@
}
}
-func nConnAndFlows(t *testing.T, ctx *context.T, n int) []connAndFlow {
+func (c connAndFlow) stop(ctx *context.T) {
+ c.c.Close(ctx, nil)
+ c.a.Close(ctx, nil)
+}
+
+func nConnAndFlows(t *testing.T, ctx *context.T, n int) ([]connAndFlow, func()) {
cfs := make([]connAndFlow, n)
for i := 0; i < n; i++ {
cfs[i] = makeConnAndFlow(t, ctx, &inaming.Endpoint{
@@ -292,7 +313,11 @@
RID: naming.FixedRoutingID(uint64(i + 1)), // We need to have a nonzero rid for bidi.
})
}
- return cfs
+ return cfs, func() {
+ for _, conn := range cfs {
+ conn.stop(ctx)
+ }
+ }
}
func makeConnAndFlow(t *testing.T, ctx *context.T, ep naming.Endpoint) connAndFlow {
@@ -318,7 +343,7 @@
ach <- a
}()
conn := <-dch
- <-ach
+ aconn := <-ach
f, err := conn.Dial(ctx, flowtest.AllowAllPeersAuthorizer{}, nil)
if err != nil {
t.Fatal(err)
@@ -328,7 +353,7 @@
t.Fatal(err)
}
<-fh.ch
- return connAndFlow{conn, f}
+ return connAndFlow{conn, aconn, f}
}
type fh struct {