Merge "ref/runtime/internal/flow/conn: Fix the counter-based flow control to send less often and not block streams."
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index cf2cb9b..dc3ecd4 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -28,8 +28,10 @@
if err != nil {
return err
}
- c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG,
- c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true), true)
+ bflow := c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true)
+ bflow.worker.Release(ctx, defaultBufferSize)
+ c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG, bflow, true)
+
if err = c.readRemoteAuth(ctx, binding); err != nil {
return err
}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0a5b85d..0b6f0ea 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -60,6 +60,8 @@
flows map[uint64]*flw
dischargeTimer *time.Timer
lastUsedTime time.Time
+ toRelease map[uint64]uint64
+ borrowing map[uint64]bool
}
// Ensure that *Conn implements flow.ManagedConn.
@@ -83,6 +85,8 @@
nextFid: reservedFlows,
flows: map[uint64]*flw{},
lastUsedTime: time.Now(),
+ toRelease: map[uint64]uint64{},
+ borrowing: map[uint64]bool{},
}
if err := c.dialHandshake(ctx, versions); err != nil {
c.Close(ctx, err)
@@ -110,6 +114,8 @@
nextFid: reservedFlows + 1,
flows: map[uint64]*flw{},
lastUsedTime: time.Now(),
+ toRelease: map[uint64]uint64{},
+ borrowing: map[uint64]bool{},
}
if err := c.acceptHandshake(ctx, versions); err != nil {
c.Close(ctx, err)
@@ -211,22 +217,32 @@
close(c.closed)
}
-func (c *Conn) release(ctx *context.T) {
- counts := map[uint64]uint64{}
+func (c *Conn) release(ctx *context.T, fid, count uint64) {
+ var toRelease map[uint64]uint64
+ var release bool
c.mu.Lock()
- for fid, f := range c.flows {
- if release := f.q.release(); release > 0 {
- counts[fid] = uint64(release)
- }
+ c.toRelease[fid] += count
+ if c.borrowing[fid] {
+ c.toRelease[invalidFlowID] += count
+ release = c.toRelease[invalidFlowID] > defaultBufferSize/2
+ } else {
+ release = c.toRelease[fid] > defaultBufferSize/2
+ }
+ if release {
+ toRelease = c.toRelease
+ c.toRelease = make(map[uint64]uint64, len(c.toRelease))
+ c.borrowing = make(map[uint64]bool, len(c.borrowing))
}
c.mu.Unlock()
- if len(counts) == 0 {
+
+ if toRelease == nil {
return
}
+ delete(toRelease, invalidFlowID)
err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) {
err := c.mp.writeMsg(ctx, &message.Release{
- Counters: counts,
+ Counters: toRelease,
})
return 0, true, err
})
@@ -246,6 +262,9 @@
}
c.mu.Lock()
f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
+ f.worker.Release(ctx, int(msg.InitialCounters))
+ c.toRelease[msg.ID] = defaultBufferSize
+ c.borrowing[msg.ID] = true
c.mu.Unlock()
c.handler.HandleFlow(f)
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 820cba9..c2b8d6c 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -37,7 +37,7 @@
dialed: dialed,
conn: c,
worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
- q: newReadQ(),
+ q: newReadQ(c, id),
bkey: bkey,
dkey: dkey,
opened: preopen,
@@ -57,11 +57,7 @@
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
f.conn.markUsed()
- var release bool
- if n, release, err = f.q.read(f.ctx, p); release {
- f.conn.release(f.ctx)
- }
- if err != nil {
+ if n, err = f.q.read(f.ctx, p); err != nil {
f.close(f.ctx, err)
}
return
@@ -73,14 +69,10 @@
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
f.conn.markUsed()
- var release bool
// TODO(mattr): Currently we only ever release counters when some flow
// reads. We may need to do it more or less often. Currently
// we'll send counters whenever a new flow is opened.
- if buf, release, err = f.q.get(f.ctx); release {
- f.conn.release(f.ctx)
- }
- if err != nil {
+ if buf, err = f.q.get(f.ctx); err != nil {
f.close(f.ctx, err)
}
return
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index cd61bc7..54db134 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -16,19 +16,21 @@
bufs [][]byte
b, e int
- size int
- nbufs int
- toRelease int
- notify chan struct{}
+ id uint64
+ size int
+ nbufs int
+ notify chan struct{}
+ conn *Conn
}
const initialReadqBufferSize = 10
-func newReadQ() *readq {
+func newReadQ(conn *Conn, id uint64) *readq {
return &readq{
- bufs: make([][]byte, initialReadqBufferSize),
- notify: make(chan struct{}, 1),
- toRelease: defaultBufferSize,
+ bufs: make([][]byte, initialReadqBufferSize),
+ notify: make(chan struct{}, 1),
+ conn: conn,
+ id: id,
}
}
@@ -68,38 +70,40 @@
return nil
}
-func (r *readq) read(ctx *context.T, data []byte) (n int, release bool, err error) {
- defer r.mu.Unlock()
+func (r *readq) read(ctx *context.T, data []byte) (n int, err error) {
r.mu.Lock()
- if err := r.waitLocked(ctx); err != nil {
- return 0, false, err
+ if err = r.waitLocked(ctx); err == nil {
+ buf := r.bufs[r.b]
+ n = copy(data, buf)
+ buf = buf[n:]
+ if len(buf) > 0 {
+ r.bufs[r.b] = buf
+ } else {
+ r.nbufs -= 1
+ r.b = (r.b + 1) % len(r.bufs)
+ }
+ r.size -= n
}
- buf := r.bufs[r.b]
- n = copy(data, buf)
- buf = buf[n:]
- if len(buf) > 0 {
- r.bufs[r.b] = buf
- } else {
- r.nbufs -= 1
- r.b = (r.b + 1) % len(r.bufs)
+ r.mu.Unlock()
+ if r.conn != nil {
+ r.conn.release(ctx, r.id, uint64(n))
}
- r.size -= n
- r.toRelease += n
- return n, r.toRelease > defaultBufferSize/2, nil
+ return
}
-func (r *readq) get(ctx *context.T) (out []byte, release bool, err error) {
- defer r.mu.Unlock()
+func (r *readq) get(ctx *context.T) (out []byte, err error) {
r.mu.Lock()
- if err := r.waitLocked(ctx); err != nil {
- return nil, false, err
+ if err = r.waitLocked(ctx); err == nil {
+ out = r.bufs[r.b]
+ r.b = (r.b + 1) % len(r.bufs)
+ r.size -= len(out)
+ r.nbufs -= 1
}
- out = r.bufs[r.b]
- r.b = (r.b + 1) % len(r.bufs)
- r.size -= len(out)
- r.nbufs -= 1
- r.toRelease += len(out)
- return out, r.toRelease > defaultBufferSize/2, nil
+ r.mu.Unlock()
+ if r.conn != nil {
+ r.conn.release(ctx, r.id, uint64(len(out)))
+ }
+ return
}
func (r *readq) waitLocked(ctx *context.T) (err error) {
@@ -124,7 +128,6 @@
r.mu.Lock()
if r.e != -1 {
r.e = -1
- r.toRelease = 0
close(r.notify)
}
r.mu.Unlock()
@@ -144,10 +147,3 @@
}
r.bufs, r.b, r.e = nb, 0, copied
}
-
-func (r *readq) release() (out int) {
- r.mu.Lock()
- out, r.toRelease = r.toRelease, 0
- r.mu.Unlock()
- return out
-}
diff --git a/runtime/internal/flow/conn/readq_test.go b/runtime/internal/flow/conn/readq_test.go
index baea620..a236118 100644
--- a/runtime/internal/flow/conn/readq_test.go
+++ b/runtime/internal/flow/conn/readq_test.go
@@ -26,7 +26,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
- r := newReadQ()
+ r := newReadQ(nil, 1)
r.put(ctx, mkBufs("one", "two"))
r.put(ctx, mkBufs("thre", "reallong"))
r.close(ctx)
@@ -34,7 +34,7 @@
read := make([]byte, 4)
want := []string{"one", "two", "thre", "real", "long"}
for _, w := range want {
- n, _, err := r.read(ctx, read)
+ n, err := r.read(ctx, read)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -42,7 +42,7 @@
t.Errorf("got: %s, want %s", got, w)
}
}
- if _, _, err := r.read(ctx, read); err != io.EOF {
+ if _, err := r.read(ctx, read); err != io.EOF {
t.Errorf("expected EOF got %v", err)
}
}
@@ -53,14 +53,14 @@
ctx, shutdown := v23.Init()
defer shutdown()
- r := newReadQ()
+ r := newReadQ(nil, 1)
r.put(ctx, mkBufs("one", "two"))
r.put(ctx, mkBufs("thre", "reallong"))
r.close(ctx)
want := []string{"one", "two", "thre", "reallong"}
for _, w := range want {
- out, _, err := r.get(ctx)
+ out, err := r.get(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -68,7 +68,7 @@
t.Errorf("got: %s, want %s", got, w)
}
}
- if _, _, err := r.get(ctx); err != io.EOF {
+ if _, err := r.get(ctx); err != io.EOF {
t.Errorf("expected EOF got %v", err)
}
}
@@ -79,7 +79,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
- r := newReadQ()
+ r := newReadQ(nil, 1)
r.put(ctx, mkBufs("one", "two"))
r.put(ctx, mkBufs("thre", "reallong"))
r.close(ctx)
@@ -94,10 +94,10 @@
read = make([]byte, 4)
)
if i%2 == 0 {
- out, _, err = r.get(ctx)
+ out, err = r.get(ctx)
got = string(out)
} else {
- n, _, err = r.read(ctx, read)
+ n, err = r.read(ctx, read)
got = string(read[:n])
}
if err != nil {
@@ -107,10 +107,10 @@
t.Errorf("got: %s, want %s", got, w)
}
}
- if _, _, err := r.get(ctx); err != io.EOF {
+ if _, err := r.get(ctx); err != io.EOF {
t.Errorf("expected EOF got %v", err)
}
- if _, _, err := r.read(ctx, nil); err != io.EOF {
+ if _, err := r.read(ctx, nil); err != io.EOF {
t.Errorf("expected EOF got %v", err)
}
}
diff --git a/runtime/internal/flow/flowcontrol/flowcontrol.go b/runtime/internal/flow/flowcontrol/flowcontrol.go
index a27d48f..7dbd840 100644
--- a/runtime/internal/flow/flowcontrol/flowcontrol.go
+++ b/runtime/internal/flow/flowcontrol/flowcontrol.go
@@ -89,7 +89,7 @@
if next != nil {
next.notify()
}
- ctx.VI(4).Infof("worker waiting: %s\nfc: %s", w, w.fc)
+ ctx.VI(4).Infof("worker waiting: %s %#v\nfc: %s %d", w, w.counters, w.fc, w.fc.shared)
select {
case <-ctx.Done():
err = ctx.Err()