swarm_dial.go 20.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

10 11 12
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"
Aarsh Shah's avatar
Aarsh Shah committed
13 14

	addrutil "github.com/libp2p/go-addr-util"
15 16
	lgbl "github.com/libp2p/go-libp2p-loggables"

Jeromy's avatar
Jeromy committed
17
	ma "github.com/multiformats/go-multiaddr"
18
	manet "github.com/multiformats/go-multiaddr/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

var (
	// ErrDialBackoff is returned by the backoff code when a given peer has
	// been dialed too frequently.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialToSelf is returned if we attempt to dial our own peer.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when we don't know a transport for the
	// given multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is returned when connecting to a peer has ultimately
	// failed.
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned when we fail to find any addresses for a
	// peer we're trying to dial.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned when we find addresses for a peer but
	// can't use any of them.
	ErrNoGoodAddresses = errors.New("no good addresses")

	// ErrGaterDisallowedConnection is returned when the connection gater
	// prevents us from forming a connection with a peer.
	ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer")
)

Steven Allen's avatar
Steven Allen committed
60
const (
	// DialAttempts governs how many times a goroutine will try to dial a given peer.
	// Note: this is down to one, as we have _too many dials_ atm. To add back in,
	// add loop back in Dial(.)
	DialAttempts = 1

	// ConcurrentFdDials is the number of concurrent outbound dials over
	// transports that consume file descriptors.
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit is the number of concurrent outbound dials to
	// make per peer.
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
72

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all dial attempts failed), we add
// them to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
98

Steven Allen's avatar
Steven Allen committed
99 100
// DialBackoff is a type for tracking peer dial backoffs.
//
101
// * It's safe to use its zero value.
Steven Allen's avatar
Steven Allen committed
102 103 104
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Will Scott's avatar
Will Scott committed
105
	entries map[peer.ID]map[string]*backoffAddr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108
	lock    sync.RWMutex
}

Will Scott's avatar
Will Scott committed
109
type backoffAddr struct {
Jeromy's avatar
Jeromy committed
110 111 112 113
	tries int
	until time.Time
}

Will Scott's avatar
Will Scott committed
114
func (db *DialBackoff) init(ctx context.Context) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115
	if db.entries == nil {
Will Scott's avatar
Will Scott committed
116
		db.entries = make(map[peer.ID]map[string]*backoffAddr)
Will Scott's avatar
Will Scott committed
117 118 119 120 121 122
	}
	go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
	ticker := time.NewTicker(BackoffMax)
Will Scott's avatar
Will Scott committed
123
	defer ticker.Stop()
Will Scott's avatar
Will Scott committed
124 125 126 127 128 129 130
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			db.cleanup()
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134
	}
}

// Backoff returns whether the client should backoff from dialing
Will Scott's avatar
Will Scott committed
135 136
// peer p at address addr
func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
138 139
	defer db.lock.Unlock()

140 141
	ap, found := db.entries[p][string(addr.Bytes())]
	return found && time.Now().Before(ap.until)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142 143
}

Steven Allen's avatar
Steven Allen committed
144 145 146 147 148 149 150 151
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient (default: 1s).
	BackoffCoef = time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
152

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154 155
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
156 157 158 159 160 161 162
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
Will Scott's avatar
Will Scott committed
163
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
Will Scott's avatar
Will Scott committed
164
	saddr := string(addr.Bytes())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
166 167 168
	defer db.lock.Unlock()
	bp, ok := db.entries[p]
	if !ok {
169 170
		bp = make(map[string]*backoffAddr, 1)
		db.entries[p] = bp
Will Scott's avatar
Will Scott committed
171
	}
Will Scott's avatar
Will Scott committed
172
	ba, ok := bp[saddr]
Will Scott's avatar
Will Scott committed
173
	if !ok {
Will Scott's avatar
Will Scott committed
174
		bp[saddr] = &backoffAddr{
Jeromy's avatar
Jeromy committed
175
			tries: 1,
Steven Allen's avatar
Steven Allen committed
176
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
177 178 179 180
		}
		return
	}

Will Scott's avatar
Will Scott committed
181
	backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
Steven Allen's avatar
Steven Allen committed
182 183
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
184
	}
Will Scott's avatar
Will Scott committed
185 186
	ba.until = time.Now().Add(backoffTime)
	ba.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188 189 190
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
191
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
193
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
194 195 196
	delete(db.entries, p)
}

Will Scott's avatar
Will Scott committed
197 198 199 200 201 202 203
func (db *DialBackoff) cleanup() {
	db.lock.Lock()
	defer db.lock.Unlock()
	now := time.Now()
	for p, e := range db.entries {
		good := false
		for _, backoff := range e {
204 205 206 207 208
			backoffTime := BackoffBase + BackoffCoef*time.Duration(backoff.tries*backoff.tries)
			if backoffTime > BackoffMax {
				backoffTime = BackoffMax
			}
			if now.Before(backoff.until.Add(backoffTime)) {
Will Scott's avatar
Will Scott committed
209 210 211 212 213
				good = true
				break
			}
		}
		if !good {
Will Scott's avatar
Will Scott committed
214
			delete(db.entries, p)
Will Scott's avatar
Will Scott committed
215 216 217 218
		}
	}
}

Steven Allen's avatar
Steven Allen committed
219
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
224
// etc. to achieve connection.
225
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
226 227 228 229 230
	if s.gater != nil && !s.gater.InterceptPeerDial(p) {
		log.Debugf("gater disallowed outbound connection to peer %s", p.Pretty())
		return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
	}

Steven Allen's avatar
Steven Allen committed
231 232 233 234 235 236 237 238 239
	return s.dialPeer(ctx, p)
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
241 242 243 244 245
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246 247 248 249 250
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

251
	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252

Steven Allen's avatar
Steven Allen committed
253 254 255
	// check if we already have an open (usable) connection first
	conn := s.bestAcceptableConnToPeer(ctx, p)
	if conn != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
256 257 258
		return conn, nil
	}

259
	// apply the DialPeer timeout
260
	ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
261 262
	defer cancel()

263
	conn, err = s.dsync.DialLock(ctx, p)
264 265
	if err == nil {
		return conn, nil
Steven Allen's avatar
Steven Allen committed
266
	}
267

Steven Allen's avatar
Steven Allen committed
268
	log.Debugf("network for %s finished dialing %s", s.local, p)
269 270 271 272 273 274 275 276 277 278 279 280

	if ctx.Err() != nil {
		// Context error trumps any dial errors as it was likely the ultimate cause.
		return nil, ctx.Err()
	}

	if s.ctx.Err() != nil {
		// Ok, so the swarm is shutting down.
		return nil, ErrSwarmClosed
	}

	return nil, err
281
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
282

283 284 285 286
///////////////////////////////////////////////////////////////////////////////////
// lo and behold, The Dialer
// TODO explain how all this works
//////////////////////////////////////////////////////////////////////////////////
287

288 289 290
type dialRequest struct {
	ctx   context.Context
	resch chan dialResponse
291 292
}

293 294 295
type dialResponse struct {
	conn *Conn
	err  error
296 297 298
}

// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials
299
func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
300
	if p == s.local {
301
		return ErrDialToSelf
Steven Allen's avatar
Steven Allen committed
302 303
	}

304 305
	go s.dialWorkerLoop(ctx, p, reqch)
	return nil
306
}
Steven Allen's avatar
Steven Allen committed
307

308
func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
309
	defer s.limiter.clearAllPeerDials(p)
Steven Allen's avatar
Steven Allen committed
310

311
	type pendRequest struct {
312
		req   dialRequest               // the original request
313 314 315 316 317
		err   *DialError                // dial error accumulator
		addrs map[ma.Multiaddr]struct{} // pending addr dials
	}

	type addrDial struct {
318
		addr     ma.Multiaddr
319 320 321 322
		ctx      context.Context
		conn     *Conn
		err      error
		requests []int
vyzo's avatar
vyzo committed
323
		dialed   bool
324 325 326 327 328 329
	}

	reqno := 0
	requests := make(map[int]*pendRequest)
	pending := make(map[ma.Multiaddr]*addrDial)

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
	dispatchError := func(ad *addrDial, err error) {
		ad.err = err
		for _, reqno := range ad.requests {
			pr, ok := requests[reqno]
			if !ok {
				// has already been dispatched
				continue
			}

			// accumulate the error
			pr.err.recordErr(ad.addr, err)

			delete(pr.addrs, ad.addr)
			if len(pr.addrs) == 0 {
				// all addrs have erred, dispatch dial error
345 346
				// but first do a last one check in case an acceptable connection has landed from
				// a simultaneous dial that started later and added new acceptable addrs
347
				c := s.bestAcceptableConnToPeer(pr.req.ctx, p)
348
				if c != nil {
349
					pr.req.resch <- dialResponse{conn: c}
350
				} else {
351
					pr.req.resch <- dialResponse{err: pr.err}
352
				}
353 354 355 356 357 358 359 360 361 362 363 364
				delete(requests, reqno)
			}
		}

		ad.requests = nil

		// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests
		if err == ErrDialBackoff {
			delete(pending, ad.addr)
		}
	}

vyzo's avatar
vyzo committed
365 366
	var triggerDial <-chan struct{}
	triggerNow := make(chan struct{})
367 368
	close(triggerNow)

369 370
	var nextDial []ma.Multiaddr
	active := 0
371 372
	done := false      // true when the request channel has been closed
	connected := false // true when a connection has been successfully established
373

374
	resch := make(chan dialResult)
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392

loop:
	for {
		select {
		case req, ok := <-reqch:
			if !ok {
				// request channel has been closed, wait for pending dials to complete
				if active > 0 {
					done = true
					reqch = nil
					triggerDial = nil
					continue loop
				}

				// no active dials, we are done
				return
			}

393
			c := s.bestAcceptableConnToPeer(req.ctx, p)
394
			if c != nil {
395
				req.resch <- dialResponse{conn: c}
396 397 398
				continue loop
			}

399
			addrs, err := s.addrsForDial(req.ctx, p)
400
			if err != nil {
401
				req.resch <- dialResponse{err: err}
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
				continue loop
			}

			// at this point, len(addrs) > 0 or else it would be error from addrsForDial
			// ranke them to process in order
			addrs = s.rankAddrs(addrs)

			// create the pending request object
			pr := &pendRequest{
				req:   req,
				err:   &DialError{Peer: p},
				addrs: make(map[ma.Multiaddr]struct{}),
			}
			for _, a := range addrs {
				pr.addrs[a] = struct{}{}
			}

			// check if any of the addrs has been successfully dialed and accumulate
			// errors from complete dials while collecting new addrs to dial/join
			var todial []ma.Multiaddr
			var tojoin []*addrDial

			for _, a := range addrs {
				ad, ok := pending[a]
				if !ok {
					todial = append(todial, a)
					continue
				}

				if ad.conn != nil {
					// dial to this addr was successful, complete the request
433
					req.resch <- dialResponse{conn: ad.conn}
434 435 436 437 438 439 440
					continue loop
				}

				if ad.err != nil {
					// dial to this addr errored, accumulate the error
					pr.err.recordErr(a, ad.err)
					delete(pr.addrs, a)
441
					continue
442 443 444 445 446 447 448 449
				}

				// dial is still pending, add to the join list
				tojoin = append(tojoin, ad)
			}

			if len(todial) == 0 && len(tojoin) == 0 {
				// all request applicable addrs have been dialed, we must have errored
450
				req.resch <- dialResponse{err: pr.err}
451 452 453 454 455 456 457 458
				continue loop
			}

			// the request has some pending or new dials, track it and schedule new dials
			reqno++
			requests[reqno] = pr

			for _, ad := range tojoin {
vyzo's avatar
vyzo committed
459
				if !ad.dialed {
460
					ad.ctx = s.mergeDialContexts(ad.ctx, req.ctx)
vyzo's avatar
vyzo committed
461
				}
462 463 464 465 466
				ad.requests = append(ad.requests, reqno)
			}

			if len(todial) > 0 {
				for _, a := range todial {
467
					pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{reqno}}
468 469 470 471 472
				}

				nextDial = append(nextDial, todial...)
				nextDial = s.rankAddrs(nextDial)

473 474
				// trigger a new dial now to account for the new addrs we added
				triggerDial = triggerNow
475 476 477
			}

		case <-triggerDial:
478 479 480 481 482 483 484 485 486
			// we dial batches of addresses together, logically belonging to the same batch
			// after a batch of addresses has been dialed, we add a delay before initiating the next batch
			dialed := false
			last := 0
			next := 0
			for i, addr := range nextDial {
				if dialed && !s.sameAddrBatch(nextDial[last], addr) {
					break
				}
487

488
				next = i + 1
489

490 491 492 493 494 495 496 497
				// spawn the dial
				ad := pending[addr]
				err := s.dialNextAddr(ad.ctx, p, addr, resch)
				if err != nil {
					dispatchError(ad, err)
					continue
				}

vyzo's avatar
vyzo committed
498
				ad.dialed = true
499 500 501
				dialed = true
				last = i
				active++
502 503
			}

504 505 506 507 508
			nextDial = nextDial[next:]
			if !dialed || len(nextDial) == 0 {
				// we didn't dial anything because of backoff or we don't have any more addresses
				triggerDial = nil
			}
509 510 511 512

		case res := <-resch:
			active--

513 514 515 516
			if res.Conn != nil {
				connected = true
			}

517
			if done && active == 0 {
518 519 520 521 522 523 524 525 526
				if res.Conn != nil {
					// we got an actual connection, but the dial has been cancelled
					// Should we close it? I think not, we should just add it to the swarm
					_, err := s.addConn(res.Conn, network.DirOutbound)
					if err != nil {
						// well duh, now we have to close it
						res.Conn.Close()
					}
				}
527 528 529
				return
			}

530
			ad := pending[res.Addr]
531

532 533 534 535 536 537 538
			if res.Conn != nil {
				// we got a connection, add it to the swarm
				conn, err := s.addConn(res.Conn, network.DirOutbound)
				if err != nil {
					// oops no, we failed to add it to the swarm
					res.Conn.Close()
					dispatchError(ad, err)
539 540 541
					if active == 0 && len(nextDial) > 0 {
						triggerDial = triggerNow
					}
542 543
					continue loop
				}
544

545 546
				// dispatch to still pending requests
				for _, reqno := range ad.requests {
547 548 549 550 551 552
					pr, ok := requests[reqno]
					if !ok {
						// it has already dispatched a connection
						continue
					}

553
					pr.req.resch <- dialResponse{conn: conn}
554 555 556
					delete(requests, reqno)
				}

557 558 559
				ad.conn = conn
				ad.requests = nil

560 561 562
				continue loop
			}

563
			// it must be an error -- add backoff if applicable and dispatch
564 565 566
			if res.Err != context.Canceled && !connected {
				// we only add backoff if there has not been a successful connection
				// for consistency with the old dialer behavior.
567
				s.backf.AddBackoff(p, res.Addr)
568
			}
569

570
			dispatchError(ad, res.Err)
571 572 573
			if active == 0 && len(nextDial) > 0 {
				triggerDial = triggerNow
			}
Steven Allen's avatar
Steven Allen committed
574
		}
575 576
	}
}
577

578 579 580 581 582 583 584 585 586
func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, error) {
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
		return nil, ErrNoAddresses
	}

	goodAddrs := s.filterKnownUndialables(p, peerAddrs)
	if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect {
		goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
587
	}
588 589 590 591 592 593 594 595

	if len(goodAddrs) == 0 {
		return nil, ErrNoGoodAddresses
	}

	return goodAddrs, nil
}

vyzo's avatar
vyzo committed
596 597 598 599 600 601 602 603 604 605 606 607
// mergeDialContexts returns a, or a context derived from a through
// network.WithSimultaneousConnect carrying b's reason. The latter happens when
// b requests a simultaneous connect and a does not. Only the
// simultaneous-connect option is merged; nothing else is taken from b.
func (s *Swarm) mergeDialContexts(a, b context.Context) context.Context {
	bSim, reason := network.GetSimultaneousConnect(b)
	if !bSim {
		return a
	}
	if aSim, _ := network.GetSimultaneousConnect(a); aSim {
		return a
	}
	return network.WithSimultaneousConnect(a, reason)
}

608
func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error {
609 610 611
	// check the dial backoff
	if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect {
		if s.backf.Backoff(p, addr) {
612
			return ErrDialBackoff
613 614 615 616
		}
	}

	// start the dial
617
	s.limitedDial(ctx, p, addr, resch)
618

619
	return nil
620 621
}

622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
// sameAddrBatch reports whether b belongs to the same dial batch as a. The
// batch is decided by a's class, checked in this order:
//   - relay addresses batch with relay addresses;
//   - expensive addresses (per IsExpensiveAddr) batch with expensive ones;
//   - private addresses batch with private addresses;
//   - public addresses batch with public addresses with the same
//     file-descriptor consumption (per IsFdConsumingAddr).
func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool {
	switch {
	case s.IsRelayAddr(a):
		return s.IsRelayAddr(b)
	case s.IsExpensiveAddr(a):
		return s.IsExpensiveAddr(b)
	case manet.IsPrivateAddr(a):
		return manet.IsPrivateAddr(b)
	default:
		// a is a public address
		return !manet.IsPrivateAddr(b) &&
			s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b)
	}
}

Steven Allen's avatar
Steven Allen committed
643 644 645 646 647
// canDial reports whether a transport is registered for dialing addr and
// that transport accepts addr.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if t := s.TransportForDialing(addr); t != nil {
		return t.CanDial(addr)
	}
	return false
}

