limiter.go 3.48 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
package swarm

import (
	"sync"

	peer "github.com/ipfs/go-libp2p-peer"
	ma "github.com/jbenet/go-multiaddr"
	context "golang.org/x/net/context"

	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)

type dialResult struct {
	Conn conn.Conn
	Err  error
}

type dialJob struct {
	addr    ma.Multiaddr
	peer    peer.ID
	ctx     context.Context
	resp    chan dialResult
	success bool
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35
func (dj *dialJob) cancelled() bool {
	select {
	case <-dj.ctx.Done():
		return true
	default:
		return false
	}
}

Jeromy's avatar
Jeromy committed
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
type dialLimiter struct {
	rllock      sync.Mutex
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

func newDialLimiter(df dialfunc) *dialLimiter {
	return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
	return &dialLimiter{
		fdLimit:            fdl,
		perPeerLimit:       ppl,
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

	// release tokens in reverse order than we take them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}
		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}

	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
			dl.waitingOnFd = dl.waitingOnFd[1:]
			dl.fdConsuming++

			// now, attempt to take the 'per peer limit' token
			dl.schedulePerPeerDial(next)
		}
	}
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

	if addrutil.IsFDCostlyTransport(dj.addr) {
		if dl.fdConsuming >= dl.fdLimit {
			dl.waitingOnFd = append(dl.waitingOnFd, dj)
			return
		}

		// take token
		dl.fdConsuming++
	}

	dl.schedulePerPeerDial(dj)
}

// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
	defer dl.finishedDial(j)
Jeromy's avatar
Jeromy committed
128 129 130 131
	if j.cancelled() {
		return
	}

Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
	select {
	case j.resp <- dialResult{Conn: con, Err: err}:
	case <-j.ctx.Done():
	}
}

func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
	if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
		wlist := dl.waitingOnPeerLimit[j.peer]
		dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
		return
	}

	// take second needed token and start dial!
	dl.activePerPeer[j.peer]++
	go dl.executeDial(j)
}