Merge "ref/runtime/internal/flow/conn: Fix the counter based flow control to send less often, and not block streams."
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index cf2cb9b..dc3ecd4 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -28,8 +28,10 @@
 	if err != nil {
 		return err
 	}
-	c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
-		c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
+	bflow := c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true)
+	bflow.worker.Release(ctx, defaultBufferSize)
+	c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG, bflow, true)
+
 	if err = c.readRemoteAuth(ctx, binding); err != nil {
 		return err
 	}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0a5b85d..0b6f0ea 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -60,6 +60,8 @@
 	flows          map[uint64]*flw
 	dischargeTimer *time.Timer
 	lastUsedTime   time.Time
+	toRelease      map[uint64]uint64
+	borrowing      map[uint64]bool
 }
 
 // Ensure that *Conn implements flow.ManagedConn.
@@ -83,6 +85,8 @@
 		nextFid:      reservedFlows,
 		flows:        map[uint64]*flw{},
 		lastUsedTime: time.Now(),
+		toRelease:    map[uint64]uint64{},
+		borrowing:    map[uint64]bool{},
 	}
 	if err := c.dialHandshake(ctx, versions); err != nil {
 		c.Close(ctx, err)
@@ -110,6 +114,8 @@
 		nextFid:      reservedFlows + 1,
 		flows:        map[uint64]*flw{},
 		lastUsedTime: time.Now(),
+		toRelease:    map[uint64]uint64{},
+		borrowing:    map[uint64]bool{},
 	}
 	if err := c.acceptHandshake(ctx, versions); err != nil {
 		c.Close(ctx, err)
@@ -211,22 +217,32 @@
 	close(c.closed)
 }
 
