Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-msgio
Commits
58542711
Commit
58542711
authored
Dec 20, 2017
by
Steven Allen
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
extract buffer pool (and simplify interface)
parent
d82125c9
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
35 additions
and
396 deletions
+35
-396
chan.go
chan.go
+3
-3
mpool/pool.go
mpool/pool.go
+0
-112
mpool/pool_test.go
mpool/pool_test.go
+0
-256
msgio.go
msgio.go
+14
-14
package.json
package.json
+6
-0
varint.go
varint.go
+12
-11
No files found.
chan.go
View file @
58542711
...
...
@@ -3,7 +3,7 @@ package msgio
import
(
"io"
m
pool
"github.com/libp2p/go-
msgio/m
pool"
pool
"github.com/libp2p/go-
buffer-
pool"
)
// Chan is a msgio duplex channel. It is used to have a channel interface
...
...
@@ -30,8 +30,8 @@ func (s *Chan) ReadFrom(r io.Reader) {
}
// ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel. Uses given Pool
func
(
s
*
Chan
)
ReadFromWithPool
(
r
io
.
Reader
,
p
*
m
pool
.
Pool
)
{
// messages, ands sends them down the channel. Uses given
Buffer
Pool
.
func
(
s
*
Chan
)
ReadFromWithPool
(
r
io
.
Reader
,
p
*
pool
.
Buffer
Pool
)
{
s
.
readFrom
(
NewReaderWithPool
(
r
,
p
))
}
...
...
mpool/pool.go
deleted
100644 → 0
View file @
d82125c9
// Package mpool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32.
//
// import "github.com/libp2p/go-msgio/mpool"
// var p mpool.Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(1024, small)
// p.Put(4194304, large)
//
// small2 := p.Get(1024).([]byte)
// large2 := p.Get(4194304).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
//
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//
package
mpool
import
(
"fmt"
"sync"
)
// ByteSlicePool is a static Pool for reusing byteslices of various sizes.
var
ByteSlicePool
=
&
Pool
{
New
:
func
(
length
int
)
interface
{}
{
return
make
([]
byte
,
length
)
},
}
// MaxLength is the maximum length of an element that can be added to the Pool.
const
MaxLength
=
1
<<
32
// Pool is a pool to handle cases of reusing elements of varying sizes.
// It maintains up to 32 internal pools, for each power of 2 in 0-32.
type
Pool
struct
{
pools
[
32
]
sync
.
Pool
// a list of singlePools
// New is a function that constructs a new element in the pool, with given len
New
func
(
len
int
)
interface
{}
}
func
(
p
*
Pool
)
getPool
(
idx
uint32
)
*
sync
.
Pool
{
if
idx
>
uint32
(
len
(
p
.
pools
))
{
panic
(
fmt
.
Errorf
(
"index too large: %d"
,
idx
))
}
return
&
p
.
pools
[
idx
]
}
// Get selects an arbitrary item from the Pool, removes it from the Pool,
// and returns it to the caller. Get may choose to ignore the pool and
// treat it as empty. Callers should not assume any relation between values
// passed to Put and the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns the
// result of calling p.New.
func
(
p
*
Pool
)
Get
(
length
uint32
)
interface
{}
{
idx
:=
nextPowerOfTwo
(
length
)
sp
:=
p
.
getPool
(
idx
)
// fmt.Printf("Get(%d) idx(%d)\n", length, idx)
val
:=
sp
.
Get
()
if
val
==
nil
&&
p
.
New
!=
nil
{
val
=
p
.
New
(
0x1
<<
idx
)
}
return
val
}
// Put adds x to the pool.
func
(
p
*
Pool
)
Put
(
length
uint32
,
val
interface
{})
{
idx
:=
prevPowerOfTwo
(
length
)
// fmt.Printf("Put(%d, -) idx(%d)\n", length, idx)
sp
:=
p
.
getPool
(
idx
)
sp
.
Put
(
val
)
}
func
nextPowerOfTwo
(
v
uint32
)
uint32
{
// fmt.Printf("nextPowerOfTwo(%d) ", v)
v
--
v
|=
v
>>
1
v
|=
v
>>
2
v
|=
v
>>
4
v
|=
v
>>
8
v
|=
v
>>
16
v
++
// fmt.Printf("-> %d", v)
i
:=
uint32
(
0
)
for
;
v
>
1
;
i
++
{
v
=
v
>>
1
}
// fmt.Printf("-> %d\n", i)
return
i
}
func
prevPowerOfTwo
(
num
uint32
)
uint32
{
next
:=
nextPowerOfTwo
(
num
)
// fmt.Printf("prevPowerOfTwo(%d) next: %d", num, next)
switch
{
case
num
==
(
1
<<
next
)
:
// num is a power of 2
case
next
==
0
:
default
:
next
=
next
-
1
// smaller
}
// fmt.Printf(" = %d\n", next)
return
next
}
mpool/pool_test.go
deleted
100644 → 0
View file @
d82125c9
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pool is no-op under race detector, so all these tests do not work.
// +build !race
package
mpool
import
(
"fmt"
"math/rand"
"runtime"
"runtime/debug"
"sync/atomic"
"testing"
"time"
)
func
TestPool
(
t
*
testing
.
T
)
{
// disable GC so we can control when it happens.
defer
debug
.
SetGCPercent
(
debug
.
SetGCPercent
(
-
1
))
var
p
Pool
if
p
.
Get
(
10
)
!=
nil
{
t
.
Fatal
(
"expected empty"
)
}
p
.
Put
(
16
,
"a"
)
p
.
Put
(
2048
,
"b"
)
if
g
:=
p
.
Get
(
16
);
g
!=
"a"
{
t
.
Fatalf
(
"got %#v; want a"
,
g
)
}
if
g
:=
p
.
Get
(
2048
);
g
!=
"b"
{
t
.
Fatalf
(
"got %#v; want b"
,
g
)
}
if
g
:=
p
.
Get
(
16
);
g
!=
nil
{
t
.
Fatalf
(
"got %#v; want nil"
,
g
)
}
if
g
:=
p
.
Get
(
2048
);
g
!=
nil
{
t
.
Fatalf
(
"got %#v; want nil"
,
g
)
}
if
g
:=
p
.
Get
(
1
);
g
!=
nil
{
t
.
Fatalf
(
"got %#v; want nil"
,
g
)
}
p
.
Put
(
1023
,
"d"
)
if
g
:=
p
.
Get
(
1024
);
g
!=
nil
{
t
.
Fatalf
(
"got %#v; want nil"
,
g
)
}
if
g
:=
p
.
Get
(
512
);
g
!=
"d"
{
t
.
Fatalf
(
"got %#v; want d"
,
g
)
}
debug
.
SetGCPercent
(
100
)
// to allow following GC to actually run
runtime
.
GC
()
if
g
:=
p
.
Get
(
10
);
g
!=
nil
{
t
.
Fatalf
(
"got %#v; want nil after GC"
,
g
)
}
}
func
TestPoolNew
(
t
*
testing
.
T
)
{
// disable GC so we can control when it happens.
defer
debug
.
SetGCPercent
(
debug
.
SetGCPercent
(
-
1
))
s
:=
[
32
]
int
{}
p
:=
Pool
{
New
:
func
(
length
int
)
interface
{}
{
idx
:=
nextPowerOfTwo
(
uint32
(
length
))
s
[
idx
]
++
return
s
[
idx
]
},
}
if
v
:=
p
.
Get
(
1
<<
5
);
v
!=
1
{
t
.
Fatalf
(
"got %v; want 1"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
2
);
v
!=
1
{
t
.
Fatalf
(
"got %v; want 1"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
2
);
v
!=
2
{
t
.
Fatalf
(
"got %v; want 2"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
5
);
v
!=
2
{
t
.
Fatalf
(
"got %v; want 2"
,
v
)
}
p
.
Put
(
1
<<
2
,
42
)
p
.
Put
(
1
<<
5
,
42
)
if
v
:=
p
.
Get
(
1
<<
2
);
v
!=
42
{
t
.
Fatalf
(
"got %v; want 42"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
2
);
v
!=
3
{
t
.
Fatalf
(
"got %v; want 3"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
5
);
v
!=
42
{
t
.
Fatalf
(
"got %v; want 42"
,
v
)
}
if
v
:=
p
.
Get
(
1
<<
5
);
v
!=
3
{
t
.
Fatalf
(
"got %v; want 3"
,
v
)
}
}
// Test that Pool does not hold pointers to previously cached
// resources
func
TestPoolGC
(
t
*
testing
.
T
)
{
var
p
Pool
var
fin
uint32
const
N
=
100
for
i
:=
0
;
i
<
N
;
i
++
{
v
:=
new
(
string
)
runtime
.
SetFinalizer
(
v
,
func
(
vv
*
string
)
{
atomic
.
AddUint32
(
&
fin
,
1
)
})
p
.
Put
(
uint32
(
i
),
v
)
}
for
i
:=
0
;
i
<
N
;
i
++
{
p
.
Get
(
uint32
(
i
))
}
for
i
:=
0
;
i
<
5
;
i
++
{
runtime
.
GC
()
time
.
Sleep
(
time
.
Duration
(
i
*
100
+
10
)
*
time
.
Millisecond
)
// 1 pointer can remain on stack or elsewhere
if
atomic
.
LoadUint32
(
&
fin
)
>=
N
-
1
{
return
}
}
t
.
Fatalf
(
"only %v out of %v resources are finalized"
,
atomic
.
LoadUint32
(
&
fin
),
N
)
}
func
TestPoolStress
(
t
*
testing
.
T
)
{
const
P
=
10
N
:=
int
(
1e6
)
if
testing
.
Short
()
{
N
/=
100
}
var
p
Pool
done
:=
make
(
chan
bool
)
for
i
:=
0
;
i
<
P
;
i
++
{
go
func
()
{
var
v
interface
{}
=
0
for
j
:=
0
;
j
<
N
;
j
++
{
if
v
==
nil
{
v
=
0
}
p
.
Put
(
uint32
(
j
),
v
)
v
=
p
.
Get
(
uint32
(
j
))
if
v
!=
nil
&&
v
.
(
int
)
!=
0
{
t
.
Fatalf
(
"expect 0, got %v"
,
v
)
}
}
done
<-
true
}()
}
for
i
:=
0
;
i
<
P
;
i
++
{
// fmt.Printf("%d/%d\n", i, P)
<-
done
}
}
func
TestPoolStressByteSlicePool
(
t
*
testing
.
T
)
{
const
P
=
10
chs
:=
10
maxSize
:=
uint32
(
1
<<
16
)
N
:=
int
(
1e4
)
if
testing
.
Short
()
{
N
/=
100
}
p
:=
ByteSlicePool
done
:=
make
(
chan
bool
)
errs
:=
make
(
chan
error
)
for
i
:=
0
;
i
<
P
;
i
++
{
go
func
()
{
ch
:=
make
(
chan
[]
byte
,
chs
+
1
)
for
i
:=
0
;
i
<
chs
;
i
++
{
j
:=
rand
.
Uint32
()
%
maxSize
ch
<-
p
.
Get
(
j
)
.
([]
byte
)
}
for
j
:=
0
;
j
<
N
;
j
++
{
r
:=
uint32
(
0
)
for
i
:=
0
;
i
<
chs
;
i
++
{
v
:=
<-
ch
p
.
Put
(
uint32
(
cap
(
v
)),
v
)
r
=
rand
.
Uint32
()
%
maxSize
v
=
p
.
Get
(
r
)
.
([]
byte
)
if
uint32
(
len
(
v
))
<
r
{
errs
<-
fmt
.
Errorf
(
"expect len(v) >= %d, got %d"
,
j
,
len
(
v
))
}
ch
<-
v
}
if
r
%
1000
==
0
{
runtime
.
GC
()
}
}
done
<-
true
}()
}
for
i
:=
0
;
i
<
P
;
{
select
{
case
<-
done
:
i
++
// fmt.Printf("%d/%d\n", i, P)
case
err
:=
<-
errs
:
t
.
Error
(
err
)
}
}
}
func
BenchmarkPool
(
b
*
testing
.
B
)
{
var
p
Pool
b
.
RunParallel
(
func
(
pb
*
testing
.
PB
)
{
i
:=
0
for
pb
.
Next
()
{
i
=
i
<<
1
p
.
Put
(
uint32
(
i
),
1
)
p
.
Get
(
uint32
(
i
))
}
})
}
func
BenchmarkPoolOverlflow
(
b
*
testing
.
B
)
{
var
p
Pool
b
.
RunParallel
(
func
(
pb
*
testing
.
PB
)
{
for
pb
.
Next
()
{
for
pow
:=
uint32
(
0
);
pow
<
32
;
pow
++
{
for
b
:=
0
;
b
<
100
;
b
++
{
p
.
Put
(
uint32
(
1
<<
pow
),
1
)
}
}
for
pow
:=
uint32
(
0
);
pow
<
32
;
pow
++
{
for
b
:=
0
;
b
<
100
;
b
++
{
p
.
Get
(
uint32
(
1
<<
pow
))
}
}
}
})
}
// DISABLED: This example is *not* guaranteed to work. This buffer pool can
// choose to ignore the interior pool and return `nil`.
//func ExamplePool() {
// var p Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(uint32(len(small)), small)
// p.Put(uint32(len(large)), large)
//
// small2 := p.Get(uint32(len(small))).([]byte)
// large2 := p.Get(uint32(len(large))).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//}
msgio.go
View file @
58542711
...
...
@@ -5,7 +5,7 @@ import (
"io"
"sync"
m
pool
"github.com/libp2p/go-
msgio/m
pool"
pool
"github.com/libp2p/go-
buffer-
pool"
)
// ErrMsgTooLarge is returned when the message length is exessive
...
...
@@ -41,9 +41,8 @@ type Reader interface {
Read
([]
byte
)
(
int
,
error
)
// ReadMsg reads the next message from the Reader.
// Uses a mpool.Pool internally to reuse buffers. io.ErrShortBuffer will
// be returned if the Pool.Get(...) returns nil.
// User may call ReleaseMsg(msg) to signal a buffer can be reused.
// Uses a pool.BufferPool internally to reuse buffers. User may call
// ReleaseMsg(msg) to signal a buffer can be reused.
ReadMsg
()
([]
byte
,
error
)
// ReleaseMsg signals a buffer can be reused.
...
...
@@ -117,7 +116,7 @@ type reader struct {
lbuf
[]
byte
next
int
pool
*
m
pool
.
Pool
pool
*
pool
.
Buffer
Pool
lock
sync
.
Locker
max
int
// the maximal message size (in bytes) this reader handles
}
...
...
@@ -126,13 +125,13 @@ type reader struct {
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side.
func
NewReader
(
r
io
.
Reader
)
ReadCloser
{
return
NewReaderWithPool
(
r
,
m
pool
.
ByteSlice
Pool
)
return
NewReaderWithPool
(
r
,
pool
.
Global
Pool
)
}
// NewReaderWithPool wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side. It uses a given
m
pool.Pool
func
NewReaderWithPool
(
r
io
.
Reader
,
p
*
m
pool
.
Pool
)
ReadCloser
{
// writer on the other side. It uses a given pool.
Buffer
Pool
func
NewReaderWithPool
(
r
io
.
Reader
,
p
*
pool
.
Buffer
Pool
)
ReadCloser
{
if
p
==
nil
{
panic
(
"nil pool"
)
}
...
...
@@ -194,22 +193,23 @@ func (s *reader) ReadMsg() ([]byte, error) {
return
nil
,
err
}
if
length
==
0
{
s
.
next
=
-
1
return
nil
,
nil
}
if
length
>
s
.
max
||
length
<
0
{
return
nil
,
ErrMsgTooLarge
}
msgb
:=
s
.
pool
.
Get
(
uint32
(
length
))
if
msgb
==
nil
{
return
nil
,
io
.
ErrShortBuffer
}
msg
:=
msgb
.
([]
byte
)[
:
length
]
msg
:=
s
.
pool
.
Get
(
length
)
_
,
err
=
io
.
ReadFull
(
s
.
R
,
msg
)
s
.
next
=
-
1
// signal we've consumed this msg
return
msg
,
err
}
func
(
s
*
reader
)
ReleaseMsg
(
msg
[]
byte
)
{
s
.
pool
.
Put
(
uint32
(
cap
(
msg
)),
msg
)
s
.
pool
.
Put
(
msg
)
}
func
(
s
*
reader
)
Close
()
error
{
...
...
package.json
View file @
58542711
...
...
@@ -9,6 +9,12 @@
"hash"
:
"QmYNGtJHgaGZkpzq8yG6Wxqm6EQTKqgpBfnyyGBKbZeDUi"
,
"name"
:
"go-randbuf"
,
"version"
:
"0.0.0"
},
{
"author"
:
"Stebalien"
,
"hash"
:
"QmUQy76yspPa3fRyY3GzXFTg9n8JVwFru6ue3KFRt4MeTw"
,
"name"
:
"go-buffer-pool"
,
"version"
:
"0.1.1"
}
],
"gxVersion"
:
"0.11.0"
,
...
...
varint.go
View file @
58542711
...
...
@@ -5,7 +5,7 @@ import (
"io"
"sync"
m
pool
"github.com/libp2p/go-
msgio/m
pool"
pool
"github.com/libp2p/go-
buffer-
pool"
)
// varintWriter is the underlying type that implements the Writer interface.
...
...
@@ -62,7 +62,7 @@ type varintReader struct {
lbuf
[]
byte
next
int
pool
*
m
pool
.
Pool
pool
*
pool
.
Buffer
Pool
lock
sync
.
Locker
max
int
// the maximal message size (in bytes) this reader handles
}
...
...
@@ -72,14 +72,15 @@ type varintReader struct {
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side.
func
NewVarintReader
(
r
io
.
Reader
)
ReadCloser
{
return
NewVarintReaderWithPool
(
r
,
m
pool
.
ByteSlice
Pool
)
return
NewVarintReaderWithPool
(
r
,
pool
.
Global
Pool
)
}
// NewVarintReaderWithPool wraps an io.Reader with a varint msgio framed reader.
// The msgio.Reader will read whole messages at a time (using the length).
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side. It uses a given mpool.Pool
func
NewVarintReaderWithPool
(
r
io
.
Reader
,
p
*
mpool
.
Pool
)
ReadCloser
{
// Assumes an equivalent writer on the other side. It uses a given
// pool.BufferPool.
func
NewVarintReaderWithPool
(
r
io
.
Reader
,
p
*
pool
.
BufferPool
)
ReadCloser
{
if
p
==
nil
{
panic
(
"nil pool"
)
}
...
...
@@ -139,23 +140,23 @@ func (s *varintReader) ReadMsg() ([]byte, error) {
if
err
!=
nil
{
return
nil
,
err
}
if
length
==
0
{
s
.
next
=
-
1
return
nil
,
nil
}
if
length
>
s
.
max
{
return
nil
,
ErrMsgTooLarge
}
msgb
:=
s
.
pool
.
Get
(
uint32
(
length
))
if
msgb
==
nil
{
return
nil
,
io
.
ErrShortBuffer
}
msg
:=
msgb
.
([]
byte
)[
:
length
]
msg
:=
s
.
pool
.
Get
(
length
)
_
,
err
=
io
.
ReadFull
(
s
.
R
,
msg
)
s
.
next
=
-
1
// signal we've consumed this msg
return
msg
,
err
}
func
(
s
*
varintReader
)
ReleaseMsg
(
msg
[]
byte
)
{
s
.
pool
.
Put
(
uint32
(
cap
(
msg
)),
msg
)
s
.
pool
.
Put
(
msg
)
}
func
(
s
*
varintReader
)
Close
()
error
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment