limiter.go 6.33 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
5 6
	"os"
	"strconv"
Jeromy's avatar
Jeromy committed
7
	"sync"
8
	"time"
Jeromy's avatar
Jeromy committed
9

tavit ohanian's avatar
tavit ohanian committed
10 11
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
	"gitlab.dms3.io/p2p/go-p2p-core/transport"
12

13
	ma "gitlab.dms3.io/mf/go-multiaddr"
Jeromy's avatar
Jeromy committed
14 15 16
)

type dialResult struct {
17
	Conn transport.CapableConn
Jeromy's avatar
Jeromy committed
18
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
19 20 21 22
	Err  error
}

type dialJob struct {
Steven Allen's avatar
Steven Allen committed
23 24 25 26
	addr ma.Multiaddr
	peer peer.ID
	ctx  context.Context
	resp chan dialResult
Jeromy's avatar
Jeromy committed
27 28
}

Jeromy's avatar
Jeromy committed
29
func (dj *dialJob) cancelled() bool {
Matt Joiner's avatar
Matt Joiner committed
30
	return dj.ctx.Err() != nil
Jeromy's avatar
Jeromy committed
31 32
}

33
func (dj *dialJob) dialTimeout() time.Duration {
34
	timeout := transport.DialTimeout
35 36 37 38 39 40 41
	if lowTimeoutFilters.AddrBlocked(dj.addr) {
		timeout = DialTimeoutLocal
	}

	return timeout
}

Jeromy's avatar
Jeromy committed
42
type dialLimiter struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
43 44
	lk sync.Mutex

Aarsh Shah's avatar
Aarsh Shah committed
45 46 47 48
	isFdConsumingFnc isFdConsumingFnc
	fdConsuming      int
	fdLimit          int
	waitingOnFd      []*dialJob
Jeromy's avatar
Jeromy committed
49

Łukasz Magiera's avatar
Łukasz Magiera committed
50
	dialFunc dialfunc
Jeromy's avatar
Jeromy committed
51 52 53 54 55 56

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

57
// dialfunc performs a dial to the given peer at the given address under the
// supplied context, returning the resulting connection or an error.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
Aarsh Shah's avatar
Aarsh Shah committed
58
// isFdConsumingFnc reports whether a dial to the given address should be
// counted against the limiter's FD limit.
type isFdConsumingFnc func(ma.Multiaddr) bool
Jeromy's avatar
Jeromy committed
59

Aarsh Shah's avatar
Aarsh Shah committed
60
func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
61
	fd := ConcurrentFdDials
