ref: Allow open messages to carry a payload so we don't have to
send two messages at the start of a flow.
This speeds up the single RPC latency between 1.5% and 9%.
MultiPart: 1/2
Change-Id: Ie4356a2c07b221cbec86c26f333f44f2ab5d4bbc
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0b6f0ea..76a5206 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -267,6 +267,12 @@
c.borrowing[msg.ID] = true
c.mu.Unlock()
c.handler.HandleFlow(f)
+ if err := f.q.put(ctx, msg.Payload); err != nil {
+ return err
+ }
+ if msg.Flags&message.CloseFlag != 0 {
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+ }
case *message.Release:
release := make([]flowcontrol.Release, 0, len(msg.Counters))
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index c2b8d6c..dd6ffc6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -90,20 +90,6 @@
sent := 0
var left []byte
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
- if !f.opened {
- // TODO(mattr): we should be able to send multiple messages
- // in a single writeMsg call.
- err := f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
- ID: f.id,
- InitialCounters: defaultBufferSize,
- BlessingsKey: f.bkey,
- DischargeKey: f.dkey,
- })
- if err != nil {
- return 0, false, err
- }
- f.opened = true
- }
size := 0
var bufs [][]byte
if len(left) > 0 {
@@ -136,7 +122,22 @@
d.Flags |= message.DisableEncryptionFlag
}
sent += size
- return size, done, f.conn.mp.writeMsg(f.ctx, d)
+
+ var err error
+ if f.opened {
+ err = f.conn.mp.writeMsg(f.ctx, d)
+ } else {
+ err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
+ ID: f.id,
+ InitialCounters: defaultBufferSize,
+ BlessingsKey: f.bkey,
+ DischargeKey: f.dkey,
+ Flags: d.Flags,
+ Payload: d.Payload,
+ })
+ f.opened = true
+ }
+ return size, done, err
})
if alsoClose || err != nil {
f.close(f.ctx, err)
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 6d44609..9435354 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -61,8 +61,19 @@
if _, err = p.rw.WriteMsg(p.writeBuf); err != nil {
return err
}
- if data, ok := m.(*message.Data); ok && (data.Flags&message.DisableEncryptionFlag != 0) {
- if _, err = p.rw.WriteMsg(data.Payload...); err != nil {
+ var payload [][]byte
+ switch msg := m.(type) {
+ case *message.Data:
+ if msg.Flags&message.DisableEncryptionFlag != 0 {
+ payload = msg.Payload
+ }
+ case *message.OpenFlow:
+ if msg.Flags&message.DisableEncryptionFlag != 0 {
+ payload = msg.Payload
+ }
+ }
+ if payload != nil {
+ if _, err = p.rw.WriteMsg(payload...); err != nil {
return err
}
}