Commit ebcfcd46, authored by Raúl Kripalani and committed by Matt Joiner

make dial queue parameters configurable.

parent 1b1fb7e0
...@@ -2,6 +2,7 @@ package dht ...@@ -2,6 +2,7 @@ package dht
import ( import (
"context" "context"
"fmt"
"math" "math"
"time" "time"
...@@ -10,28 +11,28 @@ import ( ...@@ -10,28 +11,28 @@ import (
) )
const ( const (
// DialQueueMinParallelism is the minimum number of worker dial goroutines that will be alive at any time. // DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
DialQueueMinParallelism = 6 // be alive at any time.
// DialQueueMaxParallelism is the maximum number of worker dial goroutines that can be alive at any time. DefaultDialQueueMinParallelism = 6
DialQueueMaxParallelism = 20 // DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
// DialQueueMaxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling. // be alive at any time.
DialQueueMaxIdle = 5 * time.Second DefaultDialQueueMaxParallelism = 20
// DialQueueScalingMutePeriod is the amount of time to ignore further worker pool scaling events, after one is // DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
// processed. Its role is to reduce jitter. // a worker pool downscaling.
DialQueueScalingMutePeriod = 1 * time.Second DefaultDialQueueMaxIdle = 5 * time.Second
// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
// scaling events, after one is processed. Its role is to reduce jitter.
DefaultDialQueueScalingMutePeriod = 1 * time.Second
// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
// or divided when upscaling and downscaling events occur, respectively.
DefaultDialQueueScalingFactor = 1.5
) )
type dialQueue struct { type dialQueue struct {
ctx context.Context *dqParams
dialFn func(context.Context, peer.ID) error
nWorkers int
scalingFactor float64
scalingMutePeriod time.Duration
maxIdle time.Duration
in *queue.ChanQueue nWorkers uint
out *queue.ChanQueue out *queue.ChanQueue
waitingCh chan waitingCh waitingCh chan waitingCh
dieCh chan struct{} dieCh chan struct{}
...@@ -39,6 +40,51 @@ type dialQueue struct { ...@@ -39,6 +40,51 @@ type dialQueue struct {
shrinkCh chan struct{} shrinkCh chan struct{}
} }
// dqParams groups the inputs that newDialQueue needs to build a dial queue: the collaborators it works with
// (context, queues, dial function) plus the tunable configuration.
type dqParams struct {
	// ctx is the context handed to the output queue that newDialQueue creates.
	ctx context.Context
	// target is the key used to build the XOR-distance priority queue backing the output queue.
	target string
	// dialFn dials a single peer and reports failure through its error result.
	// NOTE(review): presumably invoked by the worker goroutines; the worker body is not visible here — confirm.
	dialFn func(context.Context, peer.ID) error
	// in is the queue supplied by the caller.
	// NOTE(review): presumably the source of peers awaiting a dial — confirm against the worker loop.
	in *queue.ChanQueue
	// config carries the worker pool tuning parameters (parallelism bounds, scaling factor, mute period, max idle).
	config dqConfig
}
// dqConfig holds the tunable parameters of a dial queue's worker pool. A configuration is considered valid by
// validate when minParallelism does not exceed maxParallelism and scalingFactor is at least 1.
type dqConfig struct {
	// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	minParallelism uint
	// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	maxParallelism uint
	// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
	// and downscaling events occur, respectively.
	scalingFactor float64
	// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
	// Its role is to reduce jitter.
	mutePeriod time.Duration
	// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	maxIdle time.Duration
}
// dqDefaultConfig builds a dqConfig whose every field is set from the matching DefaultDialQueue* constant.
// Callers receive a fresh value and may override individual fields before passing it on.
func dqDefaultConfig() dqConfig {
	var cfg dqConfig

	// Worker pool bounds.
	cfg.minParallelism = DefaultDialQueueMinParallelism
	cfg.maxParallelism = DefaultDialQueueMaxParallelism

	// Scaling behaviour and timing.
	cfg.scalingFactor = DefaultDialQueueScalingFactor
	cfg.mutePeriod = DefaultDialQueueScalingMutePeriod
	cfg.maxIdle = DefaultDialQueueMaxIdle

	return cfg
}
// validate checks that the configuration values are mutually consistent.
//
// It returns an error when minParallelism is greater than maxParallelism, or when scalingFactor is NaN or
// smaller than 1. Equal minimum and maximum parallelism is accepted. It returns nil for a valid configuration.
//
// NOTE(review): the newDialQueue constructor shown alongside this change does not appear to call validate;
// confirm that some caller invokes it before the configuration is used.
func (dqc *dqConfig) validate() error {
	if dqc.minParallelism > dqc.maxParallelism {
		return fmt.Errorf("minParallelism must not exceed maxParallelism; actual values: min=%d, max=%d",
			dqc.minParallelism, dqc.maxParallelism)
	}
	// NaN compares false against every value, so a plain "< 1" test would let it through and it would later
	// be fed into the float-to-uint conversions that compute the worker pool size.
	if math.IsNaN(dqc.scalingFactor) || dqc.scalingFactor < 1 {
		return fmt.Errorf("scalingFactor must be >= 1; actual value: %f", dqc.scalingFactor)
	}
	return nil
}
type waitingCh struct { type waitingCh struct {
ch chan<- peer.ID ch chan<- peer.ID
ts time.Time ts time.Time
...@@ -52,7 +98,7 @@ type waitingCh struct { ...@@ -52,7 +98,7 @@ type waitingCh struct {
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake, // connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation. // and protocol negotiation.
// //
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of // We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers. // dialled peers.
// //
// The following events trigger scaling: // The following events trigger scaling:
...@@ -62,31 +108,23 @@ type waitingCh struct { ...@@ -62,31 +108,23 @@ type waitingCh struct {
// //
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and // Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency // end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism. // to config.maxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, func newDialQueue(params *dqParams) (*dialQueue, error) {
maxIdle, scalingMutePeriod time.Duration,
) *dialQueue {
sq := &dialQueue{ sq := &dialQueue{
ctx: ctx, dqParams: params,
dialFn: dialFn, nWorkers: params.config.minParallelism,
nWorkers: DialQueueMinParallelism, out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
scalingFactor: 1.5,
scalingMutePeriod: scalingMutePeriod,
maxIdle: maxIdle,
in: in,
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),
growCh: make(chan struct{}, 1), growCh: make(chan struct{}, 1),
shrinkCh: make(chan struct{}, 1), shrinkCh: make(chan struct{}, 1),
waitingCh: make(chan waitingCh), waitingCh: make(chan waitingCh),
dieCh: make(chan struct{}, DialQueueMaxParallelism), dieCh: make(chan struct{}, params.config.maxParallelism),
} }
for i := 0; i < DialQueueMinParallelism; i++ {
for i := 0; i < int(params.config.minParallelism); i++ {
go sq.worker() go sq.worker()
} }
go sq.control() go sq.control()
return sq return sq, nil
} }
func (dq *dialQueue) control() { func (dq *dialQueue) control() {
...@@ -151,13 +189,13 @@ func (dq *dialQueue) control() { ...@@ -151,13 +189,13 @@ func (dq *dialQueue) control() {
dialled = nil dialled = nil
} }
case <-dq.growCh: case <-dq.growCh:
if time.Since(lastScalingEvt) < dq.scalingMutePeriod { if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue continue
} }
dq.grow() dq.grow()
lastScalingEvt = time.Now() lastScalingEvt = time.Now()
case <-dq.shrinkCh: case <-dq.shrinkCh:
if time.Since(lastScalingEvt) < dq.scalingMutePeriod { if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue continue
} }
dq.shrink() dq.shrink()
...@@ -201,19 +239,20 @@ func (dq *dialQueue) Consume() <-chan peer.ID { ...@@ -201,19 +239,20 @@ func (dq *dialQueue) Consume() <-chan peer.ID {
func (dq *dialQueue) grow() { func (dq *dialQueue) grow() {
// no mutex needed as this is only called from the (single-threaded) control loop. // no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev int) { defer func(prev uint) {
if prev == dq.nWorkers { if prev == dq.nWorkers {
return return
} }
log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers) log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers) }(dq.nWorkers)
if dq.nWorkers == DialQueueMaxParallelism { if dq.nWorkers == dq.config.maxParallelism {
return return
} }
target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor)) // choosing not to worry about uint wrapping beyond max value.
if target > DialQueueMaxParallelism { target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
target = DialQueueMinParallelism if target > dq.config.maxParallelism {
target = dq.config.maxParallelism
} }
for ; dq.nWorkers < target; dq.nWorkers++ { for ; dq.nWorkers < target; dq.nWorkers++ {
go dq.worker() go dq.worker()
...@@ -222,19 +261,19 @@ func (dq *dialQueue) grow() { ...@@ -222,19 +261,19 @@ func (dq *dialQueue) grow() {
func (dq *dialQueue) shrink() { func (dq *dialQueue) shrink() {
// no mutex needed as this is only called from the (single-threaded) control loop. // no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev int) { defer func(prev uint) {
if prev == dq.nWorkers { if prev == dq.nWorkers {
return return
} }
log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers) log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers) }(dq.nWorkers)
if dq.nWorkers == DialQueueMinParallelism { if dq.nWorkers == dq.config.minParallelism {
return return
} }
target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor)) target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
if target < DialQueueMinParallelism { if target < dq.config.minParallelism {
target = DialQueueMinParallelism target = dq.config.minParallelism
} }
// send as many die signals as workers we have to prune. // send as many die signals as workers we have to prune.
for ; dq.nWorkers > target; dq.nWorkers-- { for ; dq.nWorkers > target; dq.nWorkers-- {
...@@ -265,7 +304,7 @@ func (dq *dialQueue) worker() { ...@@ -265,7 +304,7 @@ func (dq *dialQueue) worker() {
case <-idleTimer.C: case <-idleTimer.C:
default: default:
} }
idleTimer.Reset(dq.maxIdle) idleTimer.Reset(dq.config.maxIdle)
select { select {
case <-dq.dieCh: case <-dq.dieCh:
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
) )
func TestDialQueueGrowsOnSlowDials(t *testing.T) { func TestDialQueueGrowsOnSlowDials(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test")) in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{}) hang := make(chan struct{})
...@@ -29,7 +28,19 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) { ...@@ -29,7 +28,19 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
} }
// remove the mute period to grow faster. // remove the mute period to grow faster.
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0) config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
_ = dq.Consume() _ = dq.Consume()
...@@ -37,7 +48,7 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) { ...@@ -37,7 +48,7 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
} }
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
if atomic.LoadInt32(&cnt) > int32(DialQueueMinParallelism) { if atomic.LoadInt32(&cnt) > int32(DefaultDialQueueMinParallelism) {
return return
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
...@@ -61,7 +72,19 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) { ...@@ -61,7 +72,19 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
return nil return nil
} }
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0) config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed // acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable. // and immediately returnable.
...@@ -121,7 +144,19 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) { ...@@ -121,7 +144,19 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
in.EnqChan <- peer.ID(i) in.EnqChan <- peer.ID(i)
} }
dq := newDialQueue(context.Background(), "test", in, dialFn, time.Second, 0) config := dqDefaultConfig()
config.maxIdle = 1 * time.Second
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
// keep up to speed with backlog by releasing the dial function every time we acquire a channel. // keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ { for i := 0; i < 13; i++ {
...@@ -162,7 +197,18 @@ func TestDialQueueMutePeriodHonored(t *testing.T) { ...@@ -162,7 +197,18 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
in.EnqChan <- peer.ID(i) in.EnqChan <- peer.ID(i)
} }
dq := newDialQueue(context.Background(), "test", in, dialFn, DialQueueMaxIdle, 2*time.Second) config := dqDefaultConfig()
config.mutePeriod = 2 * time.Second
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
// pick up three consumers. // pick up three consumers.
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
......
...@@ -103,7 +103,17 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner { ...@@ -103,7 +103,17 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: peersToQuery, peersToQuery: peersToQuery,
proc: proc, proc: proc,
} }
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, DialQueueMaxIdle, DialQueueScalingMutePeriod) dq, err := newDialQueue(&dqParams{
ctx: ctx,
target: q.key,
in: peersToQuery,
dialFn: r.dialPeer,
config: dqDefaultConfig(),
})
if err != nil {
panic(err)
}
r.peersDialed = dq
return r return r
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment