dial_queue.go
package dht

import (
	"context"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

const (
	// DialQueueMinParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	DialQueueMinParallelism = 6
	// DialQueueMaxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	DialQueueMaxParallelism = 20
	// DialQueueMaxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	DialQueueMaxIdle = 5 * time.Second
	// DialQueueScalingMutePeriod is the amount of time to ignore further worker pool scaling events after one has
	// been processed. Its role is to reduce jitter.
	DialQueueScalingMutePeriod = 1 * time.Second
)
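
// For illustration only: with the scaling factor of 1.5 set in newDialQueue, successive grow events take the pool
// through 6 → 9 → 13 → 19 → 20 workers, and successive shrink events walk it back through 20 → 13 → 8 → 6, always
// clamped to the bounds above and rate-limited by DialQueueScalingMutePeriod.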

type dialQueue struct {
	ctx    context.Context
	dialFn func(context.Context, peer.ID) error

	nWorkers          int
	scalingFactor     float64
	scalingMutePeriod time.Duration
	maxIdle           time.Duration

	in  *queue.ChanQueue
	out *queue.ChanQueue

	waitingCh chan waitingCh // consumers park their response channels here while waiting for a dialled peer.
	dieCh     chan struct{}  // the control loop tells surplus workers to exit via this channel.
	growCh    chan struct{}  // a signal that the worker pool should grow.
	shrinkCh  chan struct{}  // a signal that the worker pool should shrink.
}

type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error,
	maxIdle, scalingMutePeriod time.Duration,
) *dialQueue {
	sq := &dialQueue{
		ctx:               ctx,
		dialFn:            dialFn,
		nWorkers:          DialQueueMinParallelism,
		scalingFactor:     1.5,
		scalingMutePeriod: scalingMutePeriod,
		maxIdle:           maxIdle,

		in:  in,
		out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

		growCh:    make(chan struct{}, 1),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh),
		dieCh:     make(chan struct{}, DialQueueMaxParallelism),
	}
	for i := 0; i < DialQueueMinParallelism; i++ {
		go sq.worker()
	}
	go sq.control()
	return sq
}
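
// Illustrative usage sketch. This is not part of the file's API surface: the surrounding variables (ctx, target, in)
// and the dial function body (host, pstore) are assumptions made for documentation purposes only.
//
//	dialFn := func(ctx context.Context, p peer.ID) error {
//		// hypothetical: delegate to the libp2p host's dialler; any error causes the peer to be discarded.
//		return host.Connect(ctx, pstore.PeerInfo{ID: p})
//	}
//	dq := newDialQueue(ctx, target, in, dialFn, DialQueueMaxIdle, DialQueueScalingMutePeriod)
//
//	// Each Consume call returns a channel that yields at most one pre-dialled peer and is then closed.
//	select {
//	case p, ok := <-dq.Consume():
//		if ok {
//			// p is dialled and ready for an RPC.
//		}
//	case <-ctx.Done():
//	}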

// control is the dial queue's single-threaded event loop. It hands dialled peers over to waiting consumers, and
// applies grow/shrink requests while muting any that arrive within scalingMutePeriod of the last scaling event.
func (dq *dialQueue) control() {
	var (
		dialled        <-chan peer.ID
		waiting        []waitingCh
		lastScalingEvt = time.Now()
	)

	defer func() {
		for _, w := range waiting {
			close(w.ch)
		}
		waiting = nil
	}()

	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
			continue // onto the top.
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
			continue // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case w := <-dq.waitingCh:
			waiting = append(waiting, w)
			dialled = dq.out.DeqChan
		case p, ok := <-dialled:
			if !ok {
				return // we're done if the ChanQueue is closed, which happens when the context is closed.
			}
			w := waiting[0]
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
			w.ch <- p
			close(w.ch)
			waiting = waiting[1:]
			if len(waiting) == 0 {
				// no more waiters, so stop consuming dialled jobs.
				dialled = nil
			}
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

// Consume returns a channel that delivers a single dialled peer and is then closed. If the queue is shutting down,
// or shuts down while the consumer is parked, the channel is closed without delivering a value.
func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available.
		ch <- p
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return, so trigger a scale-up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
		// all good
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
	}
	return ch
}

// grow scales the worker pool up by the scaling factor, capped at DialQueueMaxParallelism.
func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMaxParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
	if target > DialQueueMaxParallelism {
		target = DialQueueMaxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

// shrink scales the worker pool down by the scaling factor, floored at DialQueueMinParallelism.
func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMinParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
	if target < DialQueueMinParallelism {
		target = DialQueueMinParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

// worker takes peers off the in queue, dials them, and enqueues successful dials on the out queue. It signals a pool
// shrink when it has been idle for maxIdle, or when it completes a dial that nobody is waiting for.
func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means that the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
		}
		idleTimer.Reset(dq.maxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}