Commit b1df8043 authored by Jeromy's avatar Jeromy

refactor locking order structure

parent 108afdf5
...@@ -66,6 +66,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { ...@@ -66,6 +66,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock() dl.rllock.Lock()
defer dl.rllock.Unlock() defer dl.rllock.Unlock()
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++
go dl.executeDial(next)
}
}
// release tokens in reverse order than we take them // release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]-- dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 { if dl.activePerPeer[dj.peer] == 0 {
...@@ -87,17 +101,6 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { ...@@ -87,17 +101,6 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
go dl.executeDial(next) go dl.executeDial(next)
} }
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
dl.fdConsuming++
// now, attempt to take the 'per peer limit' token
dl.schedulePerPeerDial(next)
}
}
} }
// AddDialJob tries to take the needed tokens for starting the given dial job. // AddDialJob tries to take the needed tokens for starting the given dial job.
...@@ -107,6 +110,13 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { ...@@ -107,6 +110,13 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock() dl.rllock.Lock()
defer dl.rllock.Unlock() defer dl.rllock.Unlock()
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
if addrutil.IsFDCostlyTransport(dj.addr) { if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit { if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj) dl.waitingOnFd = append(dl.waitingOnFd, dj)
...@@ -117,7 +127,20 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { ...@@ -117,7 +127,20 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.fdConsuming++ dl.fdConsuming++
} }
dl.schedulePerPeerDial(dj) // take second needed token and start dial!
go dl.executeDial(dj)
}
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}
// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
} }
// executeDial calls the dialFunc, and reports the result through the response // executeDial calls the dialFunc, and reports the result through the response
...@@ -135,15 +158,3 @@ func (dl *dialLimiter) executeDial(j *dialJob) { ...@@ -135,15 +158,3 @@ func (dl *dialLimiter) executeDial(j *dialJob) {
case <-j.ctx.Done(): case <-j.ctx.Done():
} }
} }
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}
// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
}
...@@ -3,6 +3,7 @@ package swarm ...@@ -3,6 +3,7 @@ package swarm
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"runtime"
"strconv" "strconv"
"testing" "testing"
"time" "time"
...@@ -262,6 +263,7 @@ func TestTokenRedistribution(t *testing.T) { ...@@ -262,6 +263,7 @@ func TestTokenRedistribution(t *testing.T) {
func TestStressLimiter(t *testing.T) { func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
fmt.Println("dial for peer: ", string(p))
if tcpPortOver(a, 1000) { if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil return conn.Conn(nil), nil
} else { } else {
...@@ -305,6 +307,8 @@ func TestStressLimiter(t *testing.T) { ...@@ -305,6 +307,8 @@ func TestStressLimiter(t *testing.T) {
}(peer.ID(fmt.Sprintf("testpeer%d", i))) }(peer.ID(fmt.Sprintf("testpeer%d", i)))
} }
time.Sleep(time.Millisecond * 1000)
fmt.Println("NUM GOROS: ", runtime.NumGoroutine())
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
select { select {
case <-success: case <-success:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment