package dht

import (
	"context"
	"errors"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

var (
	DialQueueMinParallelism    = 6
	DialQueueMaxParallelism    = 20
	DialQueueMaxIdle           = 5 * time.Second
	DialQueueScalingMutePeriod = 1 * time.Second
)

var ErrContextClosed = errors.New("context closed")

type dialQueue struct {
	ctx    context.Context
	dialFn func(context.Context, peer.ID) error

	nWorkers      int
	scalingFactor float64

	in  *queue.ChanQueue
	out *queue.ChanQueue

	waitingCh chan waitingCh
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
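//
// Illustrative usage (a sketch only; `in`, `target` and `dialPeer` stand in for the caller's candidate
// queue, query key and dial function, and the consumer count passed must match the number of goroutines
// that will call Consume):
//
//	dq := newDialQueue(ctx, target, in, dialPeer, 3)
//	ch, err := dq.Consume()
//	if err != nil {
//		return // context was already closed.
//	}
//	select {
//	case p := <-ch:
//		// hand the dialled peer p off to the query logic.
//	case <-ctx.Done():
//	}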
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
	sq := &dialQueue{
		ctx:           ctx,
		dialFn:        dialFn,
		nWorkers:      DialQueueMinParallelism,
		scalingFactor: 1.5,

		in:  in,
		out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

		growCh:    make(chan struct{}, nConsumers),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh, nConsumers),
		dieCh:     make(chan struct{}, DialQueueMaxParallelism),
	}
	for i := 0; i < DialQueueMinParallelism; i++ {
		go sq.worker()
	}
	go sq.control()
	return sq
}

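// control is the queue's single-threaded event loop. It alternates between two states: pulling a dialled
// peer off the out queue (with the waiting channel disabled), and handing that peer to the next parked
// consumer (with the dialled channel disabled). Grow and shrink requests are applied here too, but any
// request arriving within DialQueueScalingMutePeriod of the last scaling event is dropped to avoid
// thrashing the pool.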
func (dq *dialQueue) control() {
	var (
		p              peer.ID
		dialled        = dq.out.DeqChan
		resp           waitingCh
		waiting        <-chan waitingCh
		lastScalingEvt = time.Now()
	)
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
			continue // onto the top.
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
			continue                               // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

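// Consume returns a channel that will deliver a single dialled peer. If one is already available it is
// delivered (and the channel closed) before returning; otherwise the queue is signalled to grow and the
// channel is parked with the control loop until a dial completes. It returns ErrContextClosed if the
// context is already done; if the context closes while the channel is parked, the channel is never
// fulfilled, so callers should select on it together with their own context.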
func (dq *dialQueue) Consume() (<-chan peer.ID, error) {
	ch := make(chan peer.ID, 1)

	// short circuit and return a dialled peer if it's immediately available.
	select {
	case <-dq.ctx.Done():
		return nil, ErrContextClosed
	case p := <-dq.out.DeqChan:
		ch <- p
		close(ch)
		return ch, nil
	default:
	}

	// we have no finished dials to return, trigger a scale up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
	default:
		panic("detected more consuming goroutines than declared upfront")
	}
	return ch, nil
}

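// grow scales the worker pool up by scalingFactor, spawning the extra workers, and never exceeding
// DialQueueMaxParallelism.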
func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMaxParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
	if target > DialQueueMaxParallelism {
		target = DialQueueMaxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

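// shrink scales the worker pool down by scalingFactor, signalling surplus workers to exit via dieCh, and
// never dropping below DialQueueMinParallelism.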
func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMinParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
	if target < DialQueueMinParallelism {
		target = DialQueueMinParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

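// worker stages dials one at a time: it pulls the next candidate from the input queue, dials it, and
// enqueues successful dials on the out queue for the control loop to hand off. It exits when the context
// closes or when it receives a die signal from shrink(). It requests a scale-down when it sits idle for
// DialQueueMaxIdle, or when it completes a dial that no consumer is currently waiting for.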
func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means that the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(0)

	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

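		// Reset the idle timer for this iteration. Stop reports false when the timer has already fired, in
		// which case we drain its channel (non-blocking, as a previous iteration may have consumed the
		// value) before Reset, as required to safely reuse a time.Timer.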
		if !idleTimer.Stop() {
			select {
			case <-idleTimer.C:
			default:
			}
		}
		idleTimer.Reset(DialQueueMaxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}