Commit f2df3ec5 authored by Raúl Kripalani's avatar Raúl Kripalani

fix shutdown logic; fix timer logic.

parent 72f9d4c2
......@@ -91,18 +91,23 @@ func (dq *dialQueue) control() {
waiting <-chan waitingCh
lastScalingEvt = time.Now()
)
defer func() {
// close channels.
if resp.ch != nil {
close(resp.ch)
}
close(dq.waitingCh)
for w := range dq.waitingCh {
close(w.ch)
}
}()
for {
// First process any backlog of dial jobs and waiters -- making progress is the priority.
// This block is copied below; couldn't find a more concise way of doing this.
select {
case <-dq.ctx.Done():
// close channels.
if resp.ch != nil {
close(resp.ch)
}
for w := range waiting {
close(w.ch)
}
return
case p = <-dialled:
dialled, waiting = nil, dq.waitingCh
......@@ -113,7 +118,8 @@ func (dq *dialQueue) control() {
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
continue // onto the top.
resp.ch = nil
continue // onto the top.
default:
// there's nothing to process, so proceed onto the main select block.
}
......@@ -129,6 +135,7 @@ func (dq *dialQueue) control() {
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
resp.ch = nil
case <-dq.growCh:
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
continue
......@@ -169,6 +176,10 @@ func (dq *dialQueue) Consume() <-chan peer.ID {
// park the channel until a dialled peer becomes available.
select {
case <-dq.ctx.Done():
// return a closed channel with no value if we're done.
close(ch)
return ch
case dq.waitingCh <- waitingCh{ch, time.Now()}:
default:
panic("detected more consuming goroutines than declared upfront")
......@@ -226,8 +237,7 @@ func (dq *dialQueue) shrink() {
func (dq *dialQueue) worker() {
// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
// it means that the DHT query is progressing slow and we should shrink the worker pool.
idleTimer := time.NewTimer(0)
idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
for {
// trap exit signals first.
select {
......@@ -238,11 +248,10 @@ func (dq *dialQueue) worker() {
default:
}
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
idleTimer.Stop()
select {
case <-idleTimer.C:
default:
}
idleTimer.Reset(DialQueueMaxIdle)
......
......@@ -206,9 +206,9 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
case <-r.rateLimit:
ch := r.peersDialed.Consume()
select {
case p, _ := <-ch:
if p == "" {
// peer is nil; this signals context cancellation.
case p, ok := <-ch:
if !ok {
// this signals context cancellation.
return
}
// do it as a child func to make sure Run exits
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment