limiter.go 4.37 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"sync"
6
	"time"
Jeromy's avatar
Jeromy committed
7

Jeromy's avatar
Jeromy committed
8 9
	addrutil "github.com/libp2p/go-addr-util"
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
10
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
11
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
12 13 14
)

type dialResult struct {
Steven Allen's avatar
Steven Allen committed
15
	Conn transport.Conn
Jeromy's avatar
Jeromy committed
16
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
17 18 19 20 21 22 23 24 25 26 27
	Err  error
}

type dialJob struct {
	addr    ma.Multiaddr
	peer    peer.ID
	ctx     context.Context
	resp    chan dialResult
	success bool
}

Jeromy's avatar
Jeromy committed
28 29 30 31 32 33 34 35 36
func (dj *dialJob) cancelled() bool {
	select {
	case <-dj.ctx.Done():
		return true
	default:
		return false
	}
}

37 38 39 40 41 42 43 44 45
func (dj *dialJob) dialTimeout() time.Duration {
	timeout := DialTimeout
	if lowTimeoutFilters.AddrBlocked(dj.addr) {
		timeout = DialTimeoutLocal
	}

	return timeout
}

Jeromy's avatar
Jeromy committed
46
type dialLimiter struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
47 48
	lk sync.Mutex

Jeromy's avatar
Jeromy committed
49 50 51 52
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

Łukasz Magiera's avatar
Łukasz Magiera committed
53
	dialFunc dialfunc
Jeromy's avatar
Jeromy committed
54 55 56 57 58 59

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

Steven Allen's avatar
Steven Allen committed
60
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
Jeromy's avatar
Jeromy committed
61 62

func newDialLimiter(df dialfunc) *dialLimiter {
Steven Allen's avatar
Steven Allen committed
63
	return newDialLimiterWithParams(df, ConcurrentFdDials, DefaultPerPeerRateLimit)
Jeromy's avatar
Jeromy committed
64 65
}

66
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
67
	return &dialLimiter{
68 69
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
70 71 72 73 74 75
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
76 77 78 79 80 81 82 83 84 85 86
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
	dl.fdConsuming--

	if len(dl.waitingOnFd) > 0 {
		next := dl.waitingOnFd[0]
		dl.waitingOnFd[0] = nil // clear out memory
		dl.waitingOnFd = dl.waitingOnFd[1:]
		if len(dl.waitingOnFd) == 0 {
			dl.waitingOnFd = nil // clear out memory
Jeromy's avatar
Jeromy committed
87
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
88 89 90 91
		dl.fdConsuming++

		// we already have activePerPeer token at this point so we can just dial
		go dl.executeDial(next)
Jeromy's avatar
Jeromy committed
92
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
93
}
Jeromy's avatar
Jeromy committed
94

Łukasz Magiera's avatar
Łukasz Magiera committed
95
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
Jeromy's avatar
Jeromy committed
96 97 98 99 100 101 102 103 104 105
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
		if len(waitlist) == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
106
			delete(dl.waitingOnPeerLimit, next.peer)
Jeromy's avatar
Jeromy committed
107
		} else {
108
			waitlist[0] = nil // clear out memory
Łukasz Magiera's avatar
Łukasz Magiera committed
109
			dl.waitingOnPeerLimit[next.peer] = waitlist[1:]
Jeromy's avatar
Jeromy committed
110 111
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
112
		dl.activePerPeer[next.peer]++ // just kidding, we still want this token
113

Łukasz Magiera's avatar
Łukasz Magiera committed
114
		dl.addCheckFdLimit(next)
Jeromy's avatar
Jeromy committed
115 116 117
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
118 119 120
func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()
Jeromy's avatar
Jeromy committed
121

Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.freeFDToken()
Jeromy's avatar
Jeromy committed
124 125
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
126 127 128 129
	dl.freePeerToken(dj)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
Jeromy's avatar
Jeromy committed
130 131 132 133 134 135 136 137 138 139
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
140 141 142
	go dl.executeDial(dj)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

	dl.addCheckFdLimit(dj)
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()

	dl.addCheckPeerLimit(dj)
}

164
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
Łukasz Magiera's avatar
Łukasz Magiera committed
165 166
	dl.lk.Lock()
	defer dl.lk.Unlock()
167
	delete(dl.waitingOnPeerLimit, p)
Łukasz Magiera's avatar
Łukasz Magiera committed
168
	// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
169 170 171 172
	// remove them as we encounter them because they are 'cancelled' at this
	// point
}

Jeromy's avatar
Jeromy committed
173 174 175 176 177
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
178 179 180 181
	if j.cancelled() {
		return
	}

182 183 184 185
	dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
	defer cancel()

	con, err := dl.dialFunc(dctx, j.peer, j.addr)
Jeromy's avatar
Jeromy committed
186
	select {
Jeromy's avatar
Jeromy committed
187
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
188
	case <-j.ctx.Done():
189 190 191
		if err == nil {
			con.Close()
		}
Jeromy's avatar
Jeromy committed
192 193
	}
}