-func (c *Conn) release(ctx *context.T) {
-	counts := map[uint64]uint64{}
+func (c *Conn) release(ctx *context.T, fid, count uint64) {
+	var toRelease map[uint64]uint64
+	var release bool
 	c.mu.Lock()
-	for fid, f := range c.flows {
-		if release := f.q.release(); release > 0 {
-			counts[fid] = uint64(release)
-		}
+	c.toRelease[fid] += count
+	if c.borrowing[fid] {
+		c.toRelease[invalidFlowID] += count
+		release = c.toRelease[invalidFlowID] > defaultBufferSize/2
+	} else {
+		release = c.toRelease[fid] > defaultBufferSize/2
+	}
+	if release {
+		toRelease = c.toRelease
+		c.toRelease = make(map[uint64]uint64, len(c.toRelease))
+		c.borrowing = make(map[uint64]bool, len(c.borrowing))
 	}
 	c.mu.Unlock()
-	if len(counts) == 0 {
+
+	if toRelease == nil {
 		return
 	}
+	delete(toRelease, invalidFlowID)
 
 	err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) {
 		err := c.mp.writeMsg(ctx, &message.Release{
-			Counters: counts,
+			Counters: toRelease,
 		})
 		return 0, true, err
 	})
@@ -246,6 +262,9 @@
 		}
 		c.mu.Lock()
 		f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
+		f.worker.Release(ctx, int(msg.InitialCounters))
+		c.toRelease[msg.ID] = defaultBufferSize
+		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
 		c.handler.HandleFlow(f)
 
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 820cba9..c2b8d6c 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -37,7 +37,7 @@
 		dialed: dialed,
 		conn:   c,
 		worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
-		q:      newReadQ(),
+		q:      newReadQ(c, id),
 		bkey:   bkey,
 		dkey:   dkey,
 		opened: preopen,
@@ -57,11 +57,7 @@
 // or each other.
 func (f *flw) Read(p []byte) (n int, err error) {
 	f.conn.markUsed()
-	var release bool
-	if n, release, err = f.q.read(f.ctx, p); release {
-		f.conn.release(f.ctx)
-	}
-	if err != nil {
+	if n, err = f.q.read(f.ctx, p); err != nil {
 		f.close(f.ctx, err)
 	}
 	return
@@ -73,14 +69,10 @@
 // or each other.
 func (f *flw) ReadMsg() (buf []byte, err error) {
 	f.conn.markUsed()
-	var release bool
 	// TODO(mattr): Currently we only ever release counters when some flow
 	// reads.  We may need to do it more or less often.  Currently
 	// we'll send counters whenever a new flow is opened.
-	if buf, release, err = f.q.get(f.ctx); release {
-		f.conn.release(f.ctx)
-	}
-	if err != nil {
+	if buf, err = f.q.get(f.ctx); err != nil {
 		f.close(f.ctx, err)
 	}
 	return
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index cd61bc7..54db134 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -16,19 +16,21 @@
 	bufs [][]byte
 	b, e int
 
-	size      int
-	nbufs     int
-	toRelease int
-	notify    chan struct{}
+	id     uint64
+	size   int
+	nbufs  int
+	notify chan struct{}
+	conn   *Conn
 }
 
 const initialReadqBufferSize = 10
 
-func newReadQ() *readq {
+func newReadQ(conn *Conn, id uint64) *readq {
 	return &readq{
-		bufs:      make([][]byte, initialReadqBufferSize),
-		notify:    make(chan struct{}, 1),
-		toRelease: defaultBufferSize,
+		bufs:   make([][]byte, initialReadqBufferSize),
+		notify: make(chan struct{}, 1),
+		conn:   conn,
+		id:     id,
 	}
 }
 
@@ -68,38 +70,40 @@
 	return nil
 }
 
-func (r *readq) read(ctx *context.T, data []byte) (n int, release bool, err error) {
-	defer r.mu.Unlock()
+func (r *readq) read(ctx *context.T, data []byte) (n int, err error) {
 	r.mu.Lock()
-	if err := r.waitLocked(ctx); err != nil {
-		return 0, false, err
+	if err = r.waitLocked(ctx); err == nil {
+		buf := r.bufs[r.b]
+		n = copy(data, buf)
+		buf = buf[n:]
+		if len(buf) > 0 {
+			r.bufs[r.b] = buf
+		} else {
+			r.nbufs -= 1
+			r.b = (r.b + 1) % len(r.bufs)
+		}
+		r.size -= n
 	}
-	buf := r.bufs[r.b]
-	n = copy(data, buf)
-	buf = buf[n:]
-	if len(buf) > 0 {
-		r.bufs[r.b] = buf
-	} else {
-		r.nbufs -= 1
-		r.b = (r.b + 1) % len(r.bufs)
+	r.mu.Unlock()
+	if r.conn != nil {
+		r.conn.release(ctx, r.id, uint64(n))
 	}
-	r.size -= n
-	r.toRelease += n
-	return n, r.toRelease > defaultBufferSize/2, nil
+	return
 }
 
-func (r *readq) get(ctx *context.T) (out []byte, release bool, err error) {
-	defer r.mu.Unlock()
+func (r *readq) get(ctx *context.T) (out []byte, err error) {
 	r.mu.Lock()
-	if err := r.waitLocked(ctx); err != nil {
-		return nil, false, err
+	if err = r.waitLocked(ctx); err == nil {
+		out = r.bufs[r.b]
+		r.b = (r.b + 1) % len(r.bufs)
+		r.size -= len(out)
+		r.nbufs -= 1
 	}
-	out = r.bufs[r.b]
-	r.b = (r.b + 1) % len(r.bufs)
-	r.size -= len(out)
-	r.nbufs -= 1
-	r.toRelease += len(out)
-	return out, r.toRelease > defaultBufferSize/2, nil
+	r.mu.Unlock()
+	if r.conn != nil {
+		r.conn.release(ctx, r.id, uint64(len(out)))
+	}
+	return
 }
 
 func (r *readq) waitLocked(ctx *context.T) (err error) {
@@ -124,7 +128,6 @@
 	r.mu.Lock()
 	if r.e != -1 {
 		r.e = -1
-		r.toRelease = 0
 		close(r.notify)
 	}
 	r.mu.Unlock()
@@ -144,10 +147,3 @@
 	}
 	r.bufs, r.b, r.e = nb, 0, copied
 }
-
-func (r *readq) release() (out int) {
-	r.mu.Lock()
-	out, r.toRelease = r.toRelease, 0
-	r.mu.Unlock()
-	return out
-}
diff --git a/runtime/internal/flow/conn/readq_test.go b/runtime/internal/flow/conn/readq_test.go
index baea620..a236118 100644
--- a/runtime/internal/flow/conn/readq_test.go
+++ b/runtime/internal/flow/conn/readq_test.go
@@ -26,7 +26,7 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	r := newReadQ()
+	r := newReadQ(nil, 1)
 	r.put(ctx, mkBufs("one", "two"))
 	r.put(ctx, mkBufs("thre", "reallong"))
 	r.close(ctx)
@@ -34,7 +34,7 @@
 	read := make([]byte, 4)
 	want := []string{"one", "two", "thre", "real", "long"}
 	for _, w := range want {
-		n, _, err := r.read(ctx, read)
+		n, err := r.read(ctx, read)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -42,7 +42,7 @@
 			t.Errorf("got: %s, want %s", got, w)
 		}
 	}
-	if _, _, err := r.read(ctx, read); err != io.EOF {
+	if _, err := r.read(ctx, read); err != io.EOF {
 		t.Errorf("expected EOF got %v", err)
 	}
 }
@@ -53,14 +53,14 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	r := newReadQ()
+	r := newReadQ(nil, 1)
 	r.put(ctx, mkBufs("one", "two"))
 	r.put(ctx, mkBufs("thre", "reallong"))
 	r.close(ctx)
 
 	want := []string{"one", "two", "thre", "reallong"}
 	for _, w := range want {
-		out, _, err := r.get(ctx)
+		out, err := r.get(ctx)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -68,7 +68,7 @@
 			t.Errorf("got: %s, want %s", got, w)
 		}
 	}
-	if _, _, err := r.get(ctx); err != io.EOF {
+	if _, err := r.get(ctx); err != io.EOF {
 		t.Errorf("expected EOF got %v", err)
 	}
 }
@@ -79,7 +79,7 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	r := newReadQ()
+	r := newReadQ(nil, 1)
 	r.put(ctx, mkBufs("one", "two"))
 	r.put(ctx, mkBufs("thre", "reallong"))
 	r.close(ctx)
@@ -94,10 +94,10 @@
 			read = make([]byte, 4)
 		)
 		if i%2 == 0 {
-			out, _, err = r.get(ctx)
+			out, err = r.get(ctx)
 			got = string(out)
 		} else {
-			n, _, err = r.read(ctx, read)
+			n, err = r.read(ctx, read)
 			got = string(read[:n])
 		}
 		if err != nil {
@@ -107,10 +107,10 @@
 			t.Errorf("got: %s, want %s", got, w)
 		}
 	}
-	if _, _, err := r.get(ctx); err != io.EOF {
+	if _, err := r.get(ctx); err != io.EOF {
 		t.Errorf("expected EOF got %v", err)
 	}
-	if _, _, err := r.read(ctx, nil); err != io.EOF {
+	if _, err := r.read(ctx, nil); err != io.EOF {
 		t.Errorf("expected EOF got %v", err)
 	}
 }
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index a27d48f..7dbd840 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -89,7 +89,7 @@
 			if next != nil {
 				next.notify()
 			}
-			ctx.VI(4).Infof("worker waiting: %s\nfc: %s", w, w.fc)
+			ctx.VI(4).Infof("worker waiting: %s %#v\nfc: %s %d", w, w.counters, w.fc, w.fc.shared)
 			select {
 			case <-ctx.Done():
 				err = ctx.Err()