dial_queue.go 10.2 KB
Newer Older
1 2 3 4 5
package dht

import (
	"context"
	"math"
6
	"sync"
7 8
	"time"

9 10
	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
11 12
)

13
const (
	// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
	// be alive at any time.
	DefaultDialQueueMinParallelism = 6
	// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
	// be alive at any time.
	DefaultDialQueueMaxParallelism = 20
	// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
	// a worker pool downscaling.
	DefaultDialQueueMaxIdle = 5 * time.Second
	// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
	// scaling events, after one is processed. Its role is to reduce jitter.
	DefaultDialQueueScalingMutePeriod = 1 * time.Second
	// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
	// or divided when upscaling and downscaling events occur, respectively.
	DefaultDialQueueScalingFactor = 1.5
)
30 31

type dialQueue struct {
32
	*dqParams
33

34 35 36
	nWorkers  uint
	out       *queue.ChanQueue
	startOnce sync.Once
37

38
	waitingCh chan waitingCh
39 40 41 42 43
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
// dqParams bundles the inputs a dial queue is built from:
//   - ctx: its cancellation stops the control loop and every worker.
//   - target: the key handed to queue.NewXORDistancePQ when the output queue is created.
//   - dialFn: called by workers for each peer taken from in; a non-nil error discards that peer.
//   - in: the queue of peers awaiting a dial; workers read from its DeqChan.
//   - config: worker pool sizing and timing knobs (see dqConfig).
type dqParams struct {
	ctx    context.Context
	target string
	dialFn func(context.Context, peer.ID) error
	in     *queue.ChanQueue
	config dqConfig
}

// dqConfig holds the tunables that govern the size of the dial worker pool and how quickly it reacts to
// scaling signals. dqDefaultConfig returns a value filled in from the DefaultDialQueue* constants.
type dqConfig struct {
	// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	minParallelism uint
	// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	maxParallelism uint
	// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
	// and downscaling events occur, respectively.
	scalingFactor float64
	// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
	// Its role is to reduce jitter.
	mutePeriod time.Duration
	// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	maxIdle time.Duration
}

// dqDefaultConfig builds a dqConfig whose every field is taken from the matching DefaultDialQueue* constant.
func dqDefaultConfig() dqConfig {
	var cfg dqConfig

	// pool bounds and step size.
	cfg.minParallelism = DefaultDialQueueMinParallelism
	cfg.maxParallelism = DefaultDialQueueMaxParallelism
	cfg.scalingFactor = DefaultDialQueueScalingFactor

	// timing knobs.
	cfg.mutePeriod = DefaultDialQueueScalingMutePeriod
	cfg.maxIdle = DefaultDialQueueMaxIdle

	return cfg
}

78 79 80 81 82
// waitingCh represents a consumer parked by Consume until a dialled peer is available. ch is the send side of
// the channel returned to that consumer; ts is the time the consumer was parked, which the control loop uses to
// log how long delivery took.
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

83 84 85 86
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
87 88 89 90 91
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
92
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
93 94 95
// dialled peers.
//
// The following events trigger scaling:
Raúl Kripalani's avatar
Raúl Kripalani committed
96 97 98
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
99
//
Raúl Kripalani's avatar
Raúl Kripalani committed
100 101
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
102 103
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
104
	dq := &dialQueue{
105 106
		dqParams:  params,
		out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
107
		growCh:    make(chan struct{}, 1),
108
		shrinkCh:  make(chan struct{}, 1),
109
		waitingCh: make(chan waitingCh),
110
		dieCh:     make(chan struct{}, params.config.maxParallelism),
111
	}
112

113 114
	go dq.control()
	return dq, nil
115 116
}

117 118
// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
119 120 121 122 123 124 125
	dq.startOnce.Do(func() {
		tgt := int(dq.dqParams.config.minParallelism)
		for i := 0; i < tgt; i++ {
			go dq.worker()
		}
		dq.nWorkers = uint(tgt)
	})
126 127
}

128 129
func (dq *dialQueue) control() {
	var (
130 131
		dialled        <-chan peer.ID
		waiting        []waitingCh
132 133
		lastScalingEvt = time.Now()
	)
134 135

	defer func() {
136
		for _, w := range waiting {
137 138
			close(w.ch)
		}
139
		waiting = nil
140 141
	}()

142 143 144 145 146 147
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
148 149 150
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
151
			continue // onto the top.
152 153 154 155 156
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
157
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
158 159 160 161 162 163 164
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
165
			continue // onto the top.
166 167 168 169 170 171 172
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
173 174 175 176 177 178 179 180
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
181
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
182 183 184 185 186 187 188
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
189
		case <-dq.growCh:
190
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
191 192 193 194 195
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
196
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
197 198 199 200 201 202 203 204
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

205
func (dq *dialQueue) Consume() <-chan peer.ID {
206 207 208
	ch := make(chan peer.ID, 1)

	select {
Raúl Kripalani's avatar
Raúl Kripalani committed
209 210 211 212 213
	case p, ok := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
		if ok {
			ch <- p
		}
214
		close(ch)
215 216 217 218 219
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
220 221 222 223 224 225 226 227 228 229 230
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
231 232
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
233 234 235
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
236
	}
237
	return ch
238 239 240
}

func (dq *dialQueue) grow() {
241
	// no mutex needed as this is only called from the (single-threaded) control loop.
242
	defer func(prev uint) {
243 244 245
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
246
		logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
247 248
	}(dq.nWorkers)

249
	if dq.nWorkers == dq.config.maxParallelism {
250 251
		return
	}
252 253 254 255
	// choosing not to worry about uint wrapping beyond max value.
	target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
	if target > dq.config.maxParallelism {
		target = dq.config.maxParallelism
256 257 258 259 260 261 262
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
263
	// no mutex needed as this is only called from the (single-threaded) control loop.
264
	defer func(prev uint) {
265 266 267
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
268
		logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
269 270
	}(dq.nWorkers)

271
	if dq.nWorkers == dq.config.minParallelism {
272 273
		return
	}
274 275 276
	target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
	if target < dq.config.minParallelism {
		target = dq.config.minParallelism
277 278 279 280 281 282
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
Matt Joiner's avatar
Matt Joiner committed
283
			logger.Debugf("too many die signals queued up.")
284 285 286 287 288 289 290
		}
	}
}

func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
	// it means that the DHT query is progressing slow and we should shrink the worker pool.
291
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
292 293 294 295 296 297 298 299 300 301
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

302 303 304 305
		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
306
		}
307
		idleTimer.Reset(dq.config.maxIdle)
308 309 310 311 312 313 314 315

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
316 317 318 319 320
		case p, ok := <-dq.in.DeqChan:
			if !ok {
				return
			}

321
			t := time.Now()
322
			if err := dq.dialFn(dq.ctx, p); err != nil {
Matt Joiner's avatar
Matt Joiner committed
323
				logger.Debugf("discarding dialled peer because of error: %v", err)
324 325
				continue
			}
Matt Joiner's avatar
Matt Joiner committed
326
			logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
327
			waiting := len(dq.waitingCh)
328 329 330 331 332 333 334 335

			// by the time we're done dialling, it's possible that the context is closed, in which case there will
			// be nobody listening on dq.out.EnqChan and we could block forever.
			select {
			case dq.out.EnqChan <- p:
			case <-dq.ctx.Done():
				return
			}
336 337 338 339 340 341 342 343 344 345 346 347 348 349
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}