Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-swarm
Commits
10021b2c
Commit
10021b2c
authored
Jul 31, 2017
by
Steven Allen
Committed by
GitHub
Jul 31, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #26 from libp2p/fix/fdconsuming
Fix dialLimiter.fdConsuming counting
parents
4ba23a77
4a908bfe
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
81 additions
and
4 deletions
+81
-4
limiter.go
limiter.go
+15
-4
limiter_test.go
limiter_test.go
+66
-0
No files found.
limiter.go
View file @
10021b2c
...
@@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter {
...
@@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter {
return
newDialLimiterWithParams
(
df
,
concurrentFdDials
,
defaultPerPeerRateLimit
)
return
newDialLimiterWithParams
(
df
,
concurrentFdDials
,
defaultPerPeerRateLimit
)
}
}
func
newDialLimiterWithParams
(
df
dialfunc
,
fd
l
,
ppl
int
)
*
dialLimiter
{
func
newDialLimiterWithParams
(
df
dialfunc
,
fd
Limit
,
perPeerLimit
int
)
*
dialLimiter
{
return
&
dialLimiter
{
return
&
dialLimiter
{
fdLimit
:
fd
l
,
fdLimit
:
fd
Limit
,
perPeerLimit
:
p
pl
,
perPeerLimit
:
p
erPeerLimit
,
waitingOnPeerLimit
:
make
(
map
[
peer
.
ID
][]
*
dialJob
),
waitingOnPeerLimit
:
make
(
map
[
peer
.
ID
][]
*
dialJob
),
activePerPeer
:
make
(
map
[
peer
.
ID
]
int
),
activePerPeer
:
make
(
map
[
peer
.
ID
]
int
),
dialFunc
:
df
,
dialFunc
:
df
,
...
@@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
...
@@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
if
addrutil
.
IsFDCostlyTransport
(
dj
.
addr
)
{
if
addrutil
.
IsFDCostlyTransport
(
dj
.
addr
)
{
dl
.
fdConsuming
--
dl
.
fdConsuming
--
if
len
(
dl
.
waitingOnFd
)
>
0
{
if
len
(
dl
.
waitingOnFd
)
>
0
{
next
:=
dl
.
waitingOnFd
[
0
]
next
:=
dl
.
waitingOnFd
[
0
]
dl
.
waitingOnFd
=
dl
.
waitingOnFd
[
1
:
]
dl
.
waitingOnFd
=
dl
.
waitingOnFd
[
1
:
]
...
@@ -89,6 +90,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
...
@@ -89,6 +90,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
waitlist
:=
dl
.
waitingOnPeerLimit
[
dj
.
peer
]
waitlist
:=
dl
.
waitingOnPeerLimit
[
dj
.
peer
]
if
!
dj
.
success
&&
len
(
waitlist
)
>
0
{
if
!
dj
.
success
&&
len
(
waitlist
)
>
0
{
next
:=
waitlist
[
0
]
next
:=
waitlist
[
0
]
if
len
(
waitlist
)
==
1
{
if
len
(
waitlist
)
==
1
{
delete
(
dl
.
waitingOnPeerLimit
,
dj
.
peer
)
delete
(
dl
.
waitingOnPeerLimit
,
dj
.
peer
)
}
else
{
}
else
{
...
@@ -96,11 +98,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
...
@@ -96,11 +98,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
}
}
dl
.
activePerPeer
[
dj
.
peer
]
++
// just kidding, we still want this token
dl
.
activePerPeer
[
dj
.
peer
]
++
// just kidding, we still want this token
if
addrutil
.
IsFDCostlyTransport
(
next
.
addr
)
{
if
dl
.
fdConsuming
>=
dl
.
fdLimit
{
dl
.
waitingOnFd
=
append
(
dl
.
waitingOnFd
,
next
)
return
}
// take token
dl
.
fdConsuming
++
}
// can kick this off right here, dials in this list already
// can kick this off right here, dials in this list already
// have the other tokens needed
// have the other tokens needed
go
dl
.
executeDial
(
next
)
go
dl
.
executeDial
(
next
)
}
}
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// AddDialJob tries to take the needed tokens for starting the given dial job.
...
...
limiter_test.go
View file @
10021b2c
...
@@ -320,3 +320,69 @@ func TestStressLimiter(t *testing.T) {
...
@@ -320,3 +320,69 @@ func TestStressLimiter(t *testing.T) {
}
}
}
}
}
}
func
TestFDLimitUnderflow
(
t
*
testing
.
T
)
{
dials
:=
0
df
:=
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
a
ma
.
Multiaddr
)
(
iconn
.
Conn
,
error
)
{
dials
++
timeout
:=
make
(
chan
bool
,
1
)
go
func
()
{
time
.
Sleep
(
time
.
Second
*
5
)
timeout
<-
true
}()
select
{
case
<-
ctx
.
Done
()
:
case
<-
timeout
:
}
return
nil
,
fmt
.
Errorf
(
"df timed out"
)
}
l
:=
newDialLimiterWithParams
(
df
,
20
,
3
)
var
addrs
[]
ma
.
Multiaddr
for
i
:=
0
;
i
<=
1000
;
i
++
{
addrs
=
append
(
addrs
,
addrWithPort
(
t
,
i
))
}
for
i
:=
0
;
i
<
1000
;
i
++
{
go
func
(
id
peer
.
ID
,
i
int
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
resp
:=
make
(
chan
dialResult
)
l
.
AddDialJob
(
&
dialJob
{
addr
:
addrs
[
i
],
ctx
:
ctx
,
peer
:
id
,
resp
:
resp
,
})
//cancel first 60 after 1s, next 60 after 2s
if
i
>
60
{
time
.
Sleep
(
time
.
Second
*
1
)
}
if
i
<
120
{
time
.
Sleep
(
time
.
Second
*
1
)
cancel
()
return
}
defer
cancel
()
for
res
:=
range
resp
{
if
res
.
Err
!=
nil
{
return
}
t
.
Fatal
(
"got dial res, shouldn't"
)
}
}(
peer
.
ID
(
fmt
.
Sprintf
(
"testpeer%d"
,
i
%
20
)),
i
)
}
time
.
Sleep
(
time
.
Second
*
3
)
if
l
.
fdConsuming
<
0
{
t
.
Fatalf
(
"l.fdConsuming < 0"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment