package dht

import (
	"context"
	"fmt"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

const (
	// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
	// be alive at any time.
	DefaultDialQueueMinParallelism = 6
	// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
	// be alive at any time.
	DefaultDialQueueMaxParallelism = 20
	// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
	// a worker pool downscaling.
	DefaultDialQueueMaxIdle = 5 * time.Second
	// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
	// scaling events, after one is processed. Its role is to reduce jitter.
	DefaultDialQueueScalingMutePeriod = 1 * time.Second
	// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
	// or divided when upscaling and downscaling events occur, respectively.
	DefaultDialQueueScalingFactor = 1.5
)
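
// As a worked example of the defaults above: starting from 6 workers, successive upscaling events grow
// the pool to floor(6*1.5)=9, then 13, then 19, and finally cap it at 20; successive downscaling events
// divide by the same factor until the pool floors at the minimum of 6 again.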

type dialQueue struct {
	*dqParams

	nWorkers uint
	out      *queue.ChanQueue

	waitingCh chan waitingCh
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

type dqParams struct {
	ctx    context.Context
	target string
	dialFn func(context.Context, peer.ID) error
	in     *queue.ChanQueue
	config dqConfig
}

type dqConfig struct {
	// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	minParallelism uint
	// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	maxParallelism uint
	// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
	// and downscaling events occur, respectively.
	scalingFactor float64
	// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
	// Its role is to reduce jitter.
	mutePeriod time.Duration
	// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	maxIdle time.Duration
}

// dqDefaultConfig returns the default configuration for dial queues. See const documentation to learn the default values.
func dqDefaultConfig() dqConfig {
	return dqConfig{
		minParallelism: DefaultDialQueueMinParallelism,
		maxParallelism: DefaultDialQueueMaxParallelism,
		scalingFactor:  DefaultDialQueueScalingFactor,
		maxIdle:        DefaultDialQueueMaxIdle,
		mutePeriod:     DefaultDialQueueScalingMutePeriod,
	}
}

func (dqc *dqConfig) validate() error {
	if dqc.minParallelism > dqc.maxParallelism {
		return fmt.Errorf("minParallelism must not exceed maxParallelism; actual values: min=%d, max=%d",
			dqc.minParallelism, dqc.maxParallelism)
	}
	}
	if dqc.scalingFactor < 1 {
		return fmt.Errorf("scalingFactor must be >= 1; actual value: %f", dqc.scalingFactor)
	}
	return nil
}
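
// For illustration, a hypothetical caller inside this package tuning the queue for a constrained
// environment (e.g. a low file-descriptor limit) might override the defaults and validate:
//
//	cfg := dqDefaultConfig()
//	cfg.maxParallelism = 10
//	if err := cfg.validate(); err != nil {
//		// reject the configuration
//	}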

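// waitingCh pairs a consumer's delivery channel with the time the consumer started waiting, so the
// control loop can log the handoff latency when a dialled peer is delivered.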
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with config.minParallelism workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern: spinning up more workers to compensate would
// only add fuel to the fire. Since we have no deterministic way to detect throttling for now, we hard-limit
// concurrency to config.maxParallelism.
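//
// A minimal usage sketch (ctx, target and dialFn are hypothetical stand-ins for what the DHT query
// logic supplies):
//
//	in := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target))
//	dq, err := newDialQueue(&dqParams{
//		ctx:    ctx,
//		target: target,
//		dialFn: dialFn, // e.g. a func(context.Context, peer.ID) error that connects the host to the peer
//		in:     in,
//		config: dqDefaultConfig(),
//	})
//	if err != nil {
//		// handle the error
//	}
//	if p, ok := <-dq.Consume(); ok {
//		// p is dialled and ready for an RPC
//	}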
func newDialQueue(params *dqParams) (*dialQueue, error) {
	sq := &dialQueue{
		dqParams:  params,
		nWorkers:  params.config.minParallelism,
		out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
		growCh:    make(chan struct{}, 1),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh),
		dieCh:     make(chan struct{}, params.config.maxParallelism),
	}

	for i := 0; i < int(params.config.minParallelism); i++ {
		go sq.worker()
	}
	go sq.control()
	return sq, nil
}

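// control is the single-threaded event loop of the dial queue. It hands dialled peers to parked
// consumers in arrival order, and processes upscaling and downscaling signals, muting scaling events
// that arrive within config.mutePeriod of the previous one.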
func (dq *dialQueue) control() {
	var (
		dialled        <-chan peer.ID
		waiting        []waitingCh
		lastScalingEvt = time.Now()
	)

	defer func() {
		for _, w := range waiting {
			close(w.ch)
		}
		waiting = nil
	}()

	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
			continue // onto the top.
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
			continue // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

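// Consume returns a channel that delivers at most one dialled peer and is then closed. If a dialled
// peer is immediately available, it is returned right away; otherwise we signal an upscaling event and
// park the request until a dial completes. The channel is closed without a value if our context is done.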
func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available.
		ch <- p
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return, so trigger a scale-up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
	}
	return ch
}

func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev uint) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == dq.config.maxParallelism {
		return
	}
	// choosing not to worry about uint wrapping beyond max value.
	target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
	if target > dq.config.maxParallelism {
		target = dq.config.maxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev uint) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == dq.config.minParallelism {
		return
	}
	target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
	if target < dq.config.minParallelism {
		target = dq.config.minParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

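// worker takes peers from the input queue, dials them, and enqueues successful dials onto the output
// queue. It signals a downscale when it sits idle for config.maxIdle, or when it completes a dial that
// no consumer was waiting for.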
func (dq *dialQueue) worker() {
	// This idle timer tracks whether the environment is slow. If we're waiting too long to acquire a peer to
	// dial, it means the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

		// Stop and drain the idle timer before resetting it: Stop does not flush the channel, so a tick
		// that has already fired must be consumed here, or it would trigger a spurious downscale signal
		// in the select below.
		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
		}
		idleTimer.Reset(dq.config.maxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}