From 988ab6cb4bf3748fac0c4909a08240982d1e43c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 12 Jun 2018 00:30:55 +0200 Subject: [PATCH] limiter: cleanup the code --- limiter.go | 107 ++++++++++++++++++++++++++---------------------- limiter_test.go | 4 +- 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/limiter.go b/limiter.go index 3d3437b..1d9ff1a 100644 --- a/limiter.go +++ b/limiter.go @@ -34,12 +34,13 @@ func (dj *dialJob) cancelled() bool { } type dialLimiter struct { - rllock sync.Mutex + lk sync.Mutex + fdConsuming int fdLimit int waitingOnFd []*dialJob - dialFunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error) + dialFunc dialfunc activePerPeer map[peer.ID]int perPeerLimit int @@ -62,26 +63,26 @@ func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimit } } -func (dl *dialLimiter) finishedDial(dj *dialJob) { - dl.rllock.Lock() - defer dl.rllock.Unlock() - - if addrutil.IsFDCostlyTransport(dj.addr) { - dl.fdConsuming-- - - if len(dl.waitingOnFd) > 0 { - next := dl.waitingOnFd[0] - dl.waitingOnFd[0] = nil // clear out memory - dl.waitingOnFd = dl.waitingOnFd[1:] - if len(dl.waitingOnFd) == 0 { - dl.waitingOnFd = nil // clear out memory - } - dl.fdConsuming++ - - go dl.executeDial(next) +// freeFDToken frees FD token and if there are any schedules another waiting dialJob +// in it's place +func (dl *dialLimiter) freeFDToken() { + dl.fdConsuming-- + + if len(dl.waitingOnFd) > 0 { + next := dl.waitingOnFd[0] + dl.waitingOnFd[0] = nil // clear out memory + dl.waitingOnFd = dl.waitingOnFd[1:] + if len(dl.waitingOnFd) == 0 { + dl.waitingOnFd = nil // clear out memory } + dl.fdConsuming++ + + // we already have activePerPeer token at this point so we can just dial + go dl.executeDial(next) } +} +func (dl *dialLimiter) freePeerToken(dj *dialJob) { // release tokens in reverse order than we take them dl.activePerPeer[dj.peer]-- if dl.activePerPeer[dj.peer] == 0 { @@ -91,45 +92,31 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { waitlist := dl.waitingOnPeerLimit[dj.peer] if !dj.success && len(waitlist) > 0 { next := waitlist[0] - if len(waitlist) == 1 { - delete(dl.waitingOnPeerLimit, dj.peer) + delete(dl.waitingOnPeerLimit, next.peer) } else { waitlist[0] = nil // clear out memory - dl.waitingOnPeerLimit[dj.peer] = waitlist[1:] + dl.waitingOnPeerLimit[next.peer] = waitlist[1:] } - dl.activePerPeer[dj.peer]++ // just kidding, we still want this token - if addrutil.IsFDCostlyTransport(next.addr) { - if dl.fdConsuming >= dl.fdLimit { - dl.waitingOnFd = append(dl.waitingOnFd, next) - return - } - - // take token - dl.fdConsuming++ - } + dl.activePerPeer[next.peer]++ // just kidding, we still want this token - // can kick this off right here, dials in this list already - // have the other tokens needed - go dl.executeDial(next) + dl.addCheckFdLimit(next) } } -// AddDialJob tries to take the needed tokens for starting the given dial job. -// If it acquires all needed tokens, it immediately starts the dial, otherwise -// it will put it on the waitlist for the requested token. -func (dl *dialLimiter) AddDialJob(dj *dialJob) { - dl.rllock.Lock() - defer dl.rllock.Unlock() +func (dl *dialLimiter) finishedDial(dj *dialJob) { + dl.lk.Lock() + defer dl.lk.Unlock() - if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { - wlist := dl.waitingOnPeerLimit[dj.peer] - dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) - return + if addrutil.IsFDCostlyTransport(dj.addr) { + dl.freeFDToken() } - dl.activePerPeer[dj.peer]++ + dl.freePeerToken(dj) +} + +func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) { if addrutil.IsFDCostlyTransport(dj.addr) { if dl.fdConsuming >= dl.fdLimit { dl.waitingOnFd = append(dl.waitingOnFd, dj) @@ -140,15 +127,35 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.fdConsuming++ } - // take second needed token and start dial! go dl.executeDial(dj) } +func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) { + if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[dj.peer] + dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) + return + } + dl.activePerPeer[dj.peer]++ + + dl.addCheckFdLimit(dj) +} + +// AddDialJob tries to take the needed tokens for starting the given dial job. +// If it acquires all needed tokens, it immediately starts the dial, otherwise +// it will put it on the waitlist for the requested token. +func (dl *dialLimiter) AddDialJob(dj *dialJob) { + dl.lk.Lock() + defer dl.lk.Unlock() + + dl.addCheckPeerLimit(dj) +} + func (dl *dialLimiter) clearAllPeerDials(p peer.ID) { - dl.rllock.Lock() - defer dl.rllock.Unlock() + dl.lk.Lock() + defer dl.lk.Unlock() delete(dl.waitingOnPeerLimit, p) - // NB: the waitingOnFd list doesnt need to be cleaned out here, we will + // NB: the waitingOnFd list doesn't need to be cleaned out here, we will // remove them as we encounter them because they are 'cancelled' at this // point } diff --git a/limiter_test.go b/limiter_test.go index 6ae3935..4338cad 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -378,9 +378,9 @@ func TestFDLimitUnderflow(t *testing.T) { time.Sleep(time.Second * 3) - l.rllock.Lock() + l.lk.Lock() fdConsuming := l.fdConsuming - l.rllock.Unlock() + l.lk.Unlock() if fdConsuming < 0 { t.Fatalf("l.fdConsuming < 0") -- GitLab