Commit 988ab6cb authored by Łukasz Magiera's avatar Łukasz Magiera

limiter: cleanup the code

parent c1461238
......@@ -34,12 +34,13 @@ func (dj *dialJob) cancelled() bool {
}
type dialLimiter struct {
rllock sync.Mutex
lk sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
dialFunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
dialFunc dialfunc
activePerPeer map[peer.ID]int
perPeerLimit int
......@@ -62,26 +63,26 @@ func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimit
}
}
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd[0] = nil // clear out memory
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++
go dl.executeDial(next)
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd[0] = nil // clear out memory
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++
// we already have activePerPeer token at this point so we can just dial
go dl.executeDial(next)
}
}
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
// release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
......@@ -91,45 +92,31 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
delete(dl.waitingOnPeerLimit, next.peer)
} else {
waitlist[0] = nil // clear out memory
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
dl.waitingOnPeerLimit[next.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token
if addrutil.IsFDCostlyTransport(next.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, next)
return
}
// take token
dl.fdConsuming++
}
dl.activePerPeer[next.peer]++ // just kidding, we still want this token
// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
dl.addCheckFdLimit(next)
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.freeFDToken()
}
dl.activePerPeer[dj.peer]++
dl.freePeerToken(dj)
}
func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
......@@ -140,15 +127,35 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.fdConsuming++
}
// take second needed token and start dial!
go dl.executeDial(dj)
}
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
dl.addCheckFdLimit(dj)
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()
dl.addCheckPeerLimit(dj)
}
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
dl.lk.Lock()
defer dl.lk.Unlock()
delete(dl.waitingOnPeerLimit, p)
// NB: the waitingOnFd list doesnt need to be cleaned out here, we will
// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
// remove them as we encounter them because they are 'cancelled' at this
// point
}
......
......@@ -378,9 +378,9 @@ func TestFDLimitUnderflow(t *testing.T) {
time.Sleep(time.Second * 3)
l.rllock.Lock()
l.lk.Lock()
fdConsuming := l.fdConsuming
l.rllock.Unlock()
l.lk.Unlock()
if fdConsuming < 0 {
t.Fatalf("l.fdConsuming < 0")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment