// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package iobuf performs explicit memory management for data buffers used
// to perform network IO. The intent is that it is more efficient to perform
// manual allocation than to rely on the Go garbage collector to manage large
// chunks of frequently recycled memory.
//
// In this model, a Pool is a collection of contiguous memory areas (of type
// <buf>) used for memory allocation. The bufs are subdivided into slices of
// type Slice.
//
// Pool: a source of memory areas.
// Slice: a contiguous region of allocated memory.
// Allocator: a Slice allocator.
// Reader: an IO reader that reads into Slices.
//
// There is an analogy with sbrk/malloc: the Pool is the source of memory (like
// sbrk), and the Allocator hands out small areas (like malloc). Allocations
// are mostly sequential within a buf, allowing sequentially-allocated Slices to
// be coalesced at some later point.
//
// For efficiency, each Slice holds a reference to its underlying buf, which is
// reference counted. When all references to a buf are released, the buf is
// recycled into its Pool. This does not happen automatically. The caller is
// responsible for calling slice.Release() when finished using a slice.
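//
// A rough usage sketch (NewAllocator and Alloc are defined elsewhere in this
// package; the exact signatures shown here are assumptions):
//
//    pool := iobuf.NewPool(0)             // 0 selects the default 4K buf size
//    defer pool.Close()
//    alloc := iobuf.NewAllocator(pool, 0) // hands out Slices backed by pooled bufs
//    slice := alloc.Alloc(1024)           // slice.Contents has room for 1024 bytes
//    // ... fill and read slice.Contents ...
//    slice.Release()                      // recycle the underlying buf into the pool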
package iobuf
import (
"sync"
"sync/atomic"
"v.io/x/ref/internal/logger"
)
// An iobuf is a storage space for memory read from the network. The data should
// be read into the Contents field, then sliced up into Slices that
// correspond to header, payload, etc.
//
// iobufs are reference counted. The count includes one reference for the iobuf
// itself, plus one for each Slice.
type buf struct {
refcount int32 // Use atomic operations.
Contents []byte
pool *Pool
}
// Pool manages a pool of iobufs. The size of the pool is not fixed;
// it can grow without bound.
//
// The implementation here allocates a new iobuf whenever there is an allocation
// request and the pool is empty. For iobufs to be recycled, explicit Release()
// calls are required. However, if these Release() calls are missing, the
// program will continue to function, recycling the buffers through the GC.
// Forgetting Release() calls therefore puts extra pressure on the GC to
// recycle the iobufs. You can examine the <allocated> field to check how
// many iobufs have been allocated during the lifetime of the Pool.
type Pool struct {
minSize uint
mutex sync.Mutex
freelist []*buf
allocated uint64 // Total number of iobufs allocated.
}
const defaultMinSize = 1 << 12
// NewPool creates a new pool. The pool will allocate iobufs in multiples of minSize.
// If minSize is zero, the default value (4K) will be used.
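// For example, with the default minSize of 4096, a 5000-byte request is rounded
// up to an 8192-byte iobuf (see alloc below).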
func NewPool(minSize uint) *Pool {
if minSize == 0 {
minSize = defaultMinSize
}
return &Pool{minSize: minSize, freelist: []*buf{}}
}
// Close shuts down the Pool.
func (pool *Pool) Close() {
pool.mutex.Lock()
pool.freelist = nil
pool.mutex.Unlock()
}
// alloc allocates a new iobuf. The returned iobuf has at least <size> bytes of free space.
func (pool *Pool) alloc(size uint) *buf {
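// Round the requested size up to the next multiple of pool.minSize; a zero
// request yields a single minSize buf.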
if size == 0 {
size = pool.minSize
} else if r := size % pool.minSize; r > 0 {
size += pool.minSize - r
}
pool.mutex.Lock()
defer pool.mutex.Unlock()
if pool.freelist == nil {
logger.Global().Info("iobuf.Pool is closed")
return nil
}
// Search for an iobuf that is large enough.
for i := len(pool.freelist) - 1; i >= 0; i-- {
iobuf := pool.freelist[i]
if uint(len(iobuf.Contents)) >= size {
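// Remove the buf from the freelist by swapping in the last element;
// freelist order does not matter.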
pool.freelist[i] = pool.freelist[len(pool.freelist)-1]
pool.freelist = pool.freelist[:len(pool.freelist)-1]
atomic.AddInt32(&iobuf.refcount, 1)
return iobuf
}
}
// All the free buffers are too small. Allocate a fresh one.
pool.allocated++
iobuf := &buf{refcount: 1, Contents: make([]byte, size), pool: pool}
return iobuf
}
// release recycles an iobuf that has a zero refcount.
func (pool *Pool) release(iobuf *buf) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
// TODO(jyh): Ideally we would like to overwrite the iobuf so that if there
// are slices still referring to it (due to a double-Release), it will be
// more likely that the problem is noticed. Implement this if we define a
// "debug mode."
if pool.freelist != nil {
pool.freelist = append(pool.freelist, iobuf)
}
}
// release decrements the iobuf's refcount, recycling the iobuf when the count
// reaches zero.
func (iobuf *buf) release() {
refcount := atomic.AddInt32(&iobuf.refcount, -1)
if refcount < 0 {
logger.Global().Infof("Refcount is negative: %d. This is a bug in the program.", refcount)
}
if refcount == 0 {
iobuf.pool.release(iobuf)
}
}
// slice creates a Slice that refers to a slice of the iobuf contents.
func (iobuf *buf) slice(free, base, bound uint) *Slice {
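// Each Slice holds its own reference to the backing buf; the matching
// decrement happens when the Slice is released.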
atomic.AddInt32(&iobuf.refcount, 1)
return &Slice{iobuf: iobuf, free: free, base: base, Contents: iobuf.Contents[base:bound]}
}