package dht

import (
	"context"
	"math"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

// Default tuning values for the adaptive dial queue; dqDefaultConfig assembles them into a dqConfig.
const (
	// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
	// be alive at any time.
	DefaultDialQueueMinParallelism = 6
	// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
	// be alive at any time.
	DefaultDialQueueMaxParallelism = 20
	// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
	// a worker pool downscaling.
	DefaultDialQueueMaxIdle = 5 * time.Second
	// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
	// scaling events, after one is processed. Its role is to reduce jitter.
	DefaultDialQueueScalingMutePeriod = 1 * time.Second
	// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
	// or divided when upscaling and downscaling events occur, respectively.
	DefaultDialQueueScalingFactor = 1.5
)

type dialQueue struct {
32
	*dqParams
33

34 35 36
	nWorkers  uint
	out       *queue.ChanQueue
	startOnce sync.Once
37

38
	waitingCh chan waitingCh
39 40 41 42 43
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

// dqParams groups the externally supplied inputs of a dial queue:
//   - ctx: context whose cancellation stops the control loop and the workers; it is also passed to dialFn.
//   - target: key handed to queue.NewXORDistancePQ when the output queue is built.
//   - dialFn: function the workers call to dial each peer; a non-nil error causes the peer to be discarded.
//   - in: queue the workers take peers to dial from.
//   - config: worker pool sizing and timing settings.
type dqParams struct {
	ctx    context.Context
	target string
	dialFn func(context.Context, peer.ID) error
	in     *queue.ChanQueue
	config dqConfig
}

// dqConfig holds the worker pool sizing and timing settings of a dial queue. dqDefaultConfig returns a value
// populated with the package defaults.
type dqConfig struct {
	// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	minParallelism uint
	// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	maxParallelism uint
	// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
	// and downscaling events occur, respectively.
	scalingFactor float64
	// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
	// Its role is to reduce jitter.
	mutePeriod time.Duration
	// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	maxIdle time.Duration
}

// dqDefaultConfig builds a dqConfig in which every field is set to its package-level DefaultDialQueue* constant.
func dqDefaultConfig() dqConfig {
	var cfg dqConfig

	// pool size bounds and the step used when moving between them.
	cfg.minParallelism = DefaultDialQueueMinParallelism
	cfg.maxParallelism = DefaultDialQueueMaxParallelism
	cfg.scalingFactor = DefaultDialQueueScalingFactor

	// timing knobs.
	cfg.maxIdle = DefaultDialQueueMaxIdle
	cfg.mutePeriod = DefaultDialQueueScalingMutePeriod

	return cfg
}

// waitingCh is a consumer parked by Consume until a dialled peer is available. ch is the channel on which the
// control loop delivers the peer (and which it then closes); ts is the time the consumer was parked, used to log
// how long delivery took.
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
87 88 89 90 91
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
92
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
93 94 95
// dialled peers.
//
// The following events trigger scaling:
Raúl Kripalani's avatar
Raúl Kripalani committed
96 97 98
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
99
//
Raúl Kripalani's avatar
Raúl Kripalani committed
100 101
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
102 103
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
104
	dq := &dialQueue{
105 106
		dqParams:  params,
		out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
107
		growCh:    make(chan struct{}, 1),
108
		shrinkCh:  make(chan struct{}, 1),
109
		waitingCh: make(chan waitingCh),
110
		dieCh:     make(chan struct{}, params.config.maxParallelism),
111
	}
112

113
	return dq, nil
114 115
}

// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
118
	dq.startOnce.Do(func() {
119
		go dq.control()
120
	})
121 122
}

