limiter.go 4.34 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"sync"
6
	"time"
Jeromy's avatar
Jeromy committed
7

Jeromy's avatar
Jeromy committed
8 9
	addrutil "github.com/libp2p/go-addr-util"
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
10
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
11
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
12 13 14
)

type dialResult struct {
Steven Allen's avatar
Steven Allen committed
15
	Conn transport.Conn
Jeromy's avatar
Jeromy committed
16
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
17 18 19 20
	Err  error
}

type dialJob struct {
Steven Allen's avatar
Steven Allen committed
21 22 23 24
	addr ma.Multiaddr
	peer peer.ID
	ctx  context.Context
	resp chan dialResult
Jeromy's avatar
Jeromy committed
25 26
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35
// cancelled reports, without blocking, whether the job's context is already
// done. A context's Err is non-nil exactly when its Done channel is closed.
func (dj *dialJob) cancelled() bool {
	return dj.ctx.Err() != nil
}

36
func (dj *dialJob) dialTimeout() time.Duration {
37
	timeout := transport.DialTimeout
38 39 40 41 42 43 44
	if lowTimeoutFilters.AddrBlocked(dj.addr) {
		timeout = DialTimeoutLocal
	}

	return timeout
}

Jeromy's avatar
Jeromy committed
45
type dialLimiter struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
46 47
	lk sync.Mutex

Jeromy's avatar
Jeromy committed
48 49 50 51
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

Łukasz Magiera's avatar
Łukasz Magiera committed
52
	dialFunc dialfunc
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

Steven Allen's avatar
Steven Allen committed
59
// dialfunc is the signature of the function a dialLimiter calls to dial one
// address of one peer.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
Jeromy's avatar
Jeromy committed
60 61

func newDialLimiter(df dialfunc) *dialLimiter {
Steven Allen's avatar
Steven Allen committed
62
	return newDialLimiterWithParams(df, ConcurrentFdDials, DefaultPerPeerRateLimit)
Jeromy's avatar
Jeromy committed
63 64
}

65
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
66
	return &dialLimiter{
67 68
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
69 70 71 72 73 74
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
75 76 77 78 79 80 81 82 83 84 85
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
	dl.fdConsuming--

	if len(dl.waitingOnFd) > 0 {
		next := dl.waitingOnFd[0]
		dl.waitingOnFd[0] = nil // clear out memory
		dl.waitingOnFd = dl.waitingOnFd[1:]
		if len(dl.waitingOnFd) == 0 {
			dl.waitingOnFd = nil // clear out memory
Jeromy's avatar
Jeromy committed
86
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
87 88 89 90
		dl.fdConsuming++

		// we already have activePerPeer token at this point so we can just dial
		go dl.executeDial(next)
Jeromy's avatar
Jeromy committed
91
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
92
}
Jeromy's avatar
Jeromy committed
93

Łukasz Magiera's avatar
Łukasz Magiera committed
94
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
Jeromy's avatar
Jeromy committed
95 96 97 98 99 100 101
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
Steven Allen's avatar
Steven Allen committed
102
	if len(waitlist) > 0 {
Jeromy's avatar
Jeromy committed
103 104
		next := waitlist[0]
		if len(waitlist) == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
105
			delete(dl.waitingOnPeerLimit, next.peer)
Jeromy's avatar
Jeromy committed
106
		} else {
107
			waitlist[0] = nil // clear out memory
Łukasz Magiera's avatar
Łukasz Magiera committed
108
			dl.waitingOnPeerLimit[next.peer] = waitlist[1:]
Jeromy's avatar
Jeromy committed
109 110
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
111
		dl.activePerPeer[next.peer]++ // just kidding, we still want this token
112

Łukasz Magiera's avatar
Łukasz Magiera committed
113
		dl.addCheckFdLimit(next)
Jeromy's avatar
Jeromy committed
114 115 116
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
117 118 119
func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()
Jeromy's avatar
Jeromy committed
120

Łukasz Magiera's avatar
Łukasz Magiera committed
121 122
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.freeFDToken()
Jeromy's avatar
Jeromy committed
123 124
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
125 126 127 128
	dl.freePeerToken(dj)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
Jeromy's avatar
Jeromy committed
129 130 131 132 133 134 135 136 137 138
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
139 140 141
	go dl.executeDial(dj)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
// addCheckPeerLimit takes a per-peer token for dj and passes it on to the FD
// limit check. When dj.peer has already reached perPeerLimit, dj is queued on
// that peer's waitlist instead.
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
	if dl.activePerPeer[dj.peer] < dl.perPeerLimit {
		dl.activePerPeer[dj.peer]++
		dl.addCheckFdLimit(dj)
		return
	}

	// Limit reached: park the job until a token for this peer is released.
	dl.waitingOnPeerLimit[dj.peer] = append(dl.waitingOnPeerLimit[dj.peer], dj)
}

// AddDialJob tries to take the tokens needed to start the given dial job. It
// holds dl.lk while running the job through the per-peer limit check, which in
// turn leads to the FD limit check. If every needed token is available the
// dial is started immediately; otherwise the job is put on the waitlist for
// the token it could not get.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.lk.Lock()
	defer dl.lk.Unlock()

	dl.addCheckPeerLimit(dj)
}

163
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
Łukasz Magiera's avatar
Łukasz Magiera committed
164 165
	dl.lk.Lock()
	defer dl.lk.Unlock()
166
	delete(dl.waitingOnPeerLimit, p)
Łukasz Magiera's avatar
Łukasz Magiera committed
167
	// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
168 169 170 171
	// remove them as we encounter them because they are 'cancelled' at this
	// point
}

Jeromy's avatar
Jeromy committed
172 173 174 175 176
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
177 178 179 180
	if j.cancelled() {
		return
	}

181 182 183 184
	dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
	defer cancel()

	con, err := dl.dialFunc(dctx, j.peer, j.addr)
Jeromy's avatar
Jeromy committed
185
	select {
Jeromy's avatar
Jeromy committed
186
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
187
	case <-j.ctx.Done():
188 189 190
		if err == nil {
			con.Close()
		}
Jeromy's avatar
Jeromy committed
191 192
	}
}