dial_queue.go 10.6 KB
Newer Older
1 2 3 4
package dht

import (
	"context"
5
	"fmt"
6
	"math"
7
	"sync/atomic"
8 9
	"time"

10 11
	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
12 13
)

const (
	// DefaultDialQueueMinParallelism is the default lower bound on the number of dial worker goroutines kept alive
	// at any time.
	DefaultDialQueueMinParallelism = 6
	// DefaultDialQueueMaxParallelism is the default upper bound on the number of dial worker goroutines that may be
	// alive at any time.
	DefaultDialQueueMaxParallelism = 20
	// DefaultDialQueueMaxIdle is the default time a dial worker goroutine waits for work before it signals that the
	// worker pool should be scaled down.
	DefaultDialQueueMaxIdle = 5 * time.Second
	// DefaultDialQueueScalingMutePeriod is the default quiet time, after a worker pool scaling event has been
	// processed, during which further scaling events are ignored. It exists to reduce jitter.
	DefaultDialQueueScalingMutePeriod = time.Second
	// DefaultDialQueueScalingFactor is the default factor the current worker count is multiplied by when scaling up,
	// and divided by when scaling down.
	DefaultDialQueueScalingFactor = 1.5
)

type dialQueue struct {
33
	*dqParams
34

35 36
	nWorkers uint
	out      *queue.ChanQueue
37
	started  int32
38

39
	waitingCh chan waitingCh
40 41 42 43 44
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// dqParams bundles the caller-supplied inputs of a dial queue.
type dqParams struct {
	ctx    context.Context                     // lifetime of the queue and all of its goroutines
	target string                              // key the dialled-peer output queue is ordered against
	dialFn func(context.Context, peer.ID) error // performs a single dial
	in     *queue.ChanQueue                    // source of peers to dial
	config dqConfig                            // pool sizing and scaling knobs
}

// dqConfig holds the tuning knobs of a dial queue's worker pool.
type dqConfig struct {
	// minParallelism and maxParallelism bound the number of worker dial goroutines alive at any time
	// (lower and upper bound respectively).
	minParallelism, maxParallelism uint
	// scalingFactor multiplies the current worker count on scale-up events and divides it on scale-down events.
	scalingFactor float64
	// mutePeriod is how long further worker pool scaling events are ignored after one has been processed; it
	// dampens jitter.
	mutePeriod time.Duration
	// maxIdle is how long a worker dial goroutine waits for work before signalling a worker pool downscale.
	maxIdle time.Duration
}

// dqDefaultConfig builds a dqConfig populated from the exported DefaultDialQueue* constants; refer to their
// documentation for the concrete values.
func dqDefaultConfig() dqConfig {
	var cfg dqConfig
	cfg.minParallelism = DefaultDialQueueMinParallelism
	cfg.maxParallelism = DefaultDialQueueMaxParallelism
	cfg.scalingFactor = DefaultDialQueueScalingFactor
	cfg.mutePeriod = DefaultDialQueueScalingMutePeriod
	cfg.maxIdle = DefaultDialQueueMaxIdle
	return cfg
}

func (dqc *dqConfig) validate() error {
	if dqc.minParallelism > dqc.maxParallelism {
		return fmt.Errorf("minParallelism must be below maxParallelism; actual values: min=%d, max=%d",
			dqc.minParallelism, dqc.maxParallelism)
	}
	if dqc.scalingFactor < 1 {
		return fmt.Errorf("scalingFactor must be >= 1; actual value: %f", dqc.scalingFactor)
	}
	return nil
}

90 91 92 93 94
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

95 96 97 98
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
99 100 101 102 103
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
104
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
105 106 107
// dialled peers.
//
// The following events trigger scaling:
Raúl Kripalani's avatar
Raúl Kripalani committed
108 109 110
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
111
//
Raúl Kripalani's avatar
Raúl Kripalani committed
112 113
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
114 115
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
116
	dq := &dialQueue{
117 118
		dqParams:  params,
		out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
119
		growCh:    make(chan struct{}, 1),
120
		shrinkCh:  make(chan struct{}, 1),
121
		waitingCh: make(chan waitingCh),
122
		dieCh:     make(chan struct{}, params.config.maxParallelism),
123
	}
124

125 126
	go dq.control()
	return dq, nil
127 128
}

