ref: Allow open messages to carry a payload so we don't have to
send two messages at the start of a flow.

This speeds up the single RPC latency between 1.5% and 9%.

MultiPart: 1/2

Change-Id: Ie4356a2c07b221cbec86c26f333f44f2ab5d4bbc
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0b6f0ea..76a5206 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -267,6 +267,12 @@
 		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
 		c.handler.HandleFlow(f)
+		if err := f.q.put(ctx, msg.Payload); err != nil {
+			return err
+		}
+		if msg.Flags&message.CloseFlag != 0 {
+			f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+		}
 
 	case *message.Release:
 		release := make([]flowcontrol.Release, 0, len(msg.Counters))
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index c2b8d6c..dd6ffc6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -90,20 +90,6 @@
 	sent := 0
 	var left []byte
 	err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
-		if !f.opened {
-			// TODO(mattr): we should be able to send multiple messages
-			// in a single writeMsg call.
-			err := f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
-				ID:              f.id,
-				InitialCounters: defaultBufferSize,
-				BlessingsKey:    f.bkey,
-				DischargeKey:    f.dkey,
-			})
-			if err != nil {
-				return 0, false, err
-			}
-			f.opened = true
-		}
 		size := 0
 		var bufs [][]byte
 		if len(left) > 0 {
@@ -136,7 +122,22 @@
 			d.Flags |= message.DisableEncryptionFlag
 		}
 		sent += size
-		return size, done, f.conn.mp.writeMsg(f.ctx, d)
+
+		var err error
+		if f.opened {
+			err = f.conn.mp.writeMsg(f.ctx, d)
+		} else {
+			err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
+				ID:              f.id,
+				InitialCounters: defaultBufferSize,
+				BlessingsKey:    f.bkey,
+				DischargeKey:    f.dkey,
+				Flags:           d.Flags,
+				Payload:         d.Payload,
+			})
+			f.opened = true
+		}
+		return size, done, err
 	})
 	if alsoClose || err != nil {
 		f.close(f.ctx, err)
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 6d44609..9435354 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -61,8 +61,19 @@
 	if _, err = p.rw.WriteMsg(p.writeBuf); err != nil {
 		return err
 	}
-	if data, ok := m.(*message.Data); ok && (data.Flags&message.DisableEncryptionFlag != 0) {
-		if _, err = p.rw.WriteMsg(data.Payload...); err != nil {
+	var payload [][]byte
+	switch msg := m.(type) {
+	case *message.Data:
+		if msg.Flags&message.DisableEncryptionFlag != 0 {
+			payload = msg.Payload
+		}
+	case *message.OpenFlow:
+		if msg.Flags&message.DisableEncryptionFlag != 0 {
+			payload = msg.Payload
+		}
+	}
+	if payload != nil {
+		if _, err = p.rw.WriteMsg(payload...); err != nil {
 			return err
 		}
 	}