Commit 97e0f73e authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

msgio: updated to remove max size

parent 5c64ee90
......@@ -107,7 +107,7 @@
},
{
"ImportPath": "github.com/jbenet/go-msgio",
"Rev": "ab0e7a0e111d7c7d814ad238bcbf3934efb76ac3"
"Rev": "8361f0f6c783f09419ec3d8dd75fe950744f5e91"
},
{
"ImportPath": "github.com/jbenet/go-multiaddr",
......
......@@ -2,17 +2,19 @@ package msgio
import (
"io"
"sync"
mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
)
// Chan is a msgio duplex channel. It is used to have a channel interface
// around a msgio.Reader or Writer.
type Chan struct {
Buffers [][]byte
MsgChan chan []byte
ErrChan chan error
CloseChan chan bool
BufPool *sync.Pool
}
// NewChan constructs a Chan with a given buffer size.
func NewChan(chanSize int) *Chan {
return &Chan{
MsgChan: make(chan []byte, chanSize),
......@@ -21,36 +23,27 @@ func NewChan(chanSize int) *Chan {
}
}
func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan {
return &Chan{
MsgChan: make(chan []byte, chanSize),
ErrChan: make(chan error, 1),
CloseChan: make(chan bool, 2),
BufPool: pool,
}
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel.
func (s *Chan) ReadFrom(r io.Reader) {
s.readFrom(NewReader(r))
}
func (s *Chan) getBuffer(size int) []byte {
if s.BufPool == nil {
return make([]byte, size)
} else {
bufi := s.BufPool.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("Got invalid type from sync pool!")
}
return buf
}
// ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel. Uses given Pool
func (s *Chan) ReadFromWithPool(r io.Reader, p *mpool.Pool) {
s.readFrom(NewReaderWithPool(r, p))
}
func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mr := NewReader(r)
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel.
func (s *Chan) readFrom(mr Reader) {
// single reader, no need for Mutex
mr.(*reader).lock = new(nullLocker)
Loop:
for {
buf := s.getBuffer(maxMsgLen)
l, err := mr.ReadMsg(buf)
buf, err := mr.ReadMsg()
if err != nil {
if err == io.EOF {
break Loop // done
......@@ -64,7 +57,7 @@ Loop:
select {
case <-s.CloseChan:
break Loop // told we're done
case s.MsgChan <- buf[:l]:
case s.MsgChan <- buf:
// ok seems fine. send it away
}
}
......@@ -74,10 +67,15 @@ Loop:
s.CloseChan <- true
}
// WriteTo wraps the given io.Writer with a msgio.Writer, listens on the
// channel and writes all messages to the writer.
func (s *Chan) WriteTo(w io.Writer) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mw := NewWriter(w)
// single writer, no need for Mutex
mw.(*writer).lock = new(nullLocker)
Loop:
for {
select {
......@@ -104,6 +102,13 @@ Loop:
s.CloseChan <- true
}
// Close the Chan
func (s *Chan) Close() {
s.CloseChan <- true
}
// nullLocker conforms to the sync.Locker interface but does nothing.
type nullLocker struct{}
func (l *nullLocker) Lock() {}
func (l *nullLocker) Unlock() {}
......@@ -5,7 +5,6 @@ import (
randbuf "github.com/jbenet/go-randbuf"
"io"
"math/rand"
"sync"
"testing"
"time"
)
......@@ -13,8 +12,7 @@ import (
func TestReadChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
p := &sync.Pool{New: func() interface{} { return make([]byte, 1000) }}
rchan := NewChan(10, p)
rchan := NewChan(10)
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
......@@ -30,7 +28,7 @@ func TestReadChan(t *testing.T) {
t.Fatal(err)
}
go rchan.ReadFrom(buf, 1000)
go rchan.ReadFrom(buf)
defer rchan.Close()
Loop:
......@@ -60,7 +58,7 @@ Loop:
func TestWriteChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
reader := NewReader(buf)
wchan := NewChan(10, nil)
wchan := NewChan(10)
msgs := [1000][]byte{}
go wchan.WriteTo(buf)
......@@ -87,8 +85,7 @@ func TestWriteChan(t *testing.T) {
defer wchan.Close()
for i := 0; ; i++ {
msg2 := make([]byte, 1000)
n, err := reader.ReadMsg(msg2)
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
......@@ -100,7 +97,6 @@ func TestWriteChan(t *testing.T) {
}
msg1 := msgs[i]
msg2 = msg2[:n]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
......
// Package mpool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32.
//
// import "github.com/jbenet/go-msgio/mpool"
// var p mpool.Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(1024, small)
// p.Put(4194304, large)
//
// small2 := p.Get(1024).([]byte)
// large2 := p.Get(4194304).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
//
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//
package mpool
import (
"fmt"
"sync"
)
// ByteSlicePool is a static Pool for reusing byteslices of various sizes.
var ByteSlicePool Pool
func init() {
ByteSlicePool.New = func(length int) interface{} {
return make([]byte, length)
}
}
// MaxLength is the maximum length of an element that can be added to the Pool.
const MaxLength = 1 << 32
// Pool is a pool to handle cases of reusing elements of varying sizes.
// It maintains up to 32 internal pools, for each power of 2 in 0-32.
type Pool struct {
small int // the size of the first pool
pools [32]*sync.Pool // a list of singlePools
sync.Mutex // protecting list
// New is a function that constructs a new element in the pool, with given len
New func(len int) interface{}
}
func (p *Pool) getPool(idx uint32) *sync.Pool {
if idx > uint32(len(p.pools)) {
panic(fmt.Errorf("index too large: %d", idx))
}
p.Lock()
defer p.Unlock()
sp := p.pools[idx]
if sp == nil {
sp = new(sync.Pool)
p.pools[idx] = sp
}
return sp
}
// Get selects an arbitrary item from the Pool, removes it from the Pool,
// and returns it to the caller. Get may choose to ignore the pool and
// treat it as empty. Callers should not assume any relation between values
// passed to Put and the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns the
// result of calling p.New.
func (p *Pool) Get(length uint32) interface{} {
idx := largerPowerOfTwo(length)
sp := p.getPool(idx)
val := sp.Get()
if val == nil && p.New != nil {
val = p.New(0x1 << idx)
}
return val
}
// Put adds x to the pool.
func (p *Pool) Put(length uint32, val interface{}) {
idx := smallerPowerOfTwo(length)
sp := p.getPool(idx)
sp.Put(val)
}
func largerPowerOfTwo(num uint32) uint32 {
for p := uint32(0); p < 32; p++ {
if (0x1 << p) >= num {
return p
}
}
panic("unreachable")
}
func smallerPowerOfTwo(num uint32) uint32 {
for p := uint32(1); p < 32; p++ {
if (0x1 << p) > num {
return p - 1
}
}
panic("unreachable")
}
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pool is no-op under race detector, so all these tests do not work.
// +build !race
package mpool
import (
"fmt"
"runtime"
"runtime/debug"
"sync/atomic"
"testing"
"time"
)
func TestPool(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
var p Pool
if p.Get(10) != nil {
t.Fatal("expected empty")
}
p.Put(16, "a")
p.Put(2048, "b")
if g := p.Get(16); g != "a" {
t.Fatalf("got %#v; want a", g)
}
if g := p.Get(2048); g != "b" {
t.Fatalf("got %#v; want b", g)
}
if g := p.Get(16); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(2048); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(1); g != nil {
t.Fatalf("got %#v; want nil", g)
}
p.Put(1023, "d")
if g := p.Get(1024); g != nil {
t.Fatalf("got %#v; want nil", g)
}
if g := p.Get(512); g != "d" {
t.Fatalf("got %#v; want d", g)
}
debug.SetGCPercent(100) // to allow following GC to actually run
runtime.GC()
if g := p.Get(10); g != nil {
t.Fatalf("got %#v; want nil after GC", g)
}
}
func TestPoolNew(t *testing.T) {
// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))
s := [32]int{}
p := Pool{
New: func(length int) interface{} {
idx := largerPowerOfTwo(uint32(length))
s[idx]++
return s[idx]
},
}
if v := p.Get(1 << 5); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 1 {
t.Fatalf("got %v; want 1", v)
}
if v := p.Get(1 << 2); v != 2 {
t.Fatalf("got %v; want 2", v)
}
if v := p.Get(1 << 5); v != 2 {
t.Fatalf("got %v; want 2", v)
}
p.Put(1<<2, 42)
p.Put(1<<5, 42)
if v := p.Get(1 << 2); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 2); v != 3 {
t.Fatalf("got %v; want 3", v)
}
if v := p.Get(1 << 5); v != 42 {
t.Fatalf("got %v; want 42", v)
}
if v := p.Get(1 << 5); v != 3 {
t.Fatalf("got %v; want 3", v)
}
}
// Test that Pool does not hold pointers to previously cached
// resources
func TestPoolGC(t *testing.T) {
var p Pool
var fin uint32
const N = 100
for i := 0; i < N; i++ {
v := new(string)
runtime.SetFinalizer(v, func(vv *string) {
atomic.AddUint32(&fin, 1)
})
p.Put(uint32(i), v)
}
for i := 0; i < N; i++ {
p.Get(uint32(i))
}
for i := 0; i < 5; i++ {
runtime.GC()
time.Sleep(time.Duration(i*100+10) * time.Millisecond)
// 1 pointer can remain on stack or elsewhere
if atomic.LoadUint32(&fin) >= N-1 {
return
}
}
t.Fatalf("only %v out of %v resources are finalized",
atomic.LoadUint32(&fin), N)
}
func TestPoolStress(t *testing.T) {
const P = 10
N := int(1e6)
if testing.Short() {
N /= 100
}
var p Pool
done := make(chan bool)
for i := 0; i < P; i++ {
go func() {
var v interface{} = 0
for j := 0; j < N; j++ {
if v == nil {
v = 0
}
p.Put(uint32(j), v)
v = p.Get(uint32(j))
if v != nil && v.(int) != 0 {
t.Fatalf("expect 0, got %v", v)
}
}
done <- true
}()
}
for i := 0; i < P; i++ {
<-done
}
}
func BenchmarkPool(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
i = i << 1
p.Put(uint32(i), 1)
p.Get(uint32(i))
}
})
}
func BenchmarkPoolOverlflow(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for pow := uint32(0); pow < 32; pow++ {
for b := 0; b < 100; b++ {
p.Put(uint32(1<<pow), 1)
}
}
for pow := uint32(0); pow < 32; pow++ {
for b := 0; b < 100; b++ {
p.Get(uint32(1 << pow))
}
}
}
})
}
func ExamplePool() {
var p Pool
small := make([]byte, 1024)
large := make([]byte, 4194304)
p.Put(uint32(len(small)), small)
p.Put(uint32(len(large)), large)
small2 := p.Get(uint32(len(small))).([]byte)
large2 := p.Get(uint32(len(large))).([]byte)
fmt.Println("small2 len:", len(small2))
fmt.Println("large2 len:", len(large2))
// Output:
// small2 len: 1024
// large2 len: 4194304
}
......@@ -3,48 +3,90 @@ package msgio
import (
"encoding/binary"
"io"
"sync"
mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
)
// NBO is NetworkByteOrder
var NBO = binary.BigEndian
const lengthSize = 4
// Writer is the msgio Writer interface. It writes len-framed messages.
type Writer interface {
// Write writes passed in buffer as a single message.
Write([]byte) error
// WriteMsg writes the msg in the passed in buffer.
WriteMsg([]byte) error
}
// WriteCloser is a Writer + Closer interface. Like in `golang/pkg/io`
type WriteCloser interface {
Writer
io.Closer
}
// Reader is the msgio Reader interface. It reads len-framed messages.
type Reader interface {
ReadMsg([]byte) (int, error)
// Read reads the next message from the Reader.
// The client must pass a buffer large enough, or io.ErrShortBuffer will be
// returned.
Read([]byte) (int, error)
// ReadMsg reads the next message from the Reader.
// Uses a mpool.Pool internally to reuse buffers. io.ErrShortBuffer will
// be returned if the Pool.Get(...) returns nil.
// User may call ReleaseMsg(msg) to signal a buffer can be reused.
ReadMsg() ([]byte, error)
// ReleaseMsg signals a buffer can be reused.
ReleaseMsg([]byte)
}
// ReadCloser combines a Reader and Closer.
type ReadCloser interface {
Reader
io.Closer
}
// ReadWriter combines a Reader and Writer.
type ReadWriter interface {
Reader
Writer
}
// ReadWriteCloser combines a Reader, a Writer, and Closer.
type ReadWriteCloser interface {
Reader
Writer
io.Closer
}
type Writer_ struct {
// writer is the underlying type that implements the Writer interface.
type writer struct {
W io.Writer
lock sync.Locker
}
// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
// will write the length prefix of every message written.
func NewWriter(w io.Writer) WriteCloser {
return &Writer_{w}
return &writer{W: w, lock: new(sync.Mutex)}
}
func (s *writer) Write(msg []byte) (err error) {
return s.WriteMsg(msg)
}
func (s *Writer_) WriteMsg(msg []byte) (err error) {
func (s *writer) WriteMsg(msg []byte) (err error) {
s.lock.Lock()
defer s.lock.Unlock()
length := uint32(len(msg))
if err := binary.Write(s.W, NBO, &length); err != nil {
return err
......@@ -53,59 +95,161 @@ func (s *Writer_) WriteMsg(msg []byte) (err error) {
return err
}
func (s *Writer_) Close() error {
func (s *writer) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
return nil
}
type Reader_ struct {
R io.Reader
// reader is the underlying type that implements the Reader interface.
type reader struct {
R io.Reader
lbuf []byte
next int
pool *mpool.Pool
lock sync.Locker
}
// NewReader wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side.
func NewReader(r io.Reader) ReadCloser {
return &Reader_{r, make([]byte, 4)}
return NewReaderWithPool(r, &mpool.ByteSlicePool)
}
// NewReaderWithPool wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side. It uses a given mpool.Pool
func NewReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser {
if p == nil {
panic("nil pool")
}
return &reader{
R: r,
lbuf: make([]byte, lengthSize),
next: -1,
pool: p,
lock: new(sync.Mutex),
}
}
// nextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like ReadMsg, nextMsgLen is destructive. It reads from the internal
// reader.
func (s *reader) nextMsgLen() (int, error) {
if s.next == -1 {
if _, err := io.ReadFull(s.R, s.lbuf); err != nil {
return 0, err
}
s.next = int(NBO.Uint32(s.lbuf))
}
return s.next, nil
}
func (s *Reader_) ReadMsg(msg []byte) (int, error) {
if _, err := io.ReadFull(s.R, s.lbuf); err != nil {
func (s *reader) Read(msg []byte) (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return 0, err
}
length := int(NBO.Uint32(s.lbuf))
if length < 0 || length > len(msg) {
if length > len(msg) {
return 0, io.ErrShortBuffer
}
_, err := io.ReadFull(s.R, msg[:length])
_, err = io.ReadFull(s.R, msg[:length])
s.next = -1 // signal we've consumed this msg
return length, err
}
func (s *Reader_) Close() error {
func (s *reader) ReadMsg() ([]byte, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return nil, err
}
msgb := s.pool.Get(uint32(length))
if msgb == nil {
return nil, io.ErrShortBuffer
}
msg := msgb.([]byte)[:length]
_, err = io.ReadFull(s.R, msg)
s.next = -1 // signal we've consumed this msg
return msg, err
}
func (s *reader) ReleaseMsg(msg []byte) {
s.pool.Put(uint32(cap(msg)), msg)
}
func (s *reader) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
return nil
}
type ReadWriter_ struct {
// readWriter is the underlying type that implements a ReadWriter.
type readWriter struct {
Reader
Writer
}
// NewReadWriter wraps an io.ReadWriter with a msgio.ReadWriter. Writing
// and Reading will be appropriately framed.
func NewReadWriter(rw io.ReadWriter) ReadWriter {
return &ReadWriter_{
return &readWriter{
Reader: NewReader(rw),
Writer: NewWriter(rw),
}
}
func (rw *ReadWriter_) Close() error {
func (rw *readWriter) Close() error {
var errs []error
if w, ok := rw.Writer.(WriteCloser); ok {
return w.Close()
if err := w.Close(); err != nil {
errs = append(errs, err)
}
}
if r, ok := rw.Reader.(ReadCloser); ok {
return r.Close()
if err := r.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return multiErr(errs)
}
return nil
}
// multiErr is a util to return multiple errors
type multiErr []error
func (m multiErr) Error() string {
if len(m) == 0 {
return "no errors"
}
s := "Multiple errors: "
for i, e := range m {
if i != 0 {
s += ", "
}
s += e.Error()
}
return s
}
......@@ -2,14 +2,16 @@ package msgio
import (
"bytes"
"fmt"
randbuf "github.com/jbenet/go-randbuf"
"io"
"math/rand"
"sync"
"testing"
"time"
)
func TestReaderWriter(t *testing.T) {
func TestReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
......@@ -18,7 +20,7 @@ func TestReaderWriter(t *testing.T) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randbuf.RandBuf(r, r.Intn(1000))
err := writer.WriteMsg(msgs[i])
err := writer.Write(msgs[i])
if err != nil {
t.Fatal(err)
}
......@@ -30,7 +32,7 @@ func TestReaderWriter(t *testing.T) {
for i := 0; ; i++ {
msg2 := make([]byte, 1000)
n, err := reader.ReadMsg(msg2)
n, err := reader.Read(msg2)
if err != nil {
if err == io.EOF {
if i < len(msg2) {
......@@ -52,3 +54,114 @@ func TestReaderWriter(t *testing.T) {
t.Error(err)
}
}
func TestReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randbuf.RandBuf(r, r.Intn(1000))
err := writer.WriteMsg(msgs[i])
if err != nil {
t.Fatal(err)
}
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
for i := 0; ; i++ {
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
t.Error("failed to read all messages", len(msgs), i)
}
break
}
t.Error("unexpected error", err)
}
msg1 := msgs[i]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}
if err := reader.Close(); err != nil {
t.Error(err)
}
}
func TestReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randbuf.RandBuf(r, r.Intn(1000)+4)
NBO.PutUint32(msgs[i][:4], uint32(i))
}
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
errs := make(chan error, 10000)
for i := range msgs {
wg1.Add(1)
go func(i int) {
defer wg1.Done()
err := writer.WriteMsg(msgs[i])
if err != nil {
errs <- err
}
}(i)
}
wg1.Wait()
if err := writer.Close(); err != nil {
t.Fatal(err)
}
for i := 0; i < len(msgs)+1; i++ {
wg2.Add(1)
go func(i int) {
defer wg2.Done()
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
errs <- fmt.Errorf("failed to read all messages", len(msgs), i)
}
return
}
errs <- fmt.Errorf("unexpected error", err)
}
mi := NBO.Uint32(msg2[:4])
msg1 := msgs[mi]
if !bytes.Equal(msg1, msg2) {
errs <- fmt.Errorf("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}(i)
}
wg2.Wait()
close(errs)
if err := reader.Close(); err != nil {
t.Error(err)
}
for e := range errs {
t.Error(e)
}
}
// Package mpool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32.
//
// import "github.com/jbenet/go-msgio/mpool"
// var p mpool.Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(1024, small)
// p.Put(4194304, large)
//
// small2 := p.Get(1024).([]byte)
// large2 := p.Get(4194304).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
//
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//
package mpool
import (
"fmt"
"sync"
)
// ByteSlicePool is a static Pool for reusing byteslices of various sizes.
var ByteSlicePool Pool
func init() {
ByteSlicePool.New = func(length int) interface{} {
return make([]byte, length)
}
}
// MaxLength is the maximum length of an element that can be added to the Pool.
const MaxLength = (1 << 32) - 1
// Pool is a pool to handle cases of reusing elements of varying sizes.
// It maintains up to 32 internal pools, for each power of 2 in 0-32.
type Pool struct {
small int // the size of the first pool
pools [32]*sync.Pool // a list of singlePools
sync.Mutex // protecting list
// New is a function that constructs a new element in the pool, with given len
New func(len int) interface{}
}
func (p *Pool) getPool(idx uint32) *sync.Pool {
if idx > uint32(len(p.pools)) {
panic(fmt.Errorf("index too large: %d", idx))
}
p.Lock()
defer p.Unlock()
sp := p.pools[idx]
if sp == nil {
sp = new(sync.Pool)
p.pools[idx] = sp
}
return sp
}
// Get selects an arbitrary item from the Pool, removes it from the Pool,
// and returns it to the caller. Get may choose to ignore the pool and
// treat it as empty. Callers should not assume any relation between values
// passed to Put and the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns the
// result of calling p.New.
func (p *Pool) Get(length uint32) interface{} {
idx := largerPowerOfTwo(length)
sp := p.getPool(idx)
val := sp.Get()
if val == nil && p.New != nil {
val = p.New(0x1 << idx)
}
return val
}
// Put adds x to the pool.
func (p *Pool) Put(length uint32, val interface{}) {
if length > MaxLength {
length = MaxLength
}
idx := smallerPowerOfTwo(length)
sp := p.getPool(idx)
sp.Put(val)
}
func largerPowerOfTwo(num uint32) uint32 {
for p := uint32(0); p < 32; p++ {
if (0x1 << p) >= num {
return p
}
}
panic("unreachable")
}
func smallerPowerOfTwo(num uint32) uint32 {
for p := uint32(1); p < 32; p++ {
if (0x1 << p) > num {
return p - 1
}
}
panic("unreachable")
}
......@@ -2,11 +2,11 @@ package conn
import (
"fmt"
"sync"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
......@@ -28,26 +28,11 @@ const (
HandshakeTimeout = time.Second * 5
)
// global static buffer pool for byte arrays of size MaxMessageSize
var BufferPool *sync.Pool
func init() {
BufferPool = new(sync.Pool)
BufferPool.New = func() interface{} {
log.Warning("Pool returning new object")
return make([]byte, MaxMessageSize)
}
}
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
func ReleaseBuffer(b []byte) {
log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
if cap(b) != MaxMessageSize {
log.Warning("Release buffer failed (cap, size = %d, %d)", cap(b), len(b))
return
}
BufferPool.Put(b[:cap(b)])
mpool.ByteSlicePool.Put(uint32(cap(b)), b)
}
// msgioPipe is a pipe using msgio channels.
......@@ -56,10 +41,10 @@ type msgioPipe struct {
incoming *msgio.Chan
}
func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe {
func newMsgioPipe(size int) *msgioPipe {
return &msgioPipe{
outgoing: msgio.NewChan(size),
incoming: msgio.NewChanWithPool(size, pool),
incoming: msgio.NewChan(size),
}
}
......@@ -81,7 +66,7 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
local: local,
remote: remote,
maconn: maconn,
msgio: newMsgioPipe(10, BufferPool),
msgio: newMsgioPipe(10),
}
conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
......@@ -96,7 +81,7 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
}()
conn.Children().Add(1)
go func() {
conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
conn.msgio.incoming.ReadFromWithPool(maconn, &mpool.ByteSlicePool)
conn.Children().Done()
}()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment