Commit 74d19ef5 authored by Jeromy Johnson's avatar Jeromy Johnson

Merge pull request #38 from ipfs/sketch/dial-redo

refactor swarm dialing logic
parents 861161fc 422700f4
......@@ -40,12 +40,16 @@ func init() {
SupportedTransportProtocols = transports
}
// FilterAddrs is a filter that removes certain addresses, according to filter.
// if filter returns true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
// FilterAddrs is a filter that removes certain addresses, according the given filters.
// if all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
b := make([]ma.Multiaddr, 0, len(a))
for _, addr := range a {
if filter(addr) {
good := true
for _, filter := range filters {
good = good && filter(addr)
}
if good {
b = append(b, addr)
}
}
......@@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(a, func(m ma.Multiaddr) bool {
return AddrUsable(m, false)
})
return FilterAddrs(a, AddrUsableFunc)
}
func AddrUsableFunc(m ma.Multiaddr) bool {
return AddrUsable(m, false)
}
// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
......
package addrutil
import (
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)
// SubtractFilter returns a filter func that filters all of the given addresses
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
addrmap := make(map[string]bool)
for _, a := range addrs {
addrmap[string(a.Bytes())] = true
}
return func(a ma.Multiaddr) bool {
return !addrmap[string(a.Bytes())]
}
}
// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func IsFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
// FilterNeg returns a negated version of the passed in filter
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
return func(a ma.Multiaddr) bool {
return !f(a)
}
}
......@@ -2,7 +2,6 @@ package swarm
import (
"net"
"sort"
"sync"
"testing"
"time"
......@@ -494,38 +493,3 @@ func TestDialBackoffClears(t *testing.T) {
t.Log("correctly cleared backoff")
}
}
func mkAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func TestAddressSorting(t *testing.T) {
u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp")
local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")
l := AddrList{local, u1, u2l, norm}
sort.Sort(l)
if !l[0].Equal(u2l) {
t.Fatal("expected utp local addr to be sorted first: ", l[0])
}
if !l[1].Equal(u1) {
t.Fatal("expected utp addr to be sorted second")
}
if !l[2].Equal(local) {
t.Fatal("expected tcp localhost addr thid")
}
if !l[3].Equal(norm) {
t.Fatal("expected normal addr last")
}
}
package swarm
import (
"sync"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
context "golang.org/x/net/context"
conn "github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)
type dialResult struct {
Conn conn.Conn
Err error
}
type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
success bool
}
func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}
type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)
activePerPeer map[peer.ID]int
perPeerLimit int
waitingOnPeerLimit map[peer.ID][]*dialJob
}
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)
func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
}
}
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++
go dl.executeDial(next)
}
}
// release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
delete(dl.activePerPeer, dj.peer)
}
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token
// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}
// take token
dl.fdConsuming++
}
// take second needed token and start dial!
go dl.executeDial(dj)
}
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}
// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
}
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}
con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
case <-j.ctx.Done():
}
}
package swarm
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
context "golang.org/x/net/context"
conn "github.com/ipfs/go-libp2p/p2p/net/conn"
)
func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
}
// in these tests I use addresses with tcp ports over a certain number to
// signify 'good' addresses that will succeed, and addresses below that number
// will fail. This lets us more easily test these different scenarios.
func tcpPortOver(a ma.Multiaddr, n int) bool {
port, err := a.ValueForProtocol(ma.P_TCP)
if err != nil {
panic(err)
}
pnum, err := strconv.Atoi(port)
if err != nil {
panic(err)
}
return pnum > n
}
func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) {
for _, a := range addrs {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: p,
addr: a,
resp: res,
})
}
}
func hangDialFunc(hang chan struct{}) dialfunc {
return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if mafmt.UTP.Matches(a) {
return conn.Conn(nil), nil
}
if tcpPortOver(a, 10) {
return conn.Conn(nil), nil
} else {
<-hang
return nil, fmt.Errorf("test bad dial")
}
}
}
func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
resch := make(chan dialResult)
pid := peer.ID("testpeer")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tryDialAddrs(ctx, l, pid, bads, resch)
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// complete a single hung dial
hang <- struct{}{}
select {
case r := <-resch:
if r.Err == nil {
t.Fatal("should have gotten failed dial result")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial completion")
}
select {
case r := <-resch:
if r.Err != nil {
t.Fatal("expected second result to be success!")
}
case <-time.After(time.Second):
}
}
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
good_tcp := addrWithPort(t, 20)
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
tryDialAddrs(ctx, l, pid, bads, resch)
}
// these dials should work normally, but will hang because we have taken
// up all the fd limiting
for _, pid := range pids {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good_tcp,
resp: resch,
})
}
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}
}
func TestTokenRedistribution(t *testing.T) {
hangchs := make(map[peer.ID]chan struct{})
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 10) {
return (conn.Conn)(nil), nil
} else {
<-hangchs[p]
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 8, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
hangchs[pid] = make(chan struct{})
tryDialAddrs(ctx, l, pid, bads, resch)
}
good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")
// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// unblock one dial for peer 0
hangchs[pids[0]] <- struct{}{}
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case <-resch:
t.Fatal("no more dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// add a bad dial job to peer 0 to fill their rate limiter
// and test that more dials for this peer won't interfere with peer 1's successful dial incoming
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
resp: resch,
})
hangchs[pids[1]] <- struct{}{}
// now one failed dial from peer 1 should get through and fail
// which will in turn unblock the successful dial on peer 1
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have succeeded!")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("should have gotten successful dial")
}
}
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil
} else {
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
......@@ -91,6 +91,8 @@ type Swarm struct {
proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
limiter *dialLimiter
}
// NewSwarm constructs a Swarm, with a Chan.
......@@ -123,6 +125,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
}
s.limiter = newDialLimiter(s.dialAddr)
// configure Swarm
s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.
......@@ -156,6 +160,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
}
listenAddrs = filtered
}
return listenAddrs, nil
}
......
package swarm
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"
"time"
......@@ -13,7 +11,6 @@ import (
conn "github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
ma "github.com/jbenet/go-multiaddr"
"github.com/jbenet/go-multiaddr-net"
context "golang.org/x/net/context"
)
......@@ -42,6 +39,9 @@ const dialAttempts = 1
// number of concurrent outbound dials over transports that consume file descriptors
const concurrentFdDials = 160
// number of concurrent outbound dials to make per peer
const defaultPerPeerRateLimit = 8
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
......@@ -319,32 +319,40 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}
// get remote peer addrs
remoteAddrs := s.peers.Addrs(p)
// make sure we can use the addresses.
remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
// drop out any addrs that would just dial ourselves. use ListenAddresses
// as that is a more authoritative view than localAddrs.
ila, _ := s.InterfaceListenAddresses()
remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
if len(remoteAddrs) == 0 {
err := errors.New("peer has no addresses")
logdial["error"] = err
return nil, err
}
remoteAddrs = s.filterAddrs(remoteAddrs)
if len(remoteAddrs) == 0 {
err := errors.New("all adresses for peer have been filtered out")
logdial["error"] = err
return nil, err
subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...)
// get live channel of addresses for peer, filtered by the given filters
/*
remoteAddrChan := s.peers.AddrsChan(ctx, p,
addrutil.AddrUsableFilter,
subtract_filter,
s.Filters.AddrBlocked)
*/
//////
/*
This code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
paddrs := s.peers.Addrs(p)
good_addrs := addrutil.FilterAddrs(paddrs,
addrutil.AddrUsableFunc,
subtract_filter,
addrutil.FilterNeg(s.Filters.AddrBlocked),
)
remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs))
for _, a := range good_addrs {
remoteAddrChan <- a
}
close(remoteAddrChan)
/////////
// try to get a connection to any addr
connC, err := s.dialAddrs(ctx, p, remoteAddrs)
connC, err := s.dialAddrs(ctx, p, remoteAddrChan)
if err != nil {
logdial["error"] = err
return nil, err
......@@ -364,98 +372,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return swarmC, nil
}
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
// sort addresses so preferred addresses are dialed sooner
sort.Sort(AddrList(remoteAddrs))
// try to connect to one of the peer's known addresses.
// we dial concurrently to each of the addresses, which:
// * makes the process faster overall
// * attempts to get the fastest connection available.
// * mitigates the waste of trying bad addresses
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) {
log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel work when we exit func
conns := make(chan conn.Conn)
errs := make(chan error, len(remoteAddrs))
// use a single response type instead of errs and conns, reduces complexity *a ton*
respch := make(chan dialResult)
// dialSingleAddr is used in the rate-limited async thing below.
dialSingleAddr := func(addr ma.Multiaddr) {
// rebind chans in scope so we can nil them out easily
connsout := conns
errsout := errs
defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p)
exitErr := defaultDialFail
connC, err := s.dialAddr(ctx, p, addr)
if err != nil {
connsout = nil
} else if connC == nil {
// NOTE: this really should never happen
log.Errorf("failed to dial %s %s and got no error!", p, addr)
err = fmt.Errorf("failed to dial %s %s", p, addr)
connsout = nil
} else {
errsout = nil
}
// check parent still wants our results
var active int
for {
select {
case <-ctx.Done():
if connC != nil {
connC.Close()
case addr, ok := <-remoteAddrs:
if !ok {
remoteAddrs = nil
if active == 0 {
return nil, exitErr
}
continue
}
case errsout <- err:
case connsout <- connC:
}
}
// this whole thing is in a goroutine so we can use foundConn
// to end early.
go func() {
limiter := make(chan struct{}, 8)
for _, addr := range remoteAddrs {
// returns whatever ratelimiting is acceptable for workerAddr.
// may not rate limit at all.
rl := s.addrDialRateLimit(addr)
select {
case <-ctx.Done(): // our context was cancelled
return
case rl <- struct{}{}:
// take the token, move on
s.limitedDial(ctx, p, addr, respch)
active++
case <-ctx.Done():
if exitErr == defaultDialFail {
exitErr = ctx.Err()
}
select {
case <-ctx.Done(): // our context was cancelled
return
case limiter <- struct{}{}:
// take the token, move on
return nil, exitErr
case resp := <-respch:
active--
if resp.Err != nil {
log.Info("got error on dial: ", resp.Err)
// Errors are normal, lots of dials will fail
exitErr = resp.Err
if remoteAddrs == nil && active == 0 {
return nil, exitErr
}
} else if resp.Conn != nil {
return resp.Conn, nil
}
go func(rlc <-chan struct{}, a ma.Multiaddr) {
dialSingleAddr(a)
<-limiter
<-rlc
}(rl, addr)
}
}()
// wair for the results.
exitErr := fmt.Errorf("failed to dial %s", p)
for range remoteAddrs {
select {
case exitErr = <-errs: //
log.Debug("dial error: ", exitErr)
case connC := <-conns:
// take the first + return asap
return connC, nil
case <-ctx.Done():
// break out and return error
break
}
}
return nil, exitErr
}
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
s.limiter.AddDialJob(&dialJob{
addr: a,
peer: p,
resp: resp,
ctx: ctx,
})
}
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
......@@ -485,16 +459,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (con
return connC, nil
}
func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
var out []ma.Multiaddr
for _, a := range addrs {
if !s.Filters.AddrBlocked(a) {
out = append(out, a)
}
}
return out
}
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
......@@ -514,72 +478,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
return swarmC, err
}
// addrDialRateLimit returns a ratelimiting channel for dialing transport
// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} {
if isFDCostlyTransport(a) {
return s.fdRateLimit
}
// do not rate limit it at all
return make(chan struct{}, 1)
}
func isFDCostlyTransport(a ma.Multiaddr) bool {
return isTCPMultiaddr(a)
}
func isTCPMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
}
type AddrList []ma.Multiaddr
func (al AddrList) Len() int {
return len(al)
}
func (al AddrList) Swap(i, j int) {
al[i], al[j] = al[j], al[i]
}
func (al AddrList) Less(i, j int) bool {
a := al[i]
b := al[j]
// dial localhost addresses next, they should fail immediately
lba := manet.IsIPLoopback(a)
lbb := manet.IsIPLoopback(b)
if lba {
if !lbb {
return true
}
}
// dial utp and similar 'non-fd-consuming' addresses first
fda := isFDCostlyTransport(a)
fdb := isFDCostlyTransport(b)
if !fda {
if fdb {
return true
}
// if neither consume fd's, assume equal ordering
return false
}
// if 'b' doesnt take a file descriptor
if !fdb {
return false
}
// if 'b' is loopback and both take file descriptors
if lbb {
return false
}
// for the rest, just sort by bytes
return bytes.Compare(a.Bytes(), b.Bytes()) > 0
}
......@@ -304,7 +304,7 @@ func TestAddrBlocking(t *testing.T) {
swarms := makeSwarms(ctx, t, 2)
swarms[0].SetConnHandler(func(conn *Conn) {
t.Fatalf("no connections should happen! -- %s", conn)
t.Errorf("no connections should happen! -- %s", conn)
})
_, block, err := net.ParseCIDR("127.0.0.1/8")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment