Merge "runtime/internal/flow/conn: Fix decrementing of unopened flows wg."
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 1cad267..4681bde 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -254,7 +254,6 @@
Flags: d.Flags,
Payload: d.Payload,
})
- f.conn.unopenedFlows.Done()
}
sent += size
@@ -262,6 +261,12 @@
// opened. Note that since we've definitely sent a message now opened is surely
// true.
f.conn.mu.Lock()
+ // We need to ensure that we only call Done() exactly once, so we need to
+ // recheck f.opened, to ensure that f.close didn't decrement the wait group
+ // while we were not holding the lock.
+ if !f.opened {
+ f.conn.unopenedFlows.Done()
+ }
f.opened = true
}
f.writing = false
@@ -435,6 +440,9 @@
if !f.opened {
// Closing a flow that was never opened.
f.conn.unopenedFlows.Done()
+ // We mark the flow as opened to prevent mulitple calls to
+ // f.conn.unopenedFlows.Done().
+ f.opened = true
} else if !closedRemotely && !connClosing {
// Note: If the conn is closing there is no point in trying to
// send the flow close message as it will fail. This is racy