Merge "runtime/internal/flow/conn: Send Mtu and SharedTokens as setup options. https://github.com/vanadium/issues/issues/998"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 8e0c1f3..8c9b5ba 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -119,6 +119,8 @@
Versions: versions,
PeerLocalEndpoint: c.local,
PeerNaClPublicKey: pk,
+ Mtu: defaultMtu,
+ SharedTokens: DefaultBytesBufferedPerFlow,
}
if c.remote != nil {
lSetup.PeerRemoteEndpoint = c.remote
@@ -152,6 +154,15 @@
if c.local == nil {
c.local = rSetup.PeerRemoteEndpoint
}
+ if rSetup.Mtu != 0 {
+ c.mtu = rSetup.Mtu
+ } else {
+ c.mtu = defaultMtu
+ }
+ c.lshared = lSetup.SharedTokens
+ if rSetup.SharedTokens != 0 && rSetup.SharedTokens < c.lshared {
+ c.lshared = rSetup.SharedTokens
+ }
if rSetup.PeerNaClPublicKey == nil {
return nil, nil, NewErrMissingSetupOption(ctx, "peerNaClPublicKey")
}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index a479aec..55a8197 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -38,7 +38,7 @@
)
const (
- mtu = 1 << 16
+ defaultMtu = 1 << 16
defaultChannelTimeout = 30 * time.Minute
DefaultBytesBufferedPerFlow = 1 << 20
proxyOverhead = 32
@@ -88,6 +88,7 @@
unopenedFlows sync.WaitGroup
cancel context.CancelFunc
handler FlowHandler
+ mtu uint64
mu sync.Mutex // All the variables below here are protected by mu.
@@ -158,22 +159,19 @@
channelTimeout = defaultChannelTimeout
}
c := &Conn{
- mp: newMessagePipe(conn),
- handler: handler,
- lBlessings: lBlessings,
- local: endpointCopy(local),
- remote: endpointCopy(remote),
- closed: make(chan struct{}),
- lameDucked: make(chan struct{}),
- nextFid: reservedFlows,
- flows: map[uint64]*flw{},
- lastUsedTime: time.Now(),
- toRelease: map[uint64]uint64{},
- borrowing: map[uint64]bool{},
- cancel: cancel,
- // TODO(mattr): We should negotiate the shared counter pool size with the
- // other end.
- lshared: DefaultBytesBufferedPerFlow,
+ mp: newMessagePipe(conn),
+ handler: handler,
+ lBlessings: lBlessings,
+ local: endpointCopy(local),
+ remote: endpointCopy(remote),
+ closed: make(chan struct{}),
+ lameDucked: make(chan struct{}),
+ nextFid: reservedFlows,
+ flows: map[uint64]*flw{},
+ lastUsedTime: time.Now(),
+ toRelease: map[uint64]uint64{},
+ borrowing: map[uint64]bool{},
+ cancel: cancel,
outstandingBorrowed: make(map[uint64]uint64),
activeWriters: make([]writer, numPriorities),
acceptChannelTimeout: channelTimeout,
@@ -250,7 +248,6 @@
toRelease: map[uint64]uint64{},
borrowing: map[uint64]bool{},
cancel: cancel,
- lshared: DefaultBytesBufferedPerFlow,
outstandingBorrowed: make(map[uint64]uint64),
activeWriters: make([]writer, numPriorities),
acceptChannelTimeout: channelTimeout,
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 96bfb8e..60236a6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -142,7 +142,7 @@
// the number of shared counters for the conn if we are sending on a just
// dialed flow.
func (f *flw) tokensLocked() (int, func(int)) {
- max := uint64(mtu)
+ max := f.conn.mtu
// When our flow is proxied (i.e. encapsulated), the proxy has added overhead
// when forwarding the message. This means we must reduce our mtu to ensure
// that dialer framing reaches the acceptor without being truncated by the
diff --git a/runtime/internal/flow/conn/flowcontrol_test.go b/runtime/internal/flow/conn/flowcontrol_test.go
index 81af175..5529940 100644
--- a/runtime/internal/flow/conn/flowcontrol_test.go
+++ b/runtime/internal/flow/conn/flowcontrol_test.go
@@ -116,18 +116,18 @@
defer wg.Wait()
for _, f := range flows {
go func(fl flow.Flow) {
- if _, err := fl.WriteMsg(randData[:mtu*nmessages]); err != nil {
+ if _, err := fl.WriteMsg(randData[:defaultMtu*nmessages]); err != nil {
panic(err)
}
wg.Done()
}(f)
go func() {
fl := <-accept
- buf := make([]byte, mtu*nmessages)
+ buf := make([]byte, defaultMtu*nmessages)
if _, err := io.ReadFull(fl, buf); err != nil {
panic(err)
}
- if !bytes.Equal(buf, randData[:mtu*nmessages]) {
+ if !bytes.Equal(buf, randData[:defaultMtu*nmessages]) {
t.Fatal("unequal data")
}
wg.Done()
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 529079f..8e50e12 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -30,7 +30,7 @@
func newMessagePipe(rw flow.MsgReadWriteCloser) *messagePipe {
return &messagePipe{
rw: rw,
- writeBuf: make([]byte, mtu),
+ writeBuf: make([]byte, defaultMtu),
cipher: &crypto.NullControlCipher{},
}
}