dial_queue.go
package dht

import (
	"context"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

var (
	DialQueueMinParallelism    = 6
	DialQueueMaxParallelism    = 20
	DialQueueMaxIdle           = 5 * time.Second
	DialQueueScalingMutePeriod = 1 * time.Second
)

type dialQueue struct {
	ctx    context.Context
	dialFn func(context.Context, peer.ID) error

	nWorkers      int
	scalingFactor float64

	in  *queue.ChanQueue
	out *queue.ChanQueue

	waitingCh chan waitingCh
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
	sq := &dialQueue{
		ctx:           ctx,
		dialFn:        dialFn,
		nWorkers:      DialQueueMinParallelism,
		scalingFactor: 1.5,

		in:  in,
		out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

		growCh:    make(chan struct{}, nConsumers),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh, nConsumers),
		dieCh:     make(chan struct{}, DialQueueMaxParallelism),
	}
	for i := 0; i < DialQueueMinParallelism; i++ {
		go sq.worker()
	}
	go sq.control()
	return sq
}
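
// consumeOneDialled is an illustrative sketch and is not referenced elsewhere in this file: it shows
// how a single consumer goroutine might obtain one dialled peer from the queue. The no-op dial
// function and the caller-supplied target key are placeholder assumptions standing in for the real
// dial logic and query target that the DHT provides.
func consumeOneDialled(ctx context.Context, target string) (peer.ID, bool) {
	// the in-queue feeds candidate peers to the dial workers; a real query would enqueue peers here.
	in := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target))

	// a single declared consumer (nConsumers=1) and a dial function that "succeeds" immediately.
	dq := newDialQueue(ctx, target, in, func(ctx context.Context, p peer.ID) error {
		return nil // a real dial function would establish the connection to p here.
	}, 1)

	// Consume returns a channel that yields at most one dialled peer and is then closed; the receive
	// blocks until a peer is available or the context is cancelled, in which case ok is false.
	p, ok := <-dq.Consume()
	return p, ok
}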

func (dq *dialQueue) control() {
	var (
		p              peer.ID
		dialled        = dq.out.DeqChan
		resp           waitingCh
		waiting        <-chan waitingCh
		lastScalingEvt = time.Now()
	)
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			// shutting down: close the channels of any consumers still parked in Consume so their
			// reads don't block forever. resp.ch, if it was ever set, has already been delivered to
			// and closed in the branch below, so it must not be closed again here.
			for {
				select {
				case w := <-dq.waitingCh:
					close(w.ch)
				default:
					return
				}
			}
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
			continue // onto the top.
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
			continue                               // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
		case <-dq.growCh:
			if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available.
		ch <- p
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
	default:
		panic("detected more consuming goroutines than declared upfront")
	}
	return ch
}

func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMaxParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
	if target > DialQueueMaxParallelism {
		target = DialQueueMaxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMinParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
	if target < DialQueueMinParallelism {
		target = DialQueueMinParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

func (dq *dialQueue) worker() {
	// This idle timer tracks whether the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(0)

	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

		if !idleTimer.Stop() {
			select {
			case <-idleTimer.C:
			default:
			}
		}
		idleTimer.Reset(DialQueueMaxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}