limiter.go 4.07 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5 6
	"sync"

Jeromy's avatar
Jeromy committed
7 8
	addrutil "github.com/libp2p/go-addr-util"
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
9
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
10
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
11 12 13
)

type dialResult struct {
Steven Allen's avatar
Steven Allen committed
14
	Conn transport.Conn
Jeromy's avatar
Jeromy committed
15
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24 25 26
	Err  error
}

type dialJob struct {
	addr    ma.Multiaddr
	peer    peer.ID
	ctx     context.Context
	resp    chan dialResult
	success bool
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35
func (dj *dialJob) cancelled() bool {
	select {
	case <-dj.ctx.Done():
		return true
	default:
		return false
	}
}

Jeromy's avatar
Jeromy committed
36 37 38 39 40 41
type dialLimiter struct {
	rllock      sync.Mutex
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

Steven Allen's avatar
Steven Allen committed
42
	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
Jeromy's avatar
Jeromy committed
43 44 45 46 47 48

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

Steven Allen's avatar
Steven Allen committed
49
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
Jeromy's avatar
Jeromy committed
50 51

func newDialLimiter(df dialfunc) *dialLimiter {
Steven Allen's avatar
Steven Allen committed
52
	return newDialLimiterWithParams(df, ConcurrentFdDials, DefaultPerPeerRateLimit)
Jeromy's avatar
Jeromy committed
53 54
}

55
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
56
	return &dialLimiter{
57 58
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
59 60 61 62 63 64 65 66 67 68
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
69 70
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
71

Jeromy's avatar
Jeromy committed
72 73
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
74
			dl.waitingOnFd[0] = nil // clear out memory
Jeromy's avatar
Jeromy committed
75 76 77 78 79 80 81 82 83 84
			dl.waitingOnFd = dl.waitingOnFd[1:]
			if len(dl.waitingOnFd) == 0 {
				dl.waitingOnFd = nil // clear out memory
			}
			dl.fdConsuming++

			go dl.executeDial(next)
		}
	}

Jeromy's avatar
Jeromy committed
85 86 87 88 89 90 91 92 93
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
94

Jeromy's avatar
Jeromy committed
95 96 97
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
98
			waitlist[0] = nil // clear out memory
Jeromy's avatar
Jeromy committed
99 100 101 102
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}
		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

103 104 105 106 107 108 109 110 111 112
		if addrutil.IsFDCostlyTransport(next.addr) {
			if dl.fdConsuming >= dl.fdLimit {
				dl.waitingOnFd = append(dl.waitingOnFd, next)
				return
			}

			// take token
			dl.fdConsuming++
		}

Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119 120 121 122 123 124 125
		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
126 127 128 129 130 131 132
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

Jeromy's avatar
Jeromy committed
133 134 135 136 137 138 139 140 141 142
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
143 144 145 146
	// take second needed token and start dial!
	go dl.executeDial(dj)
}

147 148 149 150 151 152 153 154 155
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()
	delete(dl.waitingOnPeerLimit, p)
	// NB: the waitingOnFd list doesnt need to be cleaned out here, we will
	// remove them as we encounter them because they are 'cancelled' at this
	// point
}

Jeromy's avatar
Jeromy committed
156 157 158 159 160
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
161 162 163 164
	if j.cancelled() {
		return
	}

Jeromy's avatar
Jeromy committed
165 166
	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
	select {
Jeromy's avatar
Jeromy committed
167
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
168
	case <-j.ctx.Done():
169 170 171
		if err == nil {
			con.Close()
		}
Jeromy's avatar
Jeromy committed
172 173
	}
}