Commit 3d6ef091 authored by Steven Allen's avatar Steven Allen

rewrite to have a fun to use interface

Before, this library assumed that we wanted to use pools for things other than
byte slices. In practice, this isn't the case and supporting such use-cases made
this library annoying to use.
parent 2792e41d
// Package mpool provides a sync.Pool equivalent that buckets incoming // Package pool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32. // requests to one of 32 sub-pools, one for each power of 2, 0-32.
// //
// import "github.com/libp2p/go-msgio/mpool" // import (pool "github.com/libp2p/go-buffer-pool")
// var p mpool.Pool // var p pool.BufferPool
// //
// small := make([]byte, 1024) // small := make([]byte, 1024)
// large := make([]byte, 4194304) // large := make([]byte, 4194304)
// p.Put(1024, small) // p.Put(small)
// p.Put(4194304, large) // p.Put(large)
// //
// small2 := p.Get(1024).([]byte) // small2 := p.Get(1024)
// large2 := p.Get(4194304).([]byte) // large2 := p.Get(4194304)
// fmt.Println("small2 len:", len(small2)) // fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2)) // fmt.Println("large2 len:", len(large2))
// //
...@@ -21,92 +21,75 @@ ...@@ -21,92 +21,75 @@
package pool package pool
import ( import (
"fmt" "math/bits"
"sync" "sync"
) )
// ByteSlicePool is a static Pool for reusing byteslices of various sizes. // GlobalPool is a static Pool for reusing byteslices of various sizes.
var ByteSlicePool = &Pool{ var GlobalPool = new(BufferPool)
New: func(length int) interface{} {
return make([]byte, length)
},
}
// MaxLength is the maximum length of an element that can be added to the Pool. // MaxLength is the maximum length of an element that can be added to the Pool.
const MaxLength = 1 << 32 const MaxLength = 1 << 32
// Pool is a pool to handle cases of reusing elements of varying sizes. // BufferPool is a pool to handle cases of reusing elements of varying sizes. It
// It maintains up to 32 internal pools, for each power of 2 in 0-32. // maintains 32 internal pools, for each power of 2 in 0-32.
type Pool struct { //
// You should generally just call the package level Get and Put methods or use
// the GlobalPool BufferPool instead of constructing your own.
//
// You MUST NOT copy Pool after using.
type BufferPool struct {
pools [32]sync.Pool // a list of singlePools pools [32]sync.Pool // a list of singlePools
// New is a function that constructs a new element in the pool, with given len
New func(len int) interface{}
}
func (p *Pool) getPool(idx uint32) *sync.Pool {
if idx > uint32(len(p.pools)) {
panic(fmt.Errorf("index too large: %d", idx))
}
return &p.pools[idx]
} }
// Get selects an arbitrary item from the Pool, removes it from the Pool, // Get retrieves a buffer of the appropriate length from the buffer pool or
// and returns it to the caller. Get may choose to ignore the pool and // allocates a new one. Get may choose to ignore the pool and treat it as empty.
// treat it as empty. Callers should not assume any relation between values // Callers should not assume any relation between values passed to Put and the
// passed to Put and the values returned by Get. // values returned by Get.
// //
// If Get would otherwise return nil and p.New is non-nil, Get returns the // If no suitable buffer exists in the pool, Get creates one.
// result of calling p.New. func (p *BufferPool) Get(length uint32) []byte {
func (p *Pool) Get(length uint32) interface{} { if length == 0 {
idx := nextPowerOfTwo(length) return nil
sp := p.getPool(idx)
// fmt.Printf("Get(%d) idx(%d)\n", length, idx)
val := sp.Get()
if val == nil && p.New != nil {
val = p.New(0x1 << idx)
} }
return val idx := nextLogBase2(length)
if buf := p.pools[idx].Get(); buf != nil {
return buf.([]byte)[:length]
}
return make([]byte, 1<<idx)[:length]
} }
// Put adds x to the pool. // Put adds x to the pool.
func (p *Pool) Put(length uint32, val interface{}) { func (p *BufferPool) Put(buf []byte) {
idx := prevPowerOfTwo(length) capacity := cap(buf)
// fmt.Printf("Put(%d, -) idx(%d)\n", length, idx) if capacity == 0 || capacity > MaxLength {
sp := p.getPool(idx) return // drop it
sp.Put(val) }
idx := prevLogBase2(uint32(capacity))
p.pools[idx].Put(buf)
} }
func nextPowerOfTwo(v uint32) uint32 { // Get retrieves a buffer of the appropriate length from the global buffer pool
// fmt.Printf("nextPowerOfTwo(%d) ", v) // (or allocates a new one).
v-- func Get(length uint32) []byte {
v |= v >> 1 return GlobalPool.Get(length)
v |= v >> 2 }
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
// fmt.Printf("-> %d", v)
i := uint32(0) // Put returns a buffer to the global buffer pool.
for ; v > 1; i++ { func Put(slice []byte) {
v = v >> 1 GlobalPool.Put(slice)
} }
// fmt.Printf("-> %d\n", i) // Log of base two, round up (for v > 0).
return i func nextLogBase2(v uint32) uint32 {
return uint32(bits.Len32(v - 1))
} }
func prevPowerOfTwo(num uint32) uint32 { // Log of base two, round down (for v > 0)
next := nextPowerOfTwo(num) func prevLogBase2(num uint32) uint32 {
// fmt.Printf("prevPowerOfTwo(%d) next: %d", num, next) next := nextLogBase2(num)
switch { if num == (1 << uint32(next)) {
case num == (1 << next): // num is a power of 2 return next
case next == 0:
default:
next = next - 1 // smaller
} }
// fmt.Printf(" = %d\n", next) return next - 1
return next
} }
...@@ -8,153 +8,61 @@ ...@@ -8,153 +8,61 @@
package pool package pool
import ( import (
"bytes"
"fmt" "fmt"
"math/rand" "math/rand"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"sync/atomic"
"testing" "testing"
"time"
) )
func TestPool(t *testing.T) { func TestPool(t *testing.T) {
// disable GC so we can control when it happens. // disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1)) defer debug.SetGCPercent(debug.SetGCPercent(-1))
var p Pool var p BufferPool
if p.Get(10) != nil {
t.Fatal("expected empty") a := make([]byte, 21)
} a[0] = 1
p.Put(16, "a") b := make([]byte, 2050)
p.Put(2048, "b") b[0] = 2
if g := p.Get(16); g != "a" { p.Put(a)
t.Fatalf("got %#v; want a", g) p.Put(b)
if g := p.Get(16); &g[0] != &a[0] {
t.Fatalf("got [%d,...]; want [1,...]", g[0])
} }
if g := p.Get(2048); g != "b" { if g := p.Get(2048); &g[0] != &b[0] {
t.Fatalf("got %#v; want b", g) t.Fatalf("got [%d,...]; want [2,...]", g[0])
} }
if g := p.Get(16); g != nil { if g := p.Get(16); cap(g) != 16 || !bytes.Equal(g[:16], make([]byte, 16)) {
t.Fatalf("got %#v; want nil", g) t.Fatalf("got existing slice; want new slice")
} }
if g := p.Get(2048); g != nil { if g := p.Get(2048); cap(g) != 2048 || !bytes.Equal(g[:2048], make([]byte, 2048)) {
t.Fatalf("got %#v; want nil", g) t.Fatalf("got existing slice; want new slice")
} }
if g := p.Get(1); g != nil { if g := p.Get(1); cap(g) != 1 || !bytes.Equal(g[:1], make([]byte, 1)) {
t.Fatalf("got %#v; want nil", g) t.Fatalf("got existing slice; want new slice")
} }
p.Put(1023, "d") d := make([]byte, 1023)
if g := p.Get(1024); g != nil { d[0] = 3
t.Fatalf("got %#v; want nil", g) p.Put(d)
if g := p.Get(1024); cap(g) != 1024 || !bytes.Equal(g, make([]byte, 1024)) {
t.Fatalf("got existing slice; want new slice")
} }
if g := p.Get(512); g != "d" { if g := p.Get(512); cap(g) != 1023 || g[0] != 3 {
t.Fatalf("got %#v; want d", g) t.Fatalf("got [%d,...]; want [3,...]", g[0])
} }
p.Put(a)
debug.SetGCPercent(100) // to allow following GC to actually run debug.SetGCPercent(100) // to allow following GC to actually run
runtime.GC() runtime.GC()
if g := p.Get(10); g != nil { if g := p.Get(10); &g[0] == &a[0] {
t.Fatalf("got %#v; want nil after GC", g) t.Fatalf("got a; want new slice after GC")
}
}
func TestPoolNew(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
s := [32]int{}
p := Pool{
New: func(length int) interface{} {
idx := nextPowerOfTwo(uint32(length))
s[idx]++
return s[idx]
},
}
if v := p.Get(1 << 5); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 2 {
t.Fatalf("got %v; want 2", v)
}
if v := p.Get(1 << 5); v != 2 {
t.Fatalf("got %v; want 2", v)
}
p.Put(1<<2, 42)
p.Put(1<<5, 42)
if v := p.Get(1 << 2); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 2); v != 3 {
t.Fatalf("got %v; want 3", v)
}
if v := p.Get(1 << 5); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 5); v != 3 {
t.Fatalf("got %v; want 3", v)
}
}
// Test that Pool does not hold pointers to previously cached
// resources
func TestPoolGC(t *testing.T) {
var p Pool
var fin uint32
const N = 100
for i := 0; i < N; i++ {
v := new(string)
runtime.SetFinalizer(v, func(vv *string) {
atomic.AddUint32(&fin, 1)
})
p.Put(uint32(i), v)
}
for i := 0; i < N; i++ {
p.Get(uint32(i))
}
for i := 0; i < 5; i++ {
runtime.GC()
time.Sleep(time.Duration(i*100+10) * time.Millisecond)
// 1 pointer can remain on stack or elsewhere
if atomic.LoadUint32(&fin) >= N-1 {
return
}
}
t.Fatalf("only %v out of %v resources are finalized",
atomic.LoadUint32(&fin), N)
}
func TestPoolStress(t *testing.T) {
const P = 10
N := int(1e6)
if testing.Short() {
N /= 100
}
var p Pool
done := make(chan bool)
for i := 0; i < P; i++ {
go func() {
var v interface{} = 0
for j := 0; j < N; j++ {
if v == nil {
v = 0
}
p.Put(uint32(j), v)
v = p.Get(uint32(j))
if v != nil && v.(int) != 0 {
t.Fatalf("expect 0, got %v", v)
}
}
done <- true
}()
}
for i := 0; i < P; i++ {
// fmt.Printf("%d/%d\n", i, P)
<-done
} }
} }
func TestPoolStressByteSlicePool(t *testing.T) { func TestPoolStressByteSlicePool(t *testing.T) {
var p BufferPool
const P = 10 const P = 10
chs := 10 chs := 10
maxSize := uint32(1 << 16) maxSize := uint32(1 << 16)
...@@ -162,7 +70,6 @@ func TestPoolStressByteSlicePool(t *testing.T) { ...@@ -162,7 +70,6 @@ func TestPoolStressByteSlicePool(t *testing.T) {
if testing.Short() { if testing.Short() {
N /= 100 N /= 100
} }
p := ByteSlicePool
done := make(chan bool) done := make(chan bool)
errs := make(chan error) errs := make(chan error)
for i := 0; i < P; i++ { for i := 0; i < P; i++ {
...@@ -171,16 +78,16 @@ func TestPoolStressByteSlicePool(t *testing.T) { ...@@ -171,16 +78,16 @@ func TestPoolStressByteSlicePool(t *testing.T) {
for i := 0; i < chs; i++ { for i := 0; i < chs; i++ {
j := rand.Uint32() % maxSize j := rand.Uint32() % maxSize
ch <- p.Get(j).([]byte) ch <- p.Get(j)
} }
for j := 0; j < N; j++ { for j := 0; j < N; j++ {
r := uint32(0) r := uint32(0)
for i := 0; i < chs; i++ { for i := 0; i < chs; i++ {
v := <-ch v := <-ch
p.Put(uint32(cap(v)), v) p.Put(v)
r = rand.Uint32() % maxSize r = rand.Uint32() % maxSize
v = p.Get(r).([]byte) v = p.Get(r)
if uint32(len(v)) < r { if uint32(len(v)) < r {
errs <- fmt.Errorf("expect len(v) >= %d, got %d", j, len(v)) errs <- fmt.Errorf("expect len(v) >= %d, got %d", j, len(v))
} }
...@@ -199,7 +106,6 @@ func TestPoolStressByteSlicePool(t *testing.T) { ...@@ -199,7 +106,6 @@ func TestPoolStressByteSlicePool(t *testing.T) {
select { select {
case <-done: case <-done:
i++ i++
// fmt.Printf("%d/%d\n", i, P)
case err := <-errs: case err := <-errs:
t.Error(err) t.Error(err)
} }
...@@ -207,50 +113,34 @@ func TestPoolStressByteSlicePool(t *testing.T) { ...@@ -207,50 +113,34 @@ func TestPoolStressByteSlicePool(t *testing.T) {
} }
func BenchmarkPool(b *testing.B) { func BenchmarkPool(b *testing.B) {
var p Pool var p BufferPool
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
i := 0 i := uint32(7)
for pb.Next() { for pb.Next() {
i = i << 1 if i > 1<<20 {
p.Put(uint32(i), 1) i = 7
p.Get(uint32(i)) } else {
i = i << 1
}
b := p.Get(i)
p.Put(b)
} }
}) })
} }
func BenchmarkPoolOverlflow(b *testing.B) { func BenchmarkPoolOverlflow(b *testing.B) {
var p Pool var p BufferPool
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
for pow := uint32(0); pow < 32; pow++ { bufs := make([][]byte, 2100)
for b := 0; b < 100; b++ { for pow := uint32(0); pow < 21; pow++ {
p.Put(uint32(1<<pow), 1) for i := 0; i < 100; i++ {
bufs = append(bufs, p.Get(uint32(1<<pow)))
} }
} }
for pow := uint32(0); pow < 32; pow++ { for _, b := range bufs {
for b := 0; b < 100; b++ { p.Put(b)
p.Get(uint32(1 << pow))
}
} }
} }
}) })
} }
// DISABLED: This example is *not* guaranteed to work. This buffer pool can
// choose to ignore the interior pool and return `nil`.
//func ExamplePool() {
// var p Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(uint32(len(small)), small)
// p.Put(uint32(len(large)), large)
//
// small2 := p.Get(uint32(len(small))).([]byte)
// large2 := p.Get(uint32(len(large))).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment