Commit 03c910f9 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #755 from jbenet/fix-ratelimiter-use

ratelimiter: fixing rate limiter use
parents 8558838d 91a79bc2
......@@ -117,11 +117,10 @@ func (w *Worker) start(c Config) {
}
})
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
// reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die
limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers)
defer limiter.Close()
for {
select {
case <-proc.Closing():
......
......@@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
go func() {
// rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)
// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
default:
limiter.Go(func(worker process.Process) {
// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
case <-worker.Closing(): // our context was cancelled
break
default:
}
workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.LimitedGo(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.Go(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
})
}()
// wair fot the results.
......
......@@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) {
// hooks into your object that block you accidentally.
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
defer n.mu.Unlock()
if n.nots == nil { // so that zero-value is ready to be used.
return
}
if n.lim == nil { // no rate limit
go notify(notifiee)
} else {
notifiee := notifiee // rebind for data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
// no rate limiting.
if n.lim == nil {
for notifiee := range n.nots {
go notify(notifiee)
}
return
}
n.mu.Unlock()
// with rate limiting.
n.lim.Go(func(worker process.Process) {
for notifiee := range n.nots {
notifiee := notifiee // rebind for loop data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
})
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment