Commit 4a908bfe authored by Łukasz Magiera's avatar Łukasz Magiera

Fix dialLimiter.fdConsuming counting

parent 40dcf43d
......@@ -6,7 +6,7 @@ os:
language: go
go:
- 1.7
- 1.8
install: true
......
......@@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
......@@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
......@@ -89,6 +90,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
......@@ -96,11 +98,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token
if addrutil.IsFDCostlyTransport(next.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, next)
return
}
// take token
dl.fdConsuming++
}
// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
......
......@@ -320,3 +320,69 @@ func TestStressLimiter(t *testing.T) {
}
}
}
func TestFDLimitUnderflow(t *testing.T) {
dials := 0
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
dials++
timeout := make(chan bool, 1)
go func() {
time.Sleep(time.Second * 5)
timeout <- true
}()
select {
case <-ctx.Done():
case <-timeout:
}
return nil, fmt.Errorf("df timed out")
}
l := newDialLimiterWithParams(df, 20, 3)
var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
addrs = append(addrs, addrWithPort(t, i))
}
for i := 0; i < 1000; i++ {
go func(id peer.ID, i int) {
ctx, cancel := context.WithCancel(context.Background())
resp := make(chan dialResult)
l.AddDialJob(&dialJob{
addr: addrs[i],
ctx: ctx,
peer: id,
resp: resp,
})
//cancel first 60 after 1s, next 60 after 2s
if i > 60 {
time.Sleep(time.Second * 1)
}
if i < 120 {
time.Sleep(time.Second * 1)
cancel()
return
}
defer cancel()
for res := range resp {
if res.Err != nil {
return
}
t.Fatal("got dial res, shouldn't")
}
}(peer.ID(fmt.Sprintf("testpeer%d", i % 20)), i)
}
time.Sleep(time.Second * 3)
if l.fdConsuming < 0 {
t.Fatalf("l.fdConsuming < 0")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment