diff --git a/.travis.yml b/.travis.yml index 448d69047474a3582bbdb7672509bb32b9726e38..96b2d5fa0886611e2a4b6cbfccaeae35f33a5ca3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ os: language: go go: - - 1.7 + - 1.8 install: true diff --git a/limiter.go b/limiter.go index 67c6af4ddfc4f37df0d8e6fea23c39f879707f24..f2513d8e10ad1a140d0976c1abf5d15f9af4d7cb 100644 --- a/limiter.go +++ b/limiter.go @@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter { return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) } -func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { +func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter { return &dialLimiter{ - fdLimit: fdl, - perPeerLimit: ppl, + fdLimit: fdLimit, + perPeerLimit: perPeerLimit, waitingOnPeerLimit: make(map[peer.ID][]*dialJob), activePerPeer: make(map[peer.ID]int), dialFunc: df, @@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { if addrutil.IsFDCostlyTransport(dj.addr) { dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { next := dl.waitingOnFd[0] dl.waitingOnFd = dl.waitingOnFd[1:] @@ -89,6 +90,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { waitlist := dl.waitingOnPeerLimit[dj.peer] if !dj.success && len(waitlist) > 0 { next := waitlist[0] + if len(waitlist) == 1 { delete(dl.waitingOnPeerLimit, dj.peer) } else { @@ -96,11 +98,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { } dl.activePerPeer[dj.peer]++ // just kidding, we still want this token + if addrutil.IsFDCostlyTransport(next.addr) { + if dl.fdConsuming >= dl.fdLimit { + dl.waitingOnFd = append(dl.waitingOnFd, next) + return + } + + // take token + dl.fdConsuming++ + } + // can kick this off right here, dials in this list already // have the other tokens needed go dl.executeDial(next) } - } // AddDialJob tries to take the needed tokens for starting the given dial job. 
diff --git a/limiter_test.go b/limiter_test.go
index cbb6afdbc6c40bf990773224a9ac480e2415138c..3111a5d10716069fe8cabf4ae57228cf9b158bf8 100644
--- a/limiter_test.go
+++ b/limiter_test.go
@@ -320,3 +320,72 @@ func TestStressLimiter(t *testing.T) {
 		}
 	}
 }
+
+// TestFDLimitUnderflow checks that the limiter's fdConsuming counter does not
+// drop below zero while many dial jobs are being cancelled and failing
+// concurrently.
+func TestFDLimitUnderflow(t *testing.T) {
+	const numJobs = 1000
+
+	// df never succeeds: it blocks until the job's context is cancelled or
+	// five seconds pass, then reports a failed dial.
+	df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Second * 5):
+		}
+
+		return nil, fmt.Errorf("df timed out")
+	}
+
+	l := newDialLimiterWithParams(df, 20, 3)
+
+	addrs := make([]ma.Multiaddr, 0, numJobs)
+	for i := 0; i < numJobs; i++ {
+		addrs = append(addrs, addrWithPort(t, i))
+	}
+
+	for i := 0; i < numJobs; i++ {
+		go func(id peer.ID, i int) {
+			ctx, cancel := context.WithCancel(context.Background())
+
+			resp := make(chan dialResult)
+			l.AddDialJob(&dialJob{
+				addr: addrs[i],
+				ctx:  ctx,
+				peer: id,
+				resp: resp,
+			})
+
+			// Jobs 0-60 are cancelled after 1s and jobs 61-119 after 2s;
+			// the remaining jobs sleep 1s and then wait for a result.
+			if i > 60 {
+				time.Sleep(time.Second * 1)
+			}
+			if i < 120 {
+				time.Sleep(time.Second * 1)
+				cancel()
+				return
+			}
+			defer cancel()
+
+			for res := range resp {
+				if res.Err != nil {
+					return
+				}
+				// t.Fatal must only be called from the goroutine running
+				// the test function, so report with t.Error and stop here.
+				t.Error("got dial res, shouldn't")
+				return
+			}
+		}(peer.ID(fmt.Sprintf("testpeer%d", i%20)), i)
+	}
+
+	time.Sleep(time.Second * 3)
+
+	// NOTE(review): fdConsuming is read here without any lock held; confirm
+	// whether the limiter requires one for a race-free read.
+	if l.fdConsuming < 0 {
+		t.Fatalf("l.fdConsuming < 0")
+	}
+}