dial_queue.go
package dht

import (
	"context"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

var (
	DialQueueMinParallelism    = 6               // lower bound on the worker pool size.
	DialQueueMaxParallelism    = 20              // hard upper bound on the worker pool size.
	DialQueueMaxIdle           = 5 * time.Second // how long a worker may idle before proposing a scale-down.
	DialQueueScalingMutePeriod = 1 * time.Second // minimum interval between two scaling events.
)

type dialQueue struct {
	ctx    context.Context
	dialFn func(context.Context, peer.ID) error

	nWorkers      int     // current worker pool size; mutated only by the control loop.
	scalingFactor float64 // multiplier applied when growing or shrinking the pool.

	in  *queue.ChanQueue // peers waiting to be dialled, supplied by the caller.
	out *queue.ChanQueue // successfully dialled peers, ordered by XOR distance to the target.

	waitingCh chan waitingCh // consumers parked while waiting for a dialled peer.
	dieCh     chan struct{}  // signals workers to exit when the pool shrinks.
	growCh    chan struct{}  // scale-up requests from consumers.
	shrinkCh  chan struct{}  // scale-down proposals from workers.
}

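// waitingCh couples a consumer's response channel with the time it started waiting, so the
// control loop can log how long the handoff took.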
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
	sq := &dialQueue{
		ctx:           ctx,
		dialFn:        dialFn,
		nWorkers:      DialQueueMinParallelism,
		scalingFactor: 1.5,

		in:  in,
		out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

		growCh:    make(chan struct{}, nConsumers),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh, nConsumers),
		dieCh:     make(chan struct{}, DialQueueMaxParallelism),
	}
	for i := 0; i < DialQueueMinParallelism; i++ {
		go sq.worker()
	}
	go sq.control()
	return sq
}
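
// consumeExample is an illustrative sketch (not part of the original implementation) of how a
// caller that declared nConsumers consumers might drain the queue. rpcFn is a hypothetical
// stand-in for whatever RPC the DHT issues over the freshly dialled connection.
func consumeExample(ctx context.Context, dq *dialQueue, nConsumers int, rpcFn func(context.Context, peer.ID) error) {
	// Spawn exactly nConsumers goroutines; Consume panics if more consumers than declared
	// upfront are waiting concurrently.
	for i := 0; i < nConsumers; i++ {
		go func() {
			for {
				select {
				case p, ok := <-dq.Consume():
					if !ok {
						// closed without a value: the dial queue's context was cancelled.
						return
					}
					if err := rpcFn(ctx, p); err != nil {
						log.Debugf("RPC to dialled peer %v failed: %v", p, err)
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}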

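// control is the single-threaded control loop. It hands dialled peers from the out queue to
// parked consumers, and it serialises scaling decisions, ignoring grow/shrink signals that
// arrive within DialQueueScalingMutePeriod of the previous scaling event.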
func (dq *dialQueue) control() {
	var (
		p              peer.ID
		dialled        = dq.out.DeqChan
		resp           waitingCh
		waiting        <-chan waitingCh
		lastScalingEvt = time.Now()
	)
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
			continue // onto the top.
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
			continue                               // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

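// Consume returns a channel that delivers at most one dialled peer and is then closed. If the
// dial queue's context is cancelled before a peer can be delivered, the channel is closed
// without yielding a value. At most nConsumers callers (as declared at construction time) may
// wait on Consume concurrently.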
func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available.
		ch <- p
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
	default:
		panic("detected more consuming goroutines than declared upfront")
	}
	return ch
}

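// grow scales the worker pool up by scalingFactor, capping it at DialQueueMaxParallelism. With
// the defaults above, successive grow() calls move the pool through 6 -> 9 -> 13 -> 19 -> 20
// workers, and shrink() walks it back through 20 -> 13 -> 8 -> 6.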
func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMaxParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
	if target > DialQueueMaxParallelism {
		target = DialQueueMaxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMinParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
	if target < DialQueueMinParallelism {
		target = DialQueueMinParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

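// worker takes peers off the input queue, dials them, and enqueues successful dials on the
// output queue. It proposes a scale-down when it has been idle for DialQueueMaxIdle or when it
// completes a dial that no consumer is waiting for, and it exits on a die signal or when the
// context is cancelled.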
func (dq *dialQueue) worker() {
	// This idle timer tracks whether the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(0)

	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

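		// Stop the idle timer and drain any pending tick, so the Reset below starts a clean
		// DialQueueMaxIdle window for this iteration.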
		if !idleTimer.Stop() {
			select {
			case <-idleTimer.C:
			default:
			}
		}
		idleTimer.Reset(DialQueueMaxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}