limiter.go 3.43 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5 6
	"sync"

Jeromy's avatar
Jeromy committed
7
	addrutil "github.com/libp2p/go-addr-util"
8
	iconn "github.com/libp2p/go-libp2p-interface-conn"
Jeromy's avatar
Jeromy committed
9 10
	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
11 12 13
)

type dialResult struct {
14
	Conn iconn.Conn
Jeromy's avatar
Jeromy committed
15
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24 25 26
	Err  error
}

// dialJob describes one dial to be scheduled by a dialLimiter: which peer to
// dial, at which address, under which context, and where to send the result.
//
// NOTE(review): success is only read in this file (by finishedDial); it is
// presumably set by the code that consumes resp — confirm against the caller.
type dialJob struct {
	addr    ma.Multiaddr    // address handed to the dial function
	peer    peer.ID         // peer being dialed; keys the per-peer accounting
	ctx     context.Context // cancels the job and is passed to the dial function
	resp    chan dialResult // receives the result of the dial
	success bool            // when true, finishedDial starts no further queued dial for peer
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35
// cancelled reports, without blocking, whether the job's context is already
// done.
func (dj *dialJob) cancelled() bool {
	done := dj.ctx.Done()
	select {
	case <-done:
		return true
	default:
	}
	return false
}

Jeromy's avatar
Jeromy committed
36 37 38 39 40 41
type dialLimiter struct {
	rllock      sync.Mutex
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

42
	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
43 44 45 46 47 48

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

49
// dialfunc is the signature of the function a dialLimiter calls to perform a
// single dial: it receives the job's context, peer and address and returns
// the resulting connection or an error.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68

// newDialLimiter returns a dialLimiter that dials with df and uses
// concurrentFdDials as the FD limit and defaultPerPeerRateLimit as the
// per-peer limit.
func newDialLimiter(df dialfunc) *dialLimiter {
	fdLimit, perPeerLimit := concurrentFdDials, defaultPerPeerRateLimit
	return newDialLimiterWithParams(df, fdLimit, perPeerLimit)
}

// newDialLimiterWithParams returns a dialLimiter that dials with df, allows at
// most fdl concurrent dials over FD-costly transports and at most ppl
// concurrent dials per peer.
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
	dl := new(dialLimiter)
	dl.dialFunc = df
	dl.fdLimit = fdl
	dl.perPeerLimit = ppl
	dl.activePerPeer = make(map[peer.ID]int)
	dl.waitingOnPeerLimit = make(map[peer.ID][]*dialJob)
	return dl
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
69 70 71 72 73 74 75 76 77 78 79 80 81 82
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
			dl.waitingOnFd = dl.waitingOnFd[1:]
			if len(dl.waitingOnFd) == 0 {
				dl.waitingOnFd = nil // clear out memory
			}
			dl.fdConsuming++

			go dl.executeDial(next)
		}
	}

Jeromy's avatar
Jeromy committed
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}
		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}

}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

Jeromy's avatar
Jeromy committed
120 121 122 123 124 125 126 127 128 129
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
130 131 132 133
	// take second needed token and start dial!
	go dl.executeDial(dj)
}

Jeromy's avatar
Jeromy committed
134 135 136 137 138
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
139 140 141 142
	if j.cancelled() {
		return
	}

Jeromy's avatar
Jeromy committed
143 144
	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
	select {
Jeromy's avatar
Jeromy committed
145
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
146 147 148
	case <-j.ctx.Done():
	}
}