package dht

import (
	"context"
	"fmt"
	"math"
	"sync"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

const (
	// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
	// be alive at any time.
	DefaultDialQueueMinParallelism = 6
	// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
	// be alive at any time.
	DefaultDialQueueMaxParallelism = 20
	// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
	// a worker pool downscaling.
	DefaultDialQueueMaxIdle = 5 * time.Second
	// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
	// scaling events, after one is processed. Its role is to reduce jitter.
	DefaultDialQueueScalingMutePeriod = 1 * time.Second
	// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
	// or divided when upscaling and downscaling events occur, respectively.
	DefaultDialQueueScalingFactor = 1.5
)

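// dialQueue is an adaptive pool of dial workers feeding an XOR-distance-prioritised output queue of
// successfully dialled peers.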
type dialQueue struct {
	*dqParams

	nWorkers  uint
	out       *queue.ChanQueue
	startOnce sync.Once

	waitingCh chan waitingCh
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

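// dqParams groups the construction parameters for a dialQueue: the owning context, the target key used for
// XOR-distance prioritisation of dialled peers, the dial function to invoke, the input queue of candidate peers,
// and the worker pool configuration.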
type dqParams struct {
	ctx    context.Context
	target string
	dialFn func(context.Context, peer.ID) error
	in     *queue.ChanQueue
	config dqConfig
}

type dqConfig struct {
	// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	minParallelism uint
	// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	maxParallelism uint
	// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
	// and downscaling events occur, respectively.
	scalingFactor float64
	// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
	// Its role is to reduce jitter.
	mutePeriod time.Duration
	// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	maxIdle time.Duration
}

// dqDefaultConfig returns the default configuration for dial queues. See const documentation to learn the default values.
func dqDefaultConfig() dqConfig {
	return dqConfig{
		minParallelism: DefaultDialQueueMinParallelism,
		maxParallelism: DefaultDialQueueMaxParallelism,
		scalingFactor:  DefaultDialQueueScalingFactor,
		maxIdle:        DefaultDialQueueMaxIdle,
		mutePeriod:     DefaultDialQueueScalingMutePeriod,
	}
}

func (dqc *dqConfig) validate() error {
	if dqc.minParallelism > dqc.maxParallelism {
		return fmt.Errorf("minParallelism must be below maxParallelism; actual values: min=%d, max=%d",
			dqc.minParallelism, dqc.maxParallelism)
	}
	if dqc.scalingFactor < 1 {
		return fmt.Errorf("scalingFactor must be >= 1; actual value: %f", dqc.scalingFactor)
	}
	return nil
}

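// waitingCh pairs a channel on which a parked consumer awaits a dialled peer with the time the wait began, so
// that the control loop can log how long delivery took.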
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with config.minParallelism workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
	dq := &dialQueue{
		dqParams:  params,
		out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
		growCh:    make(chan struct{}, 1),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh),
		dieCh:     make(chan struct{}, params.config.maxParallelism),
	}

	go dq.control()
	return dq, nil
}
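
// An illustrative usage sketch, assuming the caller already holds a context, a target key, an input queue of
// candidate peers and a dial function (dhtCtx, targetKey, candidates and dialPeer below are hypothetical
// stand-ins, not identifiers defined in this package):
//
//	dq, err := newDialQueue(&dqParams{
//		ctx:    dhtCtx,             // context bounding the queue's lifetime
//		target: targetKey,          // key for XOR-distance prioritisation of dialled peers
//		dialFn: dialPeer,           // func(context.Context, peer.ID) error
//		in:     candidates,         // *queue.ChanQueue of peers to dial
//		config: dqDefaultConfig(),
//	})
//	if err != nil {
//		return err
//	}
//	dq.Start()
//	if p, ok := <-dq.Consume(); ok {
//		// p is a dialled peer ready for an RPC; a closed channel with no value means the
//		// context was cancelled before a peer could be delivered.
//		_ = p
//	}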

// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
	dq.startOnce.Do(func() {
		tgt := int(dq.dqParams.config.minParallelism)
		for i := 0; i < tgt; i++ {
			go dq.worker()
		}
		dq.nWorkers = uint(tgt)
	})
}

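// control is the (single-threaded) control loop: it hands dialled peers from the output queue to parked
// consumers, and processes grow/shrink signals, muting further scaling events for config.mutePeriod after
// each one to reduce jitter.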
func (dq *dialQueue) control() {
	var (
		dialled        <-chan peer.ID
		waiting        []waitingCh
		lastScalingEvt = time.Now()
	)

	defer func() {
		for _, w := range waiting {
			close(w.ch)
		}
		waiting = nil
	}()

	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
			continue // onto the top.
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
			continue // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < dq.config.mutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

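// Consume returns a channel that delivers at most one dialled peer and is then closed. A peer that is already
// available is returned immediately; otherwise the call signals an upscaling and parks the channel until a dial
// completes. The channel is closed without a value if the context is done first.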
func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p, ok := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
		if ok {
			ch <- p
		}
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
	}
	return ch
}

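// grow scales the worker pool up by config.scalingFactor, capped at config.maxParallelism. With the default
// configuration (factor 1.5, max 20), successive grow events take the pool 6 -> 9 -> 13 -> 19 -> 20.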
func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev uint) {
		if prev == dq.nWorkers {
			return
		}
		logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == dq.config.maxParallelism {
		return
	}
	// choosing not to worry about uint wrapping beyond max value.
	target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
	if target > dq.config.maxParallelism {
		target = dq.config.maxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

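// shrink scales the worker pool down by config.scalingFactor, bounded below by config.minParallelism, by
// queueing die signals for the surplus workers. With the default configuration (factor 1.5, min 6), successive
// shrink events take the pool 20 -> 13 -> 8 -> 6.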
func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev uint) {
		if prev == dq.nWorkers {
			return
		}
		logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == dq.config.minParallelism {
		return
	}
	target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
	if target < dq.config.minParallelism {
		target = dq.config.minParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			logger.Debugf("too many die signals queued up.")
		}
	}
}

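// worker dequeues peers from the input queue, dials them, and enqueues successful dials on the output queue.
// It exits when the context is done or when it receives a die signal, and it emits a shrink signal after idling
// for config.maxIdle or after completing a dial that nobody was waiting for.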
func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means that the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
		}
		idleTimer.Reset(dq.config.maxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p, ok := <-dq.in.DeqChan:
			if !ok {
				return
			}

			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				logger.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
			waiting := len(dq.waitingCh)

			// by the time we're done dialling, it's possible that the context is closed, in which case there will
			// be nobody listening on dq.out.EnqChan and we could block forever.
			select {
			case dq.out.EnqChan <- p:
			case <-dq.ctx.Done():
				return
			}
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}