Commit 108afdf5 authored by Jeromy's avatar Jeromy

don't execute cancelled jobs

parent 34d63fed
......@@ -24,6 +24,15 @@ type dialJob struct {
success bool
}
func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}
type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
......@@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}
con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
......
......@@ -2,6 +2,7 @@ package swarm
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
......@@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) {
l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
bads := []ma.Multiaddr{
addrWithPort(t, 1),
addrWithPort(t, 2),
addrWithPort(t, 3),
addrWithPort(t, 4),
}
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
resch := make(chan dialResult)
......@@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) {
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select {
......@@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) {
t.Fatal("should have gotten successful dial")
}
}
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil
} else {
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment