| // Copyright 2014 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package http2 |
| |
| import "fmt" |
| |
| // frameWriteMsg is a request to write a frame. |
| type frameWriteMsg struct { |
| // write is the interface value that does the writing, once the |
| // writeScheduler (below) has decided to select this frame |
| // to write. The write functions are all defined in write.go. |
| write writeFramer |
| |
| stream *stream // used for prioritization. nil for non-stream frames. |
| |
| // done, if non-nil, must be a buffered channel with space for |
| // 1 message and is sent the return value from write (or an |
| // earlier error) when the frame has been written. |
| done chan error |
| } |
| |
| // for debugging only: |
| func (wm frameWriteMsg) String() string { |
| var streamID uint32 |
| if wm.stream != nil { |
| streamID = wm.stream.id |
| } |
| var des string |
| if s, ok := wm.write.(fmt.Stringer); ok { |
| des = s.String() |
| } else { |
| des = fmt.Sprintf("%T", wm.write) |
| } |
| return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des) |
| } |
| |
| // writeScheduler tracks pending frames to write, priorities, and decides |
| // the next one to use. It is not thread-safe. |
| type writeScheduler struct { |
| // zero are frames not associated with a specific stream. |
| // They're sent before any stream-specific freams. |
| zero writeQueue |
| |
| // maxFrameSize is the maximum size of a DATA frame |
| // we'll write. Must be non-zero and between 16K-16M. |
| maxFrameSize uint32 |
| |
| // sq contains the stream-specific queues, keyed by stream ID. |
| // when a stream is idle, it's deleted from the map. |
| sq map[uint32]*writeQueue |
| |
| // canSend is a slice of memory that's reused between frame |
| // scheduling decisions to hold the list of writeQueues (from sq) |
| // which have enough flow control data to send. After canSend is |
| // built, the best is selected. |
| canSend []*writeQueue |
| |
| // pool of empty queues for reuse. |
| queuePool []*writeQueue |
| } |
| |
| func (ws *writeScheduler) putEmptyQueue(q *writeQueue) { |
| if len(q.s) != 0 { |
| panic("queue must be empty") |
| } |
| ws.queuePool = append(ws.queuePool, q) |
| } |
| |
| func (ws *writeScheduler) getEmptyQueue() *writeQueue { |
| ln := len(ws.queuePool) |
| if ln == 0 { |
| return new(writeQueue) |
| } |
| q := ws.queuePool[ln-1] |
| ws.queuePool = ws.queuePool[:ln-1] |
| return q |
| } |
| |
| func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 } |
| |
| func (ws *writeScheduler) add(wm frameWriteMsg) { |
| st := wm.stream |
| if st == nil { |
| ws.zero.push(wm) |
| } else { |
| ws.streamQueue(st.id).push(wm) |
| } |
| } |
| |
| func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue { |
| if q, ok := ws.sq[streamID]; ok { |
| return q |
| } |
| if ws.sq == nil { |
| ws.sq = make(map[uint32]*writeQueue) |
| } |
| q := ws.getEmptyQueue() |
| ws.sq[streamID] = q |
| return q |
| } |
| |
| // take returns the most important frame to write and removes it from the scheduler. |
| // It is illegal to call this if the scheduler is empty or if there are no connection-level |
| // flow control bytes available. |
| func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) { |
| if ws.maxFrameSize == 0 { |
| panic("internal error: ws.maxFrameSize not initialized or invalid") |
| } |
| |
| // If there any frames not associated with streams, prefer those first. |
| // These are usually SETTINGS, etc. |
| if !ws.zero.empty() { |
| return ws.zero.shift(), true |
| } |
| if len(ws.sq) == 0 { |
| return |
| } |
| |
| // Next, prioritize frames on streams that aren't DATA frames (no cost). |
| for id, q := range ws.sq { |
| if q.firstIsNoCost() { |
| return ws.takeFrom(id, q) |
| } |
| } |
| |
| // Now, all that remains are DATA frames with non-zero bytes to |
| // send. So pick the best one. |
| if len(ws.canSend) != 0 { |
| panic("should be empty") |
| } |
| for _, q := range ws.sq { |
| if n := ws.streamWritableBytes(q); n > 0 { |
| ws.canSend = append(ws.canSend, q) |
| } |
| } |
| if len(ws.canSend) == 0 { |
| return |
| } |
| defer ws.zeroCanSend() |
| |
| // TODO: find the best queue |
| q := ws.canSend[0] |
| |
| return ws.takeFrom(q.streamID(), q) |
| } |
| |
| // zeroCanSend is defered from take. |
| func (ws *writeScheduler) zeroCanSend() { |
| for i := range ws.canSend { |
| ws.canSend[i] = nil |
| } |
| ws.canSend = ws.canSend[:0] |
| } |
| |
| // streamWritableBytes returns the number of DATA bytes we could write |
| // from the given queue's stream, if this stream/queue were |
| // selected. It is an error to call this if q's head isn't a |
| // *writeData. |
| func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 { |
| wm := q.head() |
| ret := wm.stream.flow.available() // max we can write |
| if ret == 0 { |
| return 0 |
| } |
| if int32(ws.maxFrameSize) < ret { |
| ret = int32(ws.maxFrameSize) |
| } |
| if ret == 0 { |
| panic("internal error: ws.maxFrameSize not initialized or invalid") |
| } |
| wd := wm.write.(*writeData) |
| if len(wd.p) < int(ret) { |
| ret = int32(len(wd.p)) |
| } |
| return ret |
| } |
| |
| func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) { |
| wm = q.head() |
| // If the first item in this queue costs flow control tokens |
| // and we don't have enough, write as much as we can. |
| if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 { |
| allowed := wm.stream.flow.available() // max we can write |
| if allowed == 0 { |
| // No quota available. Caller can try the next stream. |
| return frameWriteMsg{}, false |
| } |
| if int32(ws.maxFrameSize) < allowed { |
| allowed = int32(ws.maxFrameSize) |
| } |
| // TODO: further restrict the allowed size, because even if |
| // the peer says it's okay to write 16MB data frames, we might |
| // want to write smaller ones to properly weight competing |
| // streams' priorities. |
| |
| if len(wd.p) > int(allowed) { |
| wm.stream.flow.take(allowed) |
| chunk := wd.p[:allowed] |
| wd.p = wd.p[allowed:] |
| // Make up a new write message of a valid size, rather |
| // than shifting one off the queue. |
| return frameWriteMsg{ |
| stream: wm.stream, |
| write: &writeData{ |
| streamID: wd.streamID, |
| p: chunk, |
| // even if the original had endStream set, there |
| // arebytes remaining because len(wd.p) > allowed, |
| // so we know endStream is false: |
| endStream: false, |
| }, |
| // our caller is blocking on the final DATA frame, not |
| // these intermediates, so no need to wait: |
| done: nil, |
| }, true |
| } |
| wm.stream.flow.take(int32(len(wd.p))) |
| } |
| |
| q.shift() |
| if q.empty() { |
| ws.putEmptyQueue(q) |
| delete(ws.sq, id) |
| } |
| return wm, true |
| } |
| |
| func (ws *writeScheduler) forgetStream(id uint32) { |
| q, ok := ws.sq[id] |
| if !ok { |
| return |
| } |
| delete(ws.sq, id) |
| |
| // But keep it for others later. |
| for i := range q.s { |
| q.s[i] = frameWriteMsg{} |
| } |
| q.s = q.s[:0] |
| ws.putEmptyQueue(q) |
| } |
| |
| type writeQueue struct { |
| s []frameWriteMsg |
| } |
| |
| // streamID returns the stream ID for a non-empty stream-specific queue. |
| func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id } |
| |
| func (q *writeQueue) empty() bool { return len(q.s) == 0 } |
| |
| func (q *writeQueue) push(wm frameWriteMsg) { |
| q.s = append(q.s, wm) |
| } |
| |
| // head returns the next item that would be removed by shift. |
| func (q *writeQueue) head() frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| return q.s[0] |
| } |
| |
| func (q *writeQueue) shift() frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| wm := q.s[0] |
| // TODO: less copy-happy queue. |
| copy(q.s, q.s[1:]) |
| q.s[len(q.s)-1] = frameWriteMsg{} |
| q.s = q.s[:len(q.s)-1] |
| return wm |
| } |
| |
| func (q *writeQueue) firstIsNoCost() bool { |
| if df, ok := q.s[0].write.(*writeData); ok { |
| return len(df.p) == 0 |
| } |
| return true |
| } |