From 4a908bfe2e9c9867095927527ba7ef712a8a28d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 Jul 2017 23:25:59 +0200 Subject: [PATCH] Fix dialLimiter.fdConsuming counting --- .travis.yml | 2 +- limiter.go | 19 +++++++++++--- limiter_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 448d690..96b2d5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ os: language: go go: - - 1.7 + - 1.8 install: true diff --git a/limiter.go b/limiter.go index 67c6af4..f2513d8 100644 --- a/limiter.go +++ b/limiter.go @@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter { return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) } -func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { +func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter { return &dialLimiter{ - fdLimit: fdl, - perPeerLimit: ppl, + fdLimit: fdLimit, + perPeerLimit: perPeerLimit, waitingOnPeerLimit: make(map[peer.ID][]*dialJob), activePerPeer: make(map[peer.ID]int), dialFunc: df, @@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { if addrutil.IsFDCostlyTransport(dj.addr) { dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { next := dl.waitingOnFd[0] dl.waitingOnFd = dl.waitingOnFd[1:] @@ -89,6 +90,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { waitlist := dl.waitingOnPeerLimit[dj.peer] if !dj.success && len(waitlist) > 0 { next := waitlist[0] + if len(waitlist) == 1 { delete(dl.waitingOnPeerLimit, dj.peer) } else { @@ -96,11 +98,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { } dl.activePerPeer[dj.peer]++ // just kidding, we still want this token + if addrutil.IsFDCostlyTransport(next.addr) { + if dl.fdConsuming >= dl.fdLimit { + dl.waitingOnFd = append(dl.waitingOnFd, next) + return + } + + // take token + dl.fdConsuming++ + } + // can kick this off right here, dials in this list already // have the other tokens needed go dl.executeDial(next) } - } // AddDialJob tries to take the needed tokens for starting the given dial job. diff --git a/limiter_test.go b/limiter_test.go index cbb6afd..3111a5d 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -320,3 +320,69 @@ func TestStressLimiter(t *testing.T) { } } } + +func TestFDLimitUnderflow(t *testing.T) { + dials := 0 + + df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) { + dials++ + + timeout := make(chan bool, 1) + go func() { + time.Sleep(time.Second * 5) + timeout <- true + }() + + select { + case <-ctx.Done(): + case <-timeout: + } + + return nil, fmt.Errorf("df timed out") + } + + l := newDialLimiterWithParams(df, 20, 3) + + var addrs []ma.Multiaddr + for i := 0; i <= 1000; i++ { + addrs = append(addrs, addrWithPort(t, i)) + } + + for i := 0; i < 1000; i++ { + go func(id peer.ID, i int) { + ctx, cancel := context.WithCancel(context.Background()) + + resp := make(chan dialResult) + l.AddDialJob(&dialJob{ + addr: addrs[i], + ctx: ctx, + peer: id, + resp: resp, + }) + + //cancel first 60 after 1s, next 60 after 2s + if i > 60 { + time.Sleep(time.Second * 1) + } + if i < 120 { + time.Sleep(time.Second * 1) + cancel() + return + } + defer cancel() + + for res := range resp { + if res.Err != nil { + return + } + t.Fatal("got dial res, shouldn't") + } + }(peer.ID(fmt.Sprintf("testpeer%d", i % 20)), i) + } + + time.Sleep(time.Second * 3) + + if l.fdConsuming < 0 { + t.Fatalf("l.fdConsuming < 0") + } +} -- GitLab