From b1df8043fd8e5ce004b67cb06240cebd5e7bae21 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 13 May 2016 09:28:52 -0700 Subject: [PATCH] refactor locking order structure --- limiter.go | 59 +++++++++++++++++++++++++++++-------------------- limiter_test.go | 4 ++++ 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/limiter.go b/limiter.go index 7835fe5..94ce05b 100644 --- a/limiter.go +++ b/limiter.go @@ -66,6 +66,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { dl.rllock.Lock() defer dl.rllock.Unlock() + if addrutil.IsFDCostlyTransport(dj.addr) { + dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { + next := dl.waitingOnFd[0] + dl.waitingOnFd = dl.waitingOnFd[1:] + if len(dl.waitingOnFd) == 0 { + dl.waitingOnFd = nil // clear out memory + } + dl.fdConsuming++ + + go dl.executeDial(next) + } + } + // release tokens in reverse order than we take them dl.activePerPeer[dj.peer]-- if dl.activePerPeer[dj.peer] == 0 { @@ -87,17 +101,6 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { go dl.executeDial(next) } - if addrutil.IsFDCostlyTransport(dj.addr) { - dl.fdConsuming-- - if len(dl.waitingOnFd) > 0 { - next := dl.waitingOnFd[0] - dl.waitingOnFd = dl.waitingOnFd[1:] - dl.fdConsuming++ - - // now, attempt to take the 'per peer limit' token - dl.schedulePerPeerDial(next) - } - } } // AddDialJob tries to take the needed tokens for starting the given dial job. @@ -107,6 +110,13 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.rllock.Lock() defer dl.rllock.Unlock() + if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[dj.peer] + dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) + return + } + dl.activePerPeer[dj.peer]++ + if addrutil.IsFDCostlyTransport(dj.addr) { if dl.fdConsuming >= dl.fdLimit { dl.waitingOnFd = append(dl.waitingOnFd, dj) @@ -117,7 +127,20 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.fdConsuming++ } - dl.schedulePerPeerDial(dj) + // take second needed token and start dial! + go dl.executeDial(dj) +} + +func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { + if dl.activePerPeer[j.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[j.peer] + dl.waitingOnPeerLimit[j.peer] = append(wlist, j) + return + } + + // take second needed token and start dial! + dl.activePerPeer[j.peer]++ + go dl.executeDial(j) } // executeDial calls the dialFunc, and reports the result through the response @@ -135,15 +158,3 @@ func (dl *dialLimiter) executeDial(j *dialJob) { case <-j.ctx.Done(): } } - -func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { - if dl.activePerPeer[j.peer] >= dl.perPeerLimit { - wlist := dl.waitingOnPeerLimit[j.peer] - dl.waitingOnPeerLimit[j.peer] = append(wlist, j) - return - } - - // take second needed token and start dial! - dl.activePerPeer[j.peer]++ - go dl.executeDial(j) -} diff --git a/limiter_test.go b/limiter_test.go index 28733c5..fb1be19 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -3,6 +3,7 @@ package swarm import ( "fmt" "math/rand" + "runtime" "strconv" "testing" "time" @@ -262,6 +263,7 @@ func TestTokenRedistribution(t *testing.T) { func TestStressLimiter(t *testing.T) { df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + fmt.Println("dial for peer: ", string(p)) if tcpPortOver(a, 1000) { return conn.Conn(nil), nil } else { @@ -305,6 +307,8 @@ func TestStressLimiter(t *testing.T) { }(peer.ID(fmt.Sprintf("testpeer%d", i))) } + time.Sleep(time.Millisecond * 1000) + fmt.Println("NUM GOROS: ", runtime.NumGoroutine()) for i := 0; i < 20; i++ { select { case <-success: -- GitLab