// Package iobuf performs explicit memory management for the data buffers used
// in network IO. The intent is that manual allocation is more efficient than
// relying on the Go garbage collector to manage large chunks of frequently
// recycled memory.
//
// In this model, a Pool is a collection of contiguous memory areas (of type
// <buf>) used for memory allocation. The bufs are subdivided into slices of
// type Slice.
//
// Pool: a source of memory areas.
// Slice: a contiguous region of allocated memory.
// Allocator: a Slice allocator.
// Reader: an IO reader that reads into Slices.
//
// There is an analogy with sbrk/malloc: the Pool is the source of memory (like
// sbrk), and the Allocator hands out small areas (like malloc). Allocations
// are mostly sequential within a buf, allowing sequentially-allocated Slices
// to be coalesced at some later point.
//
// For efficiency, each Slice holds a reference count on the underlying buf.
// When all references to a buf are released, the buf is recycled into its
// Pool. This does not happen automatically: the caller is responsible for
// calling slice.Release() when finished using a slice.
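//
// A usage sketch (NewAllocator and Alloc are hypothetical names for the
// Allocator described above and are not defined in this file; NewPool and
// slice.Release() are part of this package):
//
//	pool := iobuf.NewPool(0) // iobufs come in multiples of the 4K default
//	alloc := iobuf.NewAllocator(pool, 0)
//	slice := alloc.Alloc(64) // a 64-byte Slice backed by a pooled iobuf
//	...
//	slice.Release() // drop the reference; the iobuf may be recycled
//	pool.Close()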
package iobuf

import (
	"sync"
	"sync/atomic"

	"v.io/core/veyron2/vlog"
)

// An iobuf is a storage space for memory read from the network. The data
// should be read into the Contents field, then sliced up into Slices that
// correspond to header, payload, etc.
//
// iobufs are reference counted. The count includes one reference for the
// iobuf itself, plus one for each Slice.
type buf struct {
	refcount int32 // Use atomic operations.
	Contents []byte
	pool     *Pool
}

// Pool manages a pool of iobufs. The size of the pool is not fixed; it can
// grow without bound.
//
// The implementation here allocates a new iobuf whenever there is an
// allocation request and the pool is empty. For iobufs to be recycled,
// explicit Release() calls are required; however, if a Release() call is
// missing, the program will continue to function, recycling the buffer
// through the gc and thereby putting extra pressure on it. You can examine
// the <allocated> field to check how many iobufs have been allocated during
// the lifetime of the Pool.
type Pool struct {
	minSize   uint
	mutex     sync.Mutex
	freelist  []*buf
	allocated uint64 // Total number of iobufs allocated.
}
const defaultMinSize = 1 << 12

// NewPool creates a new pool. The pool will allocate iobufs in multiples of
// minSize. If minSize is zero, the default value (4K) will be used.
func NewPool(minSize uint) *Pool {
	if minSize == 0 {
		minSize = defaultMinSize
	}
	return &Pool{minSize: minSize, freelist: []*buf{}}
}

// Close shuts down the Pool.
func (pool *Pool) Close() {
	pool.mutex.Lock()
	pool.freelist = nil
	pool.mutex.Unlock()
}

// alloc allocates a new iobuf. The returned iobuf has at least <size> bytes
// of free space.
func (pool *Pool) alloc(size uint) *buf {
	if size == 0 {
		size = pool.minSize
	} else if r := size % pool.minSize; r > 0 {
		size += pool.minSize - r
	}

	pool.mutex.Lock()
	defer pool.mutex.Unlock()
	if pool.freelist == nil {
		vlog.Info("iobuf.Pool is closed")
		return nil
	}

	// Search for an iobuf that is large enough.
	for i := len(pool.freelist) - 1; i >= 0; i-- {
		iobuf := pool.freelist[i]
		if uint(len(iobuf.Contents)) >= size {
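			// Remove entry i by swapping in the last entry and
			// truncating the freelist; order does not matter.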
			pool.freelist[i] = pool.freelist[len(pool.freelist)-1]
			pool.freelist = pool.freelist[:len(pool.freelist)-1]
			atomic.AddInt32(&iobuf.refcount, 1)
			return iobuf
		}
	}

	// All the free buffers are too small. Allocate a fresh one.
	pool.allocated++
	iobuf := &buf{refcount: 1, Contents: make([]byte, size), pool: pool}
	return iobuf
}
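
// Sizing arithmetic for alloc (illustrative, assuming the default 4K minSize
// and an empty freelist): alloc(0) and alloc(1) both allocate a 4096-byte
// iobuf, alloc(4096) allocates 4096 bytes, and alloc(4097) rounds up to 8192.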

// release recycles an iobuf that has a zero refcount.
func (pool *Pool) release(iobuf *buf) {
	pool.mutex.Lock()
	defer pool.mutex.Unlock()
	// TODO(jyh): Ideally we would like to overwrite the iobuf so that if there
	// are slices still referring to it (due to a double-Release), it will be
	// more likely that the problem is noticed. Implement this if we define a
	// "debug mode."
	if pool.freelist != nil {
		pool.freelist = append(pool.freelist, iobuf)
	}
}

// release decrements the iobuf's refcount, recycling the iobuf when the count
// reaches zero.
func (iobuf *buf) release() {
	refcount := atomic.AddInt32(&iobuf.refcount, -1)
	if refcount < 0 {
		vlog.Infof("Refcount is negative: %d. This is a bug in the program.", refcount)
	}
	if refcount == 0 {
		iobuf.pool.release(iobuf)
	}
}

// slice creates a Slice that refers to a slice of the iobuf contents.
func (iobuf *buf) slice(free, base, bound uint) *Slice {
	atomic.AddInt32(&iobuf.refcount, 1)
	return &Slice{iobuf: iobuf, free: free, base: base, Contents: iobuf.Contents[base:bound]}
}
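
// Lifecycle sketch (hypothetical: the reader, n, headerLen, and the choice of
// <free> offsets are illustrative, and Slice.Release is assumed to release the
// underlying buf reference as described in the package comment). A reader
// that has filled iobuf.Contents[0:n] from the network might do:
//
//	header := iobuf.slice(0, 0, headerLen)          // refcount: 1 (iobuf) + 1
//	payload := iobuf.slice(headerLen, headerLen, n) // refcount: 3
//	iobuf.release()                                 // drop the iobuf's own reference
//	...
//	header.Release()
//	payload.Release() // last reference released; the buf is recycled into its Pool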