limiter.go 3.39 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package swarm

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5 6 7 8
	"sync"

	peer "github.com/ipfs/go-libp2p-peer"
	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
9
	addrutil "github.com/libp2p/go-addr-util"
10
	iconn "github.com/libp2p/go-libp2p-interface-conn"
Jeromy's avatar
Jeromy committed
11 12 13
)

type dialResult struct {
14
	Conn iconn.Conn
Jeromy's avatar
Jeromy committed
15 16 17 18 19 20 21 22 23 24 25
	Err  error
}

// dialJob describes one dial attempt to a single address of a peer, as
// submitted to a dialLimiter.
type dialJob struct {
	// addr is the address to dial; it is passed to the dial function and
	// inspected to decide whether the dial needs a file-descriptor token.
	addr    ma.Multiaddr
	// peer is the target peer; per-peer limiting is keyed on it.
	peer    peer.ID
	// ctx is passed to the dial function and checked for cancellation
	// before dialing and while delivering the result.
	ctx     context.Context
	// resp receives the dialResult once the dial function returns.
	resp    chan dialResult
	// success is read when the job finishes: if it is false, the next dial
	// waiting on this peer's limit is started.
	// NOTE(review): nothing in this file sets it — presumably the consumer
	// of resp does; confirm against the caller.
	success bool
}

Jeromy's avatar
Jeromy committed
26 27 28 29 30 31 32 33 34
// cancelled reports, without blocking, whether the job's context is
// already done.
func (dj *dialJob) cancelled() bool {
	done := false
	select {
	case <-dj.ctx.Done():
		done = true
	default:
		// context still live; fall through with done == false
	}
	return done
}

Jeromy's avatar
Jeromy committed
35 36 37 38 39 40
type dialLimiter struct {
	rllock      sync.Mutex
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

41
	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
42 43 44 45 46 47

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

48
// dialfunc is the signature of the function a dialLimiter uses to perform a
// dial: it takes a context, the target peer and one address, and returns
// the resulting connection or an error.
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
Jeromy's avatar
Jeromy committed
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67

// newDialLimiter builds a dialLimiter around df using the package defaults
// concurrentFdDials and defaultPerPeerRateLimit as its two limits.
func newDialLimiter(df dialfunc) *dialLimiter {
	fdLimit, perPeerLimit := concurrentFdDials, defaultPerPeerRateLimit
	return newDialLimiterWithParams(df, fdLimit, perPeerLimit)
}

// newDialLimiterWithParams builds a dialLimiter that dials with df, allows
// at most fdl concurrent FD-consuming dials and at most ppl concurrent
// dials per peer. Both bookkeeping maps start out empty.
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
	dl := new(dialLimiter)
	dl.dialFunc = df
	dl.fdLimit = fdl
	dl.perPeerLimit = ppl
	dl.activePerPeer = make(map[peer.ID]int)
	dl.waitingOnPeerLimit = make(map[peer.ID][]*dialJob)
	return dl
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
68 69 70 71 72 73 74 75 76 77 78 79 80 81
	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
			dl.waitingOnFd = dl.waitingOnFd[1:]
			if len(dl.waitingOnFd) == 0 {
				dl.waitingOnFd = nil // clear out memory
			}
			dl.fdConsuming++

			go dl.executeDial(next)
		}
	}

Jeromy's avatar
Jeromy committed
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}
		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}

}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

Jeromy's avatar
Jeromy committed
112 113 114 115 116 117 118
	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[dj.peer]
		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
		return
	}
	dl.activePerPeer[dj.peer]++

Jeromy's avatar
Jeromy committed
119 120 121 122 123 124 125 126 127 128
	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

Jeromy's avatar
Jeromy committed
129 130 131 132
	// take second needed token and start dial!
	go dl.executeDial(dj)
}

Jeromy's avatar
Jeromy committed
133 134 135 136 137
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
138 139 140 141
	if j.cancelled() {
		return
	}

Jeromy's avatar
Jeromy committed
142 143 144 145 146 147
	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
	select {
	case j.resp <- dialResult{Conn: con, Err: err}:
	case <-j.ctx.Done():
	}
}