limiter.go 5.77 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
5 6
	"os"
	"strconv"
Jeromy's avatar
Jeromy committed
7
	"sync"
8
	"time"
Jeromy's avatar
Jeromy committed
9

10 11 12
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"

Jeromy's avatar
Jeromy committed
13 14
	addrutil "github.com/libp2p/go-addr-util"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
15 16 17
)

// dialResult is the outcome of a single dial attempt. executeDial builds one
// from the return values of the limiter's dialFunc and sends it on the job's
// resp channel.
type dialResult struct {
	// Conn is the connection returned by the dial function.
	Conn transport.CapableConn
	// Addr is the address that was dialed (copied from the job).
	Addr ma.Multiaddr
	// Err is the error returned by the dial function, if any.
	Err  error
}

// dialJob describes one dial attempt that is handed to the dialLimiter.
type dialJob struct {
	// addr is the address to dial.
	addr ma.Multiaddr
	// peer is the peer the address belongs to; per-peer tokens are keyed on it.
	peer peer.ID
	// ctx bounds the job: once it is done the job counts as cancelled.
	ctx  context.Context
	// resp receives the dialResult produced by executeDial.
	resp chan dialResult
}

Jeromy's avatar
Jeromy committed
30
func (dj *dialJob) cancelled() bool {
Matt Joiner's avatar
Matt Joiner committed
31
	return dj.ctx.Err() != nil
Jeromy's avatar
Jeromy committed
32 33
}

34
func (dj *dialJob) dialTimeout() time.Duration {
35
	timeout := transport.DialTimeout
36 37 38 39 40 41 42
	if lowTimeoutFilters.AddrBlocked(dj.addr) {
		timeout = DialTimeoutLocal
	}

	return timeout
}

Jeromy's avatar
Jeromy committed
43
// dialLimiter schedules dial jobs under two kinds of tokens: a global FD token
// for addresses that addrutil.IsFDCostlyTransport reports as costly, and a
// per-peer token. Jobs that cannot get a token are parked on the matching
// waitlist until a running dial finishes and releases one.
type dialLimiter struct {
	// lk is taken by AddDialJob, finishedDial and clearAllPeerDials before
	// they touch any of the fields below.
	lk sync.Mutex

	// fdConsuming is the number of FD tokens currently taken.
	fdConsuming int
	// fdLimit is the bound fdConsuming is compared against before a new FD
	// token is handed out.
	fdLimit     int
	// waitingOnFd queues jobs blocked on an FD token; jobs are appended at
	// the end and taken from the front.
	waitingOnFd []*dialJob

	// dialFunc performs the actual dial; it is invoked by executeDial.
	dialFunc dialfunc

	// activePerPeer counts the peer tokens currently held for each peer.
	// Entries are removed when the count drops to zero.
	activePerPeer      map[peer.ID]int
	// perPeerLimit is the bound activePerPeer[p] is compared against before a
	// new peer token is handed out.
	perPeerLimit       int
	// waitingOnPeerLimit queues, per peer, the jobs blocked on a peer token.
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

57
// dialfunc is the signature of the function the limiter calls to dial the
// given address of the given peer. executeDial passes it a context derived
// from the job's context with the job's dial timeout applied.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
Jeromy's avatar
Jeromy committed
58 59

func newDialLimiter(df dialfunc) *dialLimiter {
60 61 62 63 64 65 66
	fd := ConcurrentFdDials
	if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
		if n, err := strconv.ParseInt(env, 10, 32); err == nil {
			fd = int(n)
		}
	}
	return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
Jeromy's avatar
Jeromy committed
67 68
}