func (dq *dialQueue) control() {
	var (
125 126
		dialled        <-chan peer.ID
		waiting        []waitingCh
127 128
		lastScalingEvt = time.Now()
	)
129 130

	defer func() {
131
		for _, w := range waiting {
132 133
			close(w.ch)
		}
134
		waiting = nil
135 136
	}()

137 138 139 140 141 142 143 144 145 146
	// start workers

	tgt := int(dq.dqParams.config.minParallelism)
	for i := 0; i < tgt; i++ {
		go dq.worker()
	}
	dq.nWorkers = uint(tgt)

	// control workers

147 148 149 150 151 152
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
153 154 155
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
156
			continue // onto the top.
157 158 159 160 161
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
162
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
163 164 165 166 167 168 169
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
170
			continue // onto the top.
171 172 173 174 175 176 177
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
178 179 180 181 182 183 184 185
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
Matt Joiner's avatar
Matt Joiner committed
186
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
187 188 189 190 191 192 193
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
194
		case <-dq.growCh:
195
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
196 197 198 199 200
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
201
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
202 203 204 205 206 207 208 209
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

func (dq *dialQueue) Consume() <-chan peer.ID {
211 212 213
	ch := make(chan peer.ID, 1)

	select {
Raúl Kripalani's avatar
Raúl Kripalani committed
214 215 216 217 218
	case p, ok := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
		if ok {
			ch <- p
		}
219
		close(ch)
220 221 222 223 224
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
225 226 227 228 229 230 231 232 233 234 235
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
236 237
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
238 239 240
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
241
	}
242
	return ch
243 244 245
}

func (dq *dialQueue) grow() {
246
	// no mutex needed as this is only called from the (single-threaded) control loop.
247
	defer func(prev uint) {
248 249 250
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
251
		logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
252 253
	}(dq.nWorkers)

254
	if dq.nWorkers == dq.config.maxParallelism {
255 256
		return
	}
257 258 259 260
	// choosing not to worry about uint wrapping beyond max value.
	target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
	if target > dq.config.maxParallelism {
		target = dq.config.maxParallelism
261 262 263 264 265 266 267
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
268
	// no mutex needed as this is only called from the (single-threaded) control loop.
269
	defer func(prev uint) {
270 271 272
		if prev == dq.nWorkers {
			return
		}
Matt Joiner's avatar
Matt Joiner committed
273
		logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
274 275
	}(dq.nWorkers)

276
	if dq.nWorkers == dq.config.minParallelism {
277 278
		return
	}
279 280 281
	target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
	if target < dq.config.minParallelism {
		target = dq.config.minParallelism
282 283 284 285 286 287
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
Matt Joiner's avatar
Matt Joiner committed
288
			logger.Debugf("too many die signals queued up.")
289 290 291 292 293 294 295
		}
	}
}

func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
	// it means that the DHT query is progressing slow and we should shrink the worker pool.
296
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
297
	defer idleTimer.Stop()
298 299 300 301 302 303 304 305 306 307
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

308 309 310 311
		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
312 313 314 315 316
			// NOTE: There is a slight race here. We could be in the
			// middle of firing the timer and not read anything from the channel.
			//
			// However, that's not really a huge issue. We'll think
			// we're idle but that's fine.
317
		}
318
		idleTimer.Reset(dq.config.maxIdle)
319 320 321 322 323 324 325 326

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
327 328 329 330 331
		case p, ok := <-dq.in.DeqChan:
			if !ok {
				return
			}

332
			t := time.Now()
333
			if err := dq.dialFn(dq.ctx, p); err != nil {
Matt Joiner's avatar
Matt Joiner committed
334
				logger.Debugf("discarding dialled peer because of error: %v", err)
335 336
				continue
			}
Matt Joiner's avatar
Matt Joiner committed
337
			logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
338
			waiting := len(dq.waitingCh)
339 340 341 342 343 344 345 346

			// by the time we're done dialling, it's possible that the context is closed, in which case there will
			// be nobody listening on dq.out.EnqChan and we could block forever.
			select {
			case dq.out.EnqChan <- p:
			case <-dq.ctx.Done():
				return
			}
347 348 349 350 351 352 353 354 355 356 357 358 359 360
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}