Merge "runtime/internal/flow/conn: Send Mtu and SharedTokens as setup options. https://github.com/vanadium/issues/issues/998"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 8e0c1f3..8c9b5ba 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -119,6 +119,8 @@
 		Versions:          versions,
 		PeerLocalEndpoint: c.local,
 		PeerNaClPublicKey: pk,
+		Mtu:               defaultMtu,
+		SharedTokens:      DefaultBytesBufferedPerFlow,
 	}
 	if c.remote != nil {
 		lSetup.PeerRemoteEndpoint = c.remote
@@ -152,6 +154,15 @@
 	if c.local == nil {
 		c.local = rSetup.PeerRemoteEndpoint
 	}
+	if rSetup.Mtu != 0 {
+		c.mtu = rSetup.Mtu
+	} else {
+		c.mtu = defaultMtu
+	}
+	c.lshared = lSetup.SharedTokens
+	if rSetup.SharedTokens != 0 && rSetup.SharedTokens < c.lshared {
+		c.lshared = rSetup.SharedTokens
+	}
 	if rSetup.PeerNaClPublicKey == nil {
 		return nil, nil, NewErrMissingSetupOption(ctx, "peerNaClPublicKey")
 	}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index a479aec..55a8197 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -38,7 +38,7 @@
 )
 
 const (
-	mtu                         = 1 << 16
+	defaultMtu                  = 1 << 16
 	defaultChannelTimeout       = 30 * time.Minute
 	DefaultBytesBufferedPerFlow = 1 << 20
 	proxyOverhead               = 32
@@ -88,6 +88,7 @@
 	unopenedFlows sync.WaitGroup
 	cancel        context.CancelFunc
 	handler       FlowHandler
+	mtu           uint64
 
 	mu sync.Mutex // All the variables below here are protected by mu.
 
@@ -158,22 +159,19 @@
 		channelTimeout = defaultChannelTimeout
 	}
 	c := &Conn{
-		mp:           newMessagePipe(conn),
-		handler:      handler,
-		lBlessings:   lBlessings,
-		local:        endpointCopy(local),
-		remote:       endpointCopy(remote),
-		closed:       make(chan struct{}),
-		lameDucked:   make(chan struct{}),
-		nextFid:      reservedFlows,
-		flows:        map[uint64]*flw{},
-		lastUsedTime: time.Now(),
-		toRelease:    map[uint64]uint64{},
-		borrowing:    map[uint64]bool{},
-		cancel:       cancel,
-		// TODO(mattr): We should negotiate the shared counter pool size with the
-		// other end.
-		lshared:              DefaultBytesBufferedPerFlow,
+		mp:                   newMessagePipe(conn),
+		handler:              handler,
+		lBlessings:           lBlessings,
+		local:                endpointCopy(local),
+		remote:               endpointCopy(remote),
+		closed:               make(chan struct{}),
+		lameDucked:           make(chan struct{}),
+		nextFid:              reservedFlows,
+		flows:                map[uint64]*flw{},
+		lastUsedTime:         time.Now(),
+		toRelease:            map[uint64]uint64{},
+		borrowing:            map[uint64]bool{},
+		cancel:               cancel,
 		outstandingBorrowed:  make(map[uint64]uint64),
 		activeWriters:        make([]writer, numPriorities),
 		acceptChannelTimeout: channelTimeout,
@@ -250,7 +248,6 @@
 		toRelease:            map[uint64]uint64{},
 		borrowing:            map[uint64]bool{},
 		cancel:               cancel,
-		lshared:              DefaultBytesBufferedPerFlow,
 		outstandingBorrowed:  make(map[uint64]uint64),
 		activeWriters:        make([]writer, numPriorities),
 		acceptChannelTimeout: channelTimeout,
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 96bfb8e..60236a6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -142,7 +142,7 @@
 // the number of shared counters for the conn if we are sending on a just
 // dialed flow.
 func (f *flw) tokensLocked() (int, func(int)) {
-	max := uint64(mtu)
+	max := f.conn.mtu
 	// When	our flow is proxied (i.e. encapsulated), the proxy has added overhead
 	// when forwarding the message. This means we must reduce our mtu to ensure
 	// that dialer framing reaches the acceptor without being truncated by the
diff --git a/runtime/internal/flow/conn/flowcontrol_test.go b/runtime/internal/flow/conn/flowcontrol_test.go
index 81af175..5529940 100644
--- a/runtime/internal/flow/conn/flowcontrol_test.go
+++ b/runtime/internal/flow/conn/flowcontrol_test.go
@@ -116,18 +116,18 @@
 	defer wg.Wait()
 	for _, f := range flows {
 		go func(fl flow.Flow) {
-			if _, err := fl.WriteMsg(randData[:mtu*nmessages]); err != nil {
+			if _, err := fl.WriteMsg(randData[:defaultMtu*nmessages]); err != nil {
 				panic(err)
 			}
 			wg.Done()
 		}(f)
 		go func() {
 			fl := <-accept
-			buf := make([]byte, mtu*nmessages)
+			buf := make([]byte, defaultMtu*nmessages)
 			if _, err := io.ReadFull(fl, buf); err != nil {
 				panic(err)
 			}
-			if !bytes.Equal(buf, randData[:mtu*nmessages]) {
+			if !bytes.Equal(buf, randData[:defaultMtu*nmessages]) {
 				t.Fatal("unequal data")
 			}
 			wg.Done()
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 529079f..8e50e12 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -30,7 +30,7 @@
 func newMessagePipe(rw flow.MsgReadWriteCloser) *messagePipe {
 	return &messagePipe{
 		rw:       rw,
-		writeBuf: make([]byte, mtu),
+		writeBuf: make([]byte, defaultMtu),
 		cipher:   &crypto.NullControlCipher{},
 	}
 }