Aarsh Shah's avatar
Aarsh Shah committed
648 649 650 651 652
// nonProxyAddr reports whether addr would be dialed by a transport that is
// not a proxy. It reports false when no transport is registered for addr,
// instead of dereferencing a nil transport; canDial guards the same lookup.
// Such an address cannot be dialed directly either.
func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
	t := s.TransportForDialing(addr)
	return t != nil && !t.Proxy()
}

653 654 655 656 657
// ranks addresses in descending order of preference for dialing, with the following rules:
// NonRelay > Relay
// NonWS > WS
// Private > Public
// UDP > TCP
658
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
659 660 661 662 663 664 665 666 667 668 669 670 671
	addrTier := func(a ma.Multiaddr) (tier int) {
		if s.IsRelayAddr(a) {
			tier |= 0b1000
		}
		if s.IsExpensiveAddr(a) {
			tier |= 0b0100
		}
		if !manet.IsPrivateAddr(a) {
			tier |= 0b0010
		}
		if s.IsFdConsumingAddr(a) {
			tier |= 0b0001
		}
672

673 674
		return tier
	}
675

676
	tiers := make([][]ma.Multiaddr, 16)
677
	for _, a := range addrs {
678 679
		tier := addrTier(a)
		tiers[tier] = append(tiers[tier], a)
680 681
	}

682 683 684 685
	result := make([]ma.Multiaddr, 0, len(addrs))
	for _, tier := range tiers {
		result = append(result, tier...)
	}
686

687
	return result
688 689
}

690 691 692 693 694
// filterKnownUndialables takes a list of multiaddrs, and removes those
// that we definitely don't want to dial: addresses configured to be blocked,
// IPv6 link-local addresses, addresses without a dial-capable transport,
// and addresses that we know to be our own.
// This is an optimization to avoid wasting time on dials that we know are going to fail.
695
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
696 697 698 699 700
	lisAddrs, _ := s.InterfaceListenAddresses()
	var ourAddrs []ma.Multiaddr
	for _, addr := range lisAddrs {
		protos := addr.Protocols()
		// we're only sure about filtering out /ip4 and /ip6 addresses, so far
701
		if protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6 {
702 703 704 705 706 707 708 709 710
			ourAddrs = append(ourAddrs, addr)
		}
	}

	return addrutil.FilterAddrs(addrs,
		addrutil.SubtractFilter(ourAddrs...),
		s.canDial,
		// TODO: Consider allowing link-local addresses
		addrutil.AddrOverNonLocalIP,
711 712 713
		func(addr ma.Multiaddr) bool {
			return s.gater == nil || s.gater.InterceptAddrDial(p, addr)
		},
714 715 716
	)
}

Jeromy's avatar
Jeromy committed
717 718 719
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
720 721 722 723 724 725 726
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
727 728
}

729
// dialAddr is the actual dial for an addr, indirectly invoked through the limiter
730
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
Steven Allen's avatar
Steven Allen committed
731 732 733 734
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
735 736
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

737 738
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
739
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
740 741
	}

742
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
743
	if err != nil {
744
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
745 746
	}

Steven Allen's avatar
Steven Allen committed
747 748
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
749
		connC.Close()
750
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
751 752
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
753 754 755 756 757
	}

	// success! we got one!
	return connC, nil
}
Aarsh Shah's avatar
Aarsh Shah committed
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778

// IsFdConsumingAddr reports whether dialing addr consumes a file descriptor.
//
// TODO We should have a `IsFdConsuming() bool` method on the `Transport` interface in go-libp2p-core/transport.
// For now:
// A Non-circuit address which has the TCP/UNIX protocol is deemed FD consuming.
// For a circuit-relay address, we look at the address of the relay server/proxy
// and use the same logic as above to decide.
func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
	// Keep only the components before any /p2p-circuit component.
	beforeCircuit, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})

	// for safety
	if beforeCircuit == nil {
		return true
	}

	if _, err := beforeCircuit.ValueForProtocol(ma.P_TCP); err == nil {
		return true
	}
	_, err := beforeCircuit.ValueForProtocol(ma.P_UNIX)
	return err == nil
}
779 780 781 782 783 784 785 786 787 788 789

// IsExpensiveAddr reports whether addr contains a /ws or /wss component. The
// dialer ranks such addresses after the other non-relay addresses.
func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool {
	if _, err := addr.ValueForProtocol(ma.P_WS); err == nil {
		return true
	}
	_, err := addr.ValueForProtocol(ma.P_WSS)
	return err == nil
}

// IsRelayAddr reports whether addr contains a circuit-relay (P_CIRCUIT)
// component.
func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool {
	if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
		return false
	}
	return true
}