tavit ohanian's avatar
tavit ohanian committed
62
	if env := os.Getenv("P2P_SWARM_FD_LIMIT"); env != "" {
63 64 65 66
		if n, err := strconv.ParseInt(env, 10, 32); err == nil {
			fd = int(n)
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
67
	return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
Jeromy's avatar
Jeromy committed
68 69
}

Aarsh Shah's avatar
Aarsh Shah committed
70
func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
71
	return &dialLimiter{
Aarsh Shah's avatar
Aarsh Shah committed
72
		isFdConsumingFnc:   isFdConsumingFnc,
73 74
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
75 76 77 78 79 80
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
81 82 83
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
84
	log.Debugf("[limiter] freeing FD token; waiting: %d; consuming: %d", len(dl.waitingOnFd), dl.fdConsuming)
Łukasz Magiera's avatar
Łukasz Magiera committed
85 86
	dl.fdConsuming--

87
	for len(dl.waitingOnFd) > 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
88 89 90
		next := dl.waitingOnFd[0]
		dl.waitingOnFd[0] = nil // clear out memory
		dl.waitingOnFd = dl.waitingOnFd[1:]
91

Łukasz Magiera's avatar
Łukasz Magiera committed
92
		if len(dl.waitingOnFd) == 0 {
93 94 95 96 97 98 99 100
			// clear out memory.
			dl.waitingOnFd = nil
		}

		// Skip over canceled dials instead of queuing up a goroutine.
		if next.cancelled() {
			dl.freePeerToken(next)
			continue
Jeromy's avatar
Jeromy committed
101
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
102 103 104 105
		dl.fdConsuming++

		// we already have activePerPeer token at this point so we can just dial
		go dl.executeDial(next)
106
		return
Jeromy's avatar
Jeromy committed
107
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
108
}
Jeromy's avatar
Jeromy committed
109

Łukasz Magiera's avatar
Łukasz Magiera committed
110
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
111 112
	log.Debugf("[limiter] freeing peer token; peer %s; addr: %s; active for peer: %d; waiting on peer limit: %d",
		dj.peer, dj.addr, dl.activePerPeer[dj.peer], len(dl.waitingOnPeerLimit[dj.peer]))
Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
120
	for len(waitlist) > 0 {
Jeromy's avatar
Jeromy committed
121
		next := waitlist[0]
122 123 124 125
		waitlist[0] = nil // clear out memory
		waitlist = waitlist[1:]

		if len(waitlist) == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
126
			delete(dl.waitingOnPeerLimit, next.peer)
Jeromy's avatar
Jeromy committed
127
		} else {
128 129 130 131 132
			dl.waitingOnPeerLimit[next.peer] = waitlist
		}

		if next.cancelled() {
			continue
Jeromy's avatar
Jeromy committed
133 134
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
135
		dl.activePerPeer[next.peer]++ // just kidding, we still want this token
136

Łukasz Magiera's avatar
Łukasz Magiera committed
137
		dl.addCheckFdLimit(next)
138
		return
Jeromy's avatar
Jeromy committed
139 140 141
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
142 143 144
func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()
Aarsh Shah's avatar
Aarsh Shah committed
145
	if dl.shouldConsumeFd(dj.addr) {
Łukasz Magiera's avatar
Łukasz Magiera committed
146
		dl.freeFDToken()
Jeromy's avatar
Jeromy committed
147 148
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
149 150 151
	dl.freePeerToken(dj)
}

Aarsh Shah's avatar
Aarsh Shah committed
152 153 154 155 156 157 158 159 160 161 162
// shouldConsumeFd reports whether a dial to addr must hold an FD token.
//
// Relay (circuit) addresses never do for now: the FD is consumed when the
// Relay Transport actually dials the Relay server, and that dial also passes
// through this limiter with the relay server's own, non-relay, address. For
// every other address the decision is delegated to isFdConsumingFnc.
func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {
	if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil {
		// The address carries a circuit component, so it is a relay address.
		return false
	}
	return dl.isFdConsumingFnc(addr)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
163
func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
Aarsh Shah's avatar
Aarsh Shah committed
164
	if dl.shouldConsumeFd(dj.addr) {
Jeromy's avatar
Jeromy committed
165
		if dl.fdConsuming >= dl.fdLimit {
166 167
			log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
				"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
Jeromy's avatar
Jeromy committed
168 169 170 171
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

172 173
		log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
			dj.peer, dj.addr, dl.fdConsuming)
Jeromy's avatar
Jeromy committed
174 175 176 177
		// take token
		dl.fdConsuming++
	}

178 179
	log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
		dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
Jeromy's avatar
Jeromy committed
180 181 182
	go dl.executeDial(dj)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
183 184
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
185 186 187
		log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
			"peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
			len(dl.waitingOnPeerLimit[dj.peer]))
Łukasz Magiera's avatar
Łukasz Magiera committed
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

	dl.addCheckFdLimit(dj)
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()

204
	log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr)
Łukasz Magiera's avatar
Łukasz Magiera committed
205 206 207
	dl.addCheckPeerLimit(dj)
}

208
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
Łukasz Magiera's avatar
Łukasz Magiera committed
209 210
	dl.lk.Lock()
	defer dl.lk.Unlock()
211
	delete(dl.waitingOnPeerLimit, p)
212
	log.Debugf("[limiter] clearing all peer dials: %v", p)
Łukasz Magiera's avatar
Łukasz Magiera committed
213
	// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
214 215 216 217
	// remove them as we encounter them because they are 'cancelled' at this
	// point
}

Jeromy's avatar
Jeromy committed
218 219 220 221 222
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
223 224 225 226
	if j.cancelled() {
		return
	}

227 228 229 230
	dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
	defer cancel()

	con, err := dl.dialFunc(dctx, j.peer, j.addr)
Jeromy's avatar
Jeromy committed
231
	select {
Jeromy's avatar
Jeromy committed
232
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
233
	case <-j.ctx.Done():
234 235 236
		if err == nil {
			con.Close()
		}
Jeromy's avatar
Jeromy committed
237 238
	}
}