Merge "runtime/internal/flow/conn: Fix decrementing of unopened flows wg."
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 1cad267..4681bde 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -254,7 +254,6 @@
 				Flags:           d.Flags,
 				Payload:         d.Payload,
 			})
-			f.conn.unopenedFlows.Done()
 		}
 		sent += size
 
@@ -262,6 +261,12 @@
 		// opened.  Note that since we've definitely sent a message now opened is surely
 		// true.
 		f.conn.mu.Lock()
+		// We need to ensure that we only call Done() exactly once, so we need to
+		// recheck f.opened, to ensure that f.close didn't decrement the wait group
+		// while we were not holding the lock.
+		if !f.opened {
+			f.conn.unopenedFlows.Done()
+		}
 		f.opened = true
 	}
 	f.writing = false
@@ -435,6 +440,9 @@
 		if !f.opened {
 			// Closing a flow that was never opened.
 			f.conn.unopenedFlows.Done()
+			// We mark the flow as opened to prevent mulitple calls to
+			// f.conn.unopenedFlows.Done().
+			f.opened = true
 		} else if !closedRemotely && !connClosing {
 			// Note: If the conn is closing there is no point in trying to
 			// send the flow close message as it will fail.  This is racy