129 130 131 132 133 134 135 136 137 138 139 140
// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
	if !atomic.CompareAndSwapInt32(&dq.started, 0, 1) {
		return
	}
	tgt := int(dq.dqParams.config.minParallelism)
	for i := 0; i < tgt; i++ {
		go dq.worker()
	}
	dq.nWorkers = uint(tgt)
}

141 142
func (dq *dialQueue) control() {
	var (
143 144
		dialled        <-chan peer.ID
		waiting        []waitingCh
145 146
		lastScalingEvt = time.Now()
	)
147 148

	defer func() {
149
		for _, w := range waiting {
150 151
			close(w.ch)
		}
152
		waiting = nil
153 154
	}()

155 156 157 158 159 160
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
161 162 163
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
164
			continue // onto the top.
165 166 167 168 169
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
170
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
171 172 173 174 175 176 177
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
178
			continue // onto the top.
179 180 181 182 183 184 185
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
186 187 188 189 190 191 192 193
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
194
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
195 196 197 198 199 200 201
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
202
		case <-dq.growCh:
203
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
204 205 206 207 208
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
209
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
210 211 212 213 214 215 216 217
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

218
func (dq *dialQueue) Consume() <-chan peer.ID {
219 220 221
	ch := make(chan peer.ID, 1)

	select {
Raúl Kripalani's avatar
Raúl Kripalani committed
222 223 224 225 226
	case p, ok := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
		if ok {
			ch <- p
		}
227
		close(ch)
228 229 230 231 232
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
233 234 235 236 237 238 239 240 241 242 243
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
244 245
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
246 247 248
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
249
	}
250
	return ch
251 252 253
}

func (dq *dialQueue) grow() {
254
	// no mutex needed as this is only called from the (single-threaded) control loop.
255
	defer func(prev uint) {
256 257 258
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
259
		logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
260 261
	}(dq.nWorkers)

262
	if dq.nWorkers == dq.config.maxParallelism {
263 264
		return
	}
265 266 267 268
	// choosing not to worry about uint wrapping beyond max value.
	target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
	if target > dq.config.maxParallelism {
		target = dq.config.maxParallelism
269 270 271 272 273 274 275
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
276
	// no mutex needed as this is only called from the (single-threaded) control loop.
277
	defer func(prev uint) {
278 279 280
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
281
		logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
282 283
	}(dq.nWorkers)

284
	if dq.nWorkers == dq.config.minParallelism {
285 286
		return
	}
287 288 289
	target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
	if target < dq.config.minParallelism {
		target = dq.config.minParallelism
290 291 292 293 294 295
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
Matt Joiner's avatar
Matt Joiner committed
296
			logger.Debugf("too many die signals queued up.")
297 298 299 300 301 302 303
		}
	}
}

func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
	// it means that the DHT query is progressing slow and we should shrink the worker pool.
304
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
305 306 307 308 309 310 311 312 313 314
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

315 316 317 318
		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
319
		}
320
		idleTimer.Reset(dq.config.maxIdle)
321 322 323 324 325 326 327 328

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
329 330 331 332 333
		case p, ok := <-dq.in.DeqChan:
			if !ok {
				return
			}

334
			t := time.Now()
335
			if err := dq.dialFn(dq.ctx, p); err != nil {
Matt Joiner's avatar
Matt Joiner committed
336
				logger.Debugf("discarding dialled peer because of error: %v", err)
337 338
				continue
			}
Matt Joiner's avatar
Matt Joiner committed
339
			logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
340
			waiting := len(dq.waitingCh)
341 342 343 344 345 346 347 348

			// by the time we're done dialling, it's possible that the context is closed, in which case there will
			// be nobody listening on dq.out.EnqChan and we could block forever.
			select {
			case dq.out.EnqChan <- p:
			case <-dq.ctx.Done():
				return
			}
349 350 351 352 353 354 355 356 357 358 359 360 361 362
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}