69
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
70
	return &dialLimiter{
71 72
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
73 74 75 76 77 78
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
79 80 81
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
82
	log.Debugf("[limiter] freeing FD token; waiting: %d; consuming: %d", len(dl.waitingOnFd), dl.fdConsuming)
Łukasz Magiera's avatar
Łukasz Magiera committed
83 84
	dl.fdConsuming--

85
	for len(dl.waitingOnFd) > 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
86 87 88
		next := dl.waitingOnFd[0]
		dl.waitingOnFd[0] = nil // clear out memory
		dl.waitingOnFd = dl.waitingOnFd[1:]
89

Łukasz Magiera's avatar
Łukasz Magiera committed
90
		if len(dl.waitingOnFd) == 0 {
91 92 93 94 95 96 97 98
			// clear out memory.
			dl.waitingOnFd = nil
		}

		// Skip over canceled dials instead of queuing up a goroutine.
		if next.cancelled() {
			dl.freePeerToken(next)
			continue
Jeromy's avatar
Jeromy committed
99
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
100 101 102 103
		dl.fdConsuming++

		// we already have activePerPeer token at this point so we can just dial
		go dl.executeDial(next)
104
		return
Jeromy's avatar
Jeromy committed
105
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
106
}
Jeromy's avatar
Jeromy committed
107

Łukasz Magiera's avatar
Łukasz Magiera committed
108
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
109 110
	log.Debugf("[limiter] freeing peer token; peer %s; addr: %s; active for peer: %d; waiting on peer limit: %d",
		dj.peer, dj.addr, dl.activePerPeer[dj.peer], len(dl.waitingOnPeerLimit[dj.peer]))
Jeromy's avatar
Jeromy committed
111 112 113 114 115 116 117
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
118
	for len(waitlist) > 0 {
Jeromy's avatar
Jeromy committed
119
		next := waitlist[0]
120 121 122 123
		waitlist[0] = nil // clear out memory
		waitlist = waitlist[1:]

		if len(waitlist) == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
124
			delete(dl.waitingOnPeerLimit, next.peer)
Jeromy's avatar
Jeromy committed
125
		} else {
126 127 128 129 130
			dl.waitingOnPeerLimit[next.peer] = waitlist
		}

		if next.cancelled() {
			continue
Jeromy's avatar
Jeromy committed
131 132
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
133
		dl.activePerPeer[next.peer]++ // just kidding, we still want this token
134

Łukasz Magiera's avatar
Łukasz Magiera committed
135
		dl.addCheckFdLimit(next)
136
		return
Jeromy's avatar
Jeromy committed
137 138 139
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
140 141 142
func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()
Jeromy's avatar
Jeromy committed
143

Łukasz Magiera's avatar
Łukasz Magiera committed
144 145
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.freeFDToken()
Jeromy's avatar
Jeromy committed
146 147
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151
	dl.freePeerToken(dj)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
Jeromy's avatar
Jeromy committed
152 153
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
154 155
			log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
				"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
Jeromy's avatar
Jeromy committed
156 157 158 159
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

160 161
		log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
			dj.peer, dj.addr, dl.fdConsuming)
Jeromy's avatar
Jeromy committed
162 163 164 165
		// take token
		dl.fdConsuming++
	}

166 167
	log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
		dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
Jeromy's avatar
Jeromy committed
168 169 170
	go dl.executeDial(dj)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
171 172
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
173 174 175
		log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
			"peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
			len(dl.waitingOnPeerLimit[dj.peer]))
Łukasz Magiera's avatar
Łukasz Magiera committed
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

	dl.addCheckFdLimit(dj)
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
//
// The per-peer token is checked first (addCheckPeerLimit), then the FD token.
// The whole admission runs under the limiter lock; the dial itself is started
// on a separate goroutine and does not block the caller.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()

	log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr)
	dl.addCheckPeerLimit(dj)
}

196
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
Łukasz Magiera's avatar
Łukasz Magiera committed
197 198
	dl.lk.Lock()
	defer dl.lk.Unlock()
199
	delete(dl.waitingOnPeerLimit, p)
200
	log.Debugf("[limiter] clearing all peer dials: %v", p)
Łukasz Magiera's avatar
Łukasz Magiera committed
201
	// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
202 203 204 205
	// remove them as we encounter them because they are 'cancelled' at this
	// point
}

Jeromy's avatar
Jeromy committed
206 207 208 209 210
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
211 212 213 214
	if j.cancelled() {
		return
	}

215 216 217 218
	dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
	defer cancel()

	con, err := dl.dialFunc(dctx, j.peer, j.addr)
Jeromy's avatar
Jeromy committed
219
	select {
Jeromy's avatar
Jeromy committed
220
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
221
	case <-j.ctx.Done():
222 223 224
		if err == nil {
			con.Close()
		}
Jeromy's avatar
Jeromy committed
225 226
	}
}