Unverified Commit 5b7d4ee2 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #9 from libp2p/feat/extract-mpool

extract buffer pool (and simplify interface)
parents d82125c9 58542711
...@@ -3,7 +3,7 @@ package msgio ...@@ -3,7 +3,7 @@ package msgio
import ( import (
"io" "io"
mpool "github.com/libp2p/go-msgio/mpool" pool "github.com/libp2p/go-buffer-pool"
) )
// Chan is a msgio duplex channel. It is used to have a channel interface // Chan is a msgio duplex channel. It is used to have a channel interface
...@@ -30,8 +30,8 @@ func (s *Chan) ReadFrom(r io.Reader) { ...@@ -30,8 +30,8 @@ func (s *Chan) ReadFrom(r io.Reader) {
} }
// ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all // ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel. Uses given Pool // messages, ands sends them down the channel. Uses given BufferPool.
func (s *Chan) ReadFromWithPool(r io.Reader, p *mpool.Pool) { func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) {
s.readFrom(NewReaderWithPool(r, p)) s.readFrom(NewReaderWithPool(r, p))
} }
......
// Package mpool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32.
//
// import "github.com/libp2p/go-msgio/mpool"
// var p mpool.Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(1024, small)
// p.Put(4194304, large)
//
// small2 := p.Get(1024).([]byte)
// large2 := p.Get(4194304).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
//
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//
package mpool
import (
"fmt"
"sync"
)
// ByteSlicePool is a static Pool for reusing byteslices of various sizes.
var ByteSlicePool = &Pool{
New: func(length int) interface{} {
return make([]byte, length)
},
}
// MaxLength is the maximum length of an element that can be added to the Pool.
const MaxLength = 1 << 32
// Pool is a pool to handle cases of reusing elements of varying sizes.
// It maintains up to 32 internal pools, for each power of 2 in 0-32.
type Pool struct {
pools [32]sync.Pool // a list of singlePools
// New is a function that constructs a new element in the pool, with given len
New func(len int) interface{}
}
func (p *Pool) getPool(idx uint32) *sync.Pool {
if idx > uint32(len(p.pools)) {
panic(fmt.Errorf("index too large: %d", idx))
}
return &p.pools[idx]
}
// Get selects an arbitrary item from the Pool, removes it from the Pool,
// and returns it to the caller. Get may choose to ignore the pool and
// treat it as empty. Callers should not assume any relation between values
// passed to Put and the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns the
// result of calling p.New.
func (p *Pool) Get(length uint32) interface{} {
idx := nextPowerOfTwo(length)
sp := p.getPool(idx)
// fmt.Printf("Get(%d) idx(%d)\n", length, idx)
val := sp.Get()
if val == nil && p.New != nil {
val = p.New(0x1 << idx)
}
return val
}
// Put adds x to the pool.
func (p *Pool) Put(length uint32, val interface{}) {
idx := prevPowerOfTwo(length)
// fmt.Printf("Put(%d, -) idx(%d)\n", length, idx)
sp := p.getPool(idx)
sp.Put(val)
}
func nextPowerOfTwo(v uint32) uint32 {
// fmt.Printf("nextPowerOfTwo(%d) ", v)
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
// fmt.Printf("-> %d", v)
i := uint32(0)
for ; v > 1; i++ {
v = v >> 1
}
// fmt.Printf("-> %d\n", i)
return i
}
func prevPowerOfTwo(num uint32) uint32 {
next := nextPowerOfTwo(num)
// fmt.Printf("prevPowerOfTwo(%d) next: %d", num, next)
switch {
case num == (1 << next): // num is a power of 2
case next == 0:
default:
next = next - 1 // smaller
}
// fmt.Printf(" = %d\n", next)
return next
}
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pool is no-op under race detector, so all these tests do not work.
// +build !race
package mpool
import (
"fmt"
"math/rand"
"runtime"
"runtime/debug"
"sync/atomic"
"testing"
"time"
)
func TestPool(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
var p Pool
if p.Get(10) != nil {
t.Fatal("expected empty")
}
p.Put(16, "a")
p.Put(2048, "b")
if g := p.Get(16); g != "a" {
t.Fatalf("got %#v; want a", g)
}
if g := p.Get(2048); g != "b" {
t.Fatalf("got %#v; want b", g)
}
if g := p.Get(16); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(2048); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(1); g != nil {
t.Fatalf("got %#v; want nil", g)
}
p.Put(1023, "d")
if g := p.Get(1024); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(512); g != "d" {
t.Fatalf("got %#v; want d", g)
}
debug.SetGCPercent(100) // to allow following GC to actually run
runtime.GC()
if g := p.Get(10); g != nil {
t.Fatalf("got %#v; want nil after GC", g)
}
}
func TestPoolNew(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
s := [32]int{}
p := Pool{
New: func(length int) interface{} {
idx := nextPowerOfTwo(uint32(length))
s[idx]++
return s[idx]
},
}
if v := p.Get(1 << 5); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 2 {
t.Fatalf("got %v; want 2", v)
}
if v := p.Get(1 << 5); v != 2 {
t.Fatalf("got %v; want 2", v)
}
p.Put(1<<2, 42)
p.Put(1<<5, 42)
if v := p.Get(1 << 2); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 2); v != 3 {
t.Fatalf("got %v; want 3", v)
}
if v := p.Get(1 << 5); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 5); v != 3 {
t.Fatalf("got %v; want 3", v)
}
}
// Test that Pool does not hold pointers to previously cached
// resources
func TestPoolGC(t *testing.T) {
var p Pool
var fin uint32
const N = 100
for i := 0; i < N; i++ {
v := new(string)
runtime.SetFinalizer(v, func(vv *string) {
atomic.AddUint32(&fin, 1)
})
p.Put(uint32(i), v)
}
for i := 0; i < N; i++ {
p.Get(uint32(i))
}
for i := 0; i < 5; i++ {
runtime.GC()
time.Sleep(time.Duration(i*100+10) * time.Millisecond)
// 1 pointer can remain on stack or elsewhere
if atomic.LoadUint32(&fin) >= N-1 {
return
}
}
t.Fatalf("only %v out of %v resources are finalized",
atomic.LoadUint32(&fin), N)
}
func TestPoolStress(t *testing.T) {
const P = 10
N := int(1e6)
if testing.Short() {
N /= 100
}
var p Pool
done := make(chan bool)
for i := 0; i < P; i++ {
go func() {
var v interface{} = 0
for j := 0; j < N; j++ {
if v == nil {
v = 0
}
p.Put(uint32(j), v)
v = p.Get(uint32(j))
if v != nil && v.(int) != 0 {
t.Fatalf("expect 0, got %v", v)
}
}
done <- true
}()
}
for i := 0; i < P; i++ {
// fmt.Printf("%d/%d\n", i, P)
<-done
}
}
func TestPoolStressByteSlicePool(t *testing.T) {
const P = 10
chs := 10
maxSize := uint32(1 << 16)
N := int(1e4)
if testing.Short() {
N /= 100
}
p := ByteSlicePool
done := make(chan bool)
errs := make(chan error)
for i := 0; i < P; i++ {
go func() {
ch := make(chan []byte, chs+1)
for i := 0; i < chs; i++ {
j := rand.Uint32() % maxSize
ch <- p.Get(j).([]byte)
}
for j := 0; j < N; j++ {
r := uint32(0)
for i := 0; i < chs; i++ {
v := <-ch
p.Put(uint32(cap(v)), v)
r = rand.Uint32() % maxSize
v = p.Get(r).([]byte)
if uint32(len(v)) < r {
errs <- fmt.Errorf("expect len(v) >= %d, got %d", j, len(v))
}
ch <- v
}
if r%1000 == 0 {
runtime.GC()
}
}
done <- true
}()
}
for i := 0; i < P; {
select {
case <-done:
i++
// fmt.Printf("%d/%d\n", i, P)
case err := <-errs:
t.Error(err)
}
}
}
func BenchmarkPool(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
i = i << 1
p.Put(uint32(i), 1)
p.Get(uint32(i))
}
})
}
func BenchmarkPoolOverlflow(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for pow := uint32(0); pow < 32; pow++ {
for b := 0; b < 100; b++ {
p.Put(uint32(1<<pow), 1)
}
}
for pow := uint32(0); pow < 32; pow++ {
for b := 0; b < 100; b++ {
p.Get(uint32(1 << pow))
}
}
}
})
}
// DISABLED: This example is *not* guaranteed to work. This buffer pool can
// choose to ignore the interior pool and return `nil`.
//func ExamplePool() {
// var p Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(uint32(len(small)), small)
// p.Put(uint32(len(large)), large)
//
// small2 := p.Get(uint32(len(small))).([]byte)
// large2 := p.Get(uint32(len(large))).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//}
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"io" "io"
"sync" "sync"
mpool "github.com/libp2p/go-msgio/mpool" pool "github.com/libp2p/go-buffer-pool"
) )
// ErrMsgTooLarge is returned when the message length is exessive // ErrMsgTooLarge is returned when the message length is exessive
...@@ -41,9 +41,8 @@ type Reader interface { ...@@ -41,9 +41,8 @@ type Reader interface {
Read([]byte) (int, error) Read([]byte) (int, error)
// ReadMsg reads the next message from the Reader. // ReadMsg reads the next message from the Reader.
// Uses a mpool.Pool internally to reuse buffers. io.ErrShortBuffer will // Uses a pool.BufferPool internally to reuse buffers. User may call
// be returned if the Pool.Get(...) returns nil. // ReleaseMsg(msg) to signal a buffer can be reused.
// User may call ReleaseMsg(msg) to signal a buffer can be reused.
ReadMsg() ([]byte, error) ReadMsg() ([]byte, error)
// ReleaseMsg signals a buffer can be reused. // ReleaseMsg signals a buffer can be reused.
...@@ -117,7 +116,7 @@ type reader struct { ...@@ -117,7 +116,7 @@ type reader struct {
lbuf []byte lbuf []byte
next int next int
pool *mpool.Pool pool *pool.BufferPool
lock sync.Locker lock sync.Locker
max int // the maximal message size (in bytes) this reader handles max int // the maximal message size (in bytes) this reader handles
} }
...@@ -126,13 +125,13 @@ type reader struct { ...@@ -126,13 +125,13 @@ type reader struct {
// will read whole messages at a time (using the length). Assumes an equivalent // will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side. // writer on the other side.
func NewReader(r io.Reader) ReadCloser { func NewReader(r io.Reader) ReadCloser {
return NewReaderWithPool(r, mpool.ByteSlicePool) return NewReaderWithPool(r, pool.GlobalPool)
} }
// NewReaderWithPool wraps an io.Reader with a msgio framed reader. The msgio.Reader // NewReaderWithPool wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent // will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side. It uses a given mpool.Pool // writer on the other side. It uses a given pool.BufferPool
func NewReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser { func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
if p == nil { if p == nil {
panic("nil pool") panic("nil pool")
} }
...@@ -194,22 +193,23 @@ func (s *reader) ReadMsg() ([]byte, error) { ...@@ -194,22 +193,23 @@ func (s *reader) ReadMsg() ([]byte, error) {
return nil, err return nil, err
} }
if length == 0 {
s.next = -1
return nil, nil
}
if length > s.max || length < 0 { if length > s.max || length < 0 {
return nil, ErrMsgTooLarge return nil, ErrMsgTooLarge
} }
msgb := s.pool.Get(uint32(length)) msg := s.pool.Get(length)
if msgb == nil {
return nil, io.ErrShortBuffer
}
msg := msgb.([]byte)[:length]
_, err = io.ReadFull(s.R, msg) _, err = io.ReadFull(s.R, msg)
s.next = -1 // signal we've consumed this msg s.next = -1 // signal we've consumed this msg
return msg, err return msg, err
} }
func (s *reader) ReleaseMsg(msg []byte) { func (s *reader) ReleaseMsg(msg []byte) {
s.pool.Put(uint32(cap(msg)), msg) s.pool.Put(msg)
} }
func (s *reader) Close() error { func (s *reader) Close() error {
......
...@@ -9,6 +9,12 @@ ...@@ -9,6 +9,12 @@
"hash": "QmYNGtJHgaGZkpzq8yG6Wxqm6EQTKqgpBfnyyGBKbZeDUi", "hash": "QmYNGtJHgaGZkpzq8yG6Wxqm6EQTKqgpBfnyyGBKbZeDUi",
"name": "go-randbuf", "name": "go-randbuf",
"version": "0.0.0" "version": "0.0.0"
},
{
"author": "Stebalien",
"hash": "QmUQy76yspPa3fRyY3GzXFTg9n8JVwFru6ue3KFRt4MeTw",
"name": "go-buffer-pool",
"version": "0.1.1"
} }
], ],
"gxVersion": "0.11.0", "gxVersion": "0.11.0",
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"io" "io"
"sync" "sync"
mpool "github.com/libp2p/go-msgio/mpool" pool "github.com/libp2p/go-buffer-pool"
) )
// varintWriter is the underlying type that implements the Writer interface. // varintWriter is the underlying type that implements the Writer interface.
...@@ -62,7 +62,7 @@ type varintReader struct { ...@@ -62,7 +62,7 @@ type varintReader struct {
lbuf []byte lbuf []byte
next int next int
pool *mpool.Pool pool *pool.BufferPool
lock sync.Locker lock sync.Locker
max int // the maximal message size (in bytes) this reader handles max int // the maximal message size (in bytes) this reader handles
} }
...@@ -72,14 +72,15 @@ type varintReader struct { ...@@ -72,14 +72,15 @@ type varintReader struct {
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint // Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side. // Assumes an equivalent writer on the other side.
func NewVarintReader(r io.Reader) ReadCloser { func NewVarintReader(r io.Reader) ReadCloser {
return NewVarintReaderWithPool(r, mpool.ByteSlicePool) return NewVarintReaderWithPool(r, pool.GlobalPool)
} }
// NewVarintReaderWithPool wraps an io.Reader with a varint msgio framed reader. // NewVarintReaderWithPool wraps an io.Reader with a varint msgio framed reader.
// The msgio.Reader will read whole messages at a time (using the length). // The msgio.Reader will read whole messages at a time (using the length).
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint // Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side. It uses a given mpool.Pool // Assumes an equivalent writer on the other side. It uses a given
func NewVarintReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser { // pool.BufferPool.
func NewVarintReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
if p == nil { if p == nil {
panic("nil pool") panic("nil pool")
} }
...@@ -139,23 +140,23 @@ func (s *varintReader) ReadMsg() ([]byte, error) { ...@@ -139,23 +140,23 @@ func (s *varintReader) ReadMsg() ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if length == 0 {
s.next = -1
return nil, nil
}
if length > s.max { if length > s.max {
return nil, ErrMsgTooLarge return nil, ErrMsgTooLarge
} }
msgb := s.pool.Get(uint32(length)) msg := s.pool.Get(length)
if msgb == nil {
return nil, io.ErrShortBuffer
}
msg := msgb.([]byte)[:length]
_, err = io.ReadFull(s.R, msg) _, err = io.ReadFull(s.R, msg)
s.next = -1 // signal we've consumed this msg s.next = -1 // signal we've consumed this msg
return msg, err return msg, err
} }
func (s *varintReader) ReleaseMsg(msg []byte) { func (s *varintReader) ReleaseMsg(msg []byte) {
s.pool.Put(uint32(cap(msg)), msg) s.pool.Put(msg)
} }
func (s *varintReader) Close() error { func (s *varintReader) Close() error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment