blob: c26bb627b7370b9a00fba53aab92aca127fa4dfc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"bytes"
"sync"
"v.io/v23/context"
"v.io/v23/flow"
)
var bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}
type MTUer interface {
MTU() uint64
}
// BufferingFlow wraps a Flow and buffers all its writes. It only truly writes to the
// underlying flow when the buffered data exceeds the MTU of the underlying channel, or
// Flush, Close, or WriteMsgAndClose is called.
type BufferingFlow struct {
flow.Flow
mtu uint64
mu sync.Mutex
buf *bytes.Buffer // Protected by mu.
}
func NewBufferingFlow(ctx *context.T, flw flow.Flow) *BufferingFlow {
b := &BufferingFlow{
Flow: flw,
buf: bufferPool.Get().(*bytes.Buffer),
mtu: defaultMtu,
}
b.buf.Reset()
if m, ok := flw.Conn().(MTUer); ok {
b.mtu = m.MTU()
}
return b
}
// Write buffers data until the underlying channels MTU is reached at which point
// it calls Write on the wrapped Flow.
func (b *BufferingFlow) Write(data []byte) (int, error) {
return b.WriteMsg(data)
}
// WriteMsg buffers data until the underlying channels MTU is reached at which point
// it calls WriteMsg on the wrapped Flow.
func (b *BufferingFlow) WriteMsg(data ...[]byte) (int, error) {
defer b.mu.Unlock()
b.mu.Lock()
if b.buf == nil {
return b.Flow.WriteMsg(data...)
}
wrote := 0
for _, d := range data {
if l := b.buf.Len(); l > 0 && uint64(l+len(d)) > b.mtu {
if _, err := b.Flow.WriteMsg(b.buf.Bytes()); err != nil {
return wrote, err
}
b.buf.Reset()
}
cur, err := b.buf.Write(d)
wrote += cur
if err != nil {
return wrote, err
}
}
return wrote, nil
}
// Close flushes the already written data and then closes the underlying Flow.
func (b *BufferingFlow) Close() error {
defer b.mu.Unlock()
b.mu.Lock()
if b.buf == nil {
return b.Flow.Close()
}
_, err := b.Flow.WriteMsgAndClose(b.buf.Bytes())
bufferPool.Put(b.buf)
b.buf = nil
return err
}
// WriteMsgAndClose writes all buffered data and closes the underlying Flow.
func (b *BufferingFlow) WriteMsgAndClose(data ...[]byte) (int, error) {
defer b.mu.Unlock()
b.mu.Lock()
if b.buf == nil {
return b.Flow.WriteMsgAndClose(data...)
}
wrote, err := b.WriteMsg(data...)
if err != nil {
return wrote, err
}
return wrote, b.Close()
}
// Flush writes all buffered data to the underlying Flow.
func (b *BufferingFlow) Flush() (err error) {
defer b.mu.Unlock()
b.mu.Lock()
if b.buf != nil && b.buf.Len() > 0 {
byts := b.buf.Bytes()
_, err = b.Flow.WriteMsg(byts)
b.buf.Reset()
}
return err
}