limiter.go 3.73 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5 6
	"sync"

Jeromy's avatar
Jeromy committed
7 8 9 10
	addrutil "github.com/libp2p/go-addr-util"
	iconn "github.com/libp2p/go-libp2p-interface-conn"
	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
11 12 13
)

type dialResult struct {
14
	Conn iconn.Conn
Jeromy's avatar
Jeromy committed
15
	Addr ma.Multiaddr
Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24 25 26
	Err  error
}

// dialJob describes one dial attempt to a single address of a peer, as
// scheduled through a dialLimiter.
type dialJob struct {
	// addr is the address to dial.
	addr    ma.Multiaddr
	// peer is the peer being dialed; per-peer limits are keyed on it.
	peer    peer.ID
	// ctx is handed to the dial function and checked for cancellation.
	ctx     context.Context
	// resp is the channel on which executeDial sends the dialResult.
	resp    chan dialResult
	// success is read by finishedDial: when true, no further waiting dial for
	// the same peer is started. Nothing in this file sets it.
	success bool
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35
// cancelled reports, without blocking, whether the job's context is already
// done.
func (dj *dialJob) cancelled() bool {
	done := dj.ctx.Done()
	select {
	case <-done:
		return true
	default:
	}
	return false
}

Jeromy's avatar
Jeromy committed
36 37 38 39 40 41
type dialLimiter struct {
	rllock      sync.Mutex
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

42
	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
43 44 45 46 47 48

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

49
// dialfunc is the signature of the function a dialLimiter calls to dial a
// peer at a given address.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
50 51 52 53 54

// newDialLimiter builds a dialLimiter around df using the package defaults
// concurrentFdDials and defaultPerPeerRateLimit as its two limits.
func newDialLimiter(df dialfunc) *dialLimiter {
	limiter := newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
	return limiter
}

55
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
Jeromy's avatar
Jeromy committed
56
	return &dialLimiter{
57 58
		fdLimit:            fdLimit,
		perPeerLimit:       perPeerLimit,
Jeromy's avatar
Jeromy committed
59 60 61 62 63 64 65 66 67 68
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
69 70
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
71

Jeromy's avatar
Jeromy committed
72 73
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
74
			dl.waitingOnFd[0] = nil // clear out memory
Jeromy's avatar
Jeromy committed
75 76 77 78 79 80 81 82 83 84
			dl.waitingOnFd = dl.waitingOnFd[1:]
			if len(dl.waitingOnFd) == 0 {
				dl.waitingOnFd = nil // clear out memory
			}
			dl.fdConsuming++

			go dl.executeDial(next)
		}
	}

Jeromy's avatar
Jeromy committed
85 86 87 88 89 90 91 92 93
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
94

Jeromy's avatar
Jeromy committed
95 96 97
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
98
			waitlist[0] = nil // clear out memory
Jeromy's avatar
Jeromy committed
99 100 101 102
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}
		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

103 104 105 106 107 108 109 110 111 112
		if addrutil.IsFDCostlyTransport(next.addr) {
			if dl.fdConsuming >= dl.fdLimit {
				dl.waitingOnFd = append(dl.waitingOnFd, next)
				return
			}

			// take token
			dl.fdConsuming++
		}

Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119 120 121 122 123 124 125
		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
126 127 128 129 130 131 132
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

Jeromy's avatar
Jeromy committed
133 134 135 136 137 138 139 140 141 142
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
143 144 145 146
	// take second needed token and start dial!
	go dl.executeDial(dj)
}

Jeromy's avatar
Jeromy committed
147 148 149 150 151
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
152 153 154 155
	if j.cancelled() {
		return
	}

Jeromy's avatar
Jeromy committed
156 157
	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
	select {
Jeromy's avatar
Jeromy committed
158
	case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
Jeromy's avatar
Jeromy committed
159 160 161
	case <-j.ctx.Done():
	}
}