swarm_dial.go 19.9 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

tavit ohanian's avatar
tavit ohanian committed
10 11 12
	"gitlab.dms3.io/p2p/go-p2p-core/network"
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
	"gitlab.dms3.io/p2p/go-p2p-core/transport"
Aarsh Shah's avatar
Aarsh Shah committed
13

14 15
	ma "gitlab.dms3.io/mf/go-multiaddr"
	manet "gitlab.dms3.io/mf/go-multiaddr/net"
tavit ohanian's avatar
tavit ohanian committed
16
	addrutil "gitlab.dms3.io/p2p/go-p2p-addrutil"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

var (
	// ErrDialBackoff is returned when a dial to an address is skipped because
	// that address has been dialed too frequently and is currently backed off.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialToSelf is returned when we attempt to dial our own peer.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when no known transport can dial the given
	// multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is returned when connecting to a peer has ultimately
	// failed.
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned when we cannot find any addresses for the
	// peer we are trying to dial.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned when a peer has addresses but none of
	// them can be used.
	ErrNoGoodAddresses = errors.New("no good addresses")

	// ErrGaterDisallowedConnection is returned when the connection gater
	// prevents us from forming a connection with a peer.
	ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer")
)

Steven Allen's avatar
Steven Allen committed
58
const (
	// DialAttempts governs how many times a goroutine will try to dial a given peer.
	// Note: this is down to one, as we have _too many dials_ atm. To add back in,
	// add loop back in Dial(.)
	DialAttempts = 1

	// ConcurrentFdDials is the number of concurrent outbound dials over
	// transports that consume file descriptors.
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit is the number of concurrent outbound dials to
	// make per peer.
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
96

Steven Allen's avatar
Steven Allen committed
97 98
// DialBackoff is a type for tracking peer dial backoffs.
//
99
// * It's safe to use its zero value.
Steven Allen's avatar
Steven Allen committed
100 101 102
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Will Scott's avatar
Will Scott committed
103
	entries map[peer.ID]map[string]*backoffAddr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106
	lock    sync.RWMutex
}

Will Scott's avatar
Will Scott committed
107
type backoffAddr struct {
Jeromy's avatar
Jeromy committed
108 109 110 111
	tries int
	until time.Time
}

Will Scott's avatar
Will Scott committed
112
func (db *DialBackoff) init(ctx context.Context) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
	if db.entries == nil {
Will Scott's avatar
Will Scott committed
114
		db.entries = make(map[peer.ID]map[string]*backoffAddr)
Will Scott's avatar
Will Scott committed
115 116 117 118 119 120
	}
	go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
	ticker := time.NewTicker(BackoffMax)
Will Scott's avatar
Will Scott committed
121
	defer ticker.Stop()
Will Scott's avatar
Will Scott committed
122 123 124 125 126 127 128
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			db.cleanup()
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131 132
	}
}

// Backoff returns whether the client should backoff from dialing
Will Scott's avatar
Will Scott committed
133 134
// peer p at address addr
func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
136 137
	defer db.lock.Unlock()

138 139
	ap, found := db.entries[p][string(addr.Bytes())]
	return found && time.Now().Before(ap.until)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141
}

Steven Allen's avatar
Steven Allen committed
142 143 144 145 146 147 148 149
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient applied to the square of the
	// number of prior backoffs (default: 1s).
	BackoffCoef = 1 * time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
150

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152 153
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
154 155 156 157 158 159 160
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
Will Scott's avatar
Will Scott committed
161
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
Will Scott's avatar
Will Scott committed
162
	saddr := string(addr.Bytes())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
164 165 166
	defer db.lock.Unlock()
	bp, ok := db.entries[p]
	if !ok {
167 168
		bp = make(map[string]*backoffAddr, 1)
		db.entries[p] = bp
Will Scott's avatar
Will Scott committed
169
	}
Will Scott's avatar
Will Scott committed
170
	ba, ok := bp[saddr]
Will Scott's avatar
Will Scott committed
171
	if !ok {
Will Scott's avatar
Will Scott committed
172
		bp[saddr] = &backoffAddr{
Jeromy's avatar
Jeromy committed
173
			tries: 1,
Steven Allen's avatar
Steven Allen committed
174
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
175 176 177 178
		}
		return
	}

Will Scott's avatar
Will Scott committed
179
	backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
Steven Allen's avatar
Steven Allen committed
180 181
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
182
	}
Will Scott's avatar
Will Scott committed
183 184
	ba.until = time.Now().Add(backoffTime)
	ba.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185 186 187 188
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
189
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
191
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192 193 194
	delete(db.entries, p)
}

Will Scott's avatar
Will Scott committed
195 196 197 198 199 200 201
func (db *DialBackoff) cleanup() {
	db.lock.Lock()
	defer db.lock.Unlock()
	now := time.Now()
	for p, e := range db.entries {
		good := false
		for _, backoff := range e {
202 203 204 205 206
			backoffTime := BackoffBase + BackoffCoef*time.Duration(backoff.tries*backoff.tries)
			if backoffTime > BackoffMax {
				backoffTime = BackoffMax
			}
			if now.Before(backoff.until.Add(backoffTime)) {
Will Scott's avatar
Will Scott committed
207 208 209 210 211
				good = true
				break
			}
		}
		if !good {
Will Scott's avatar
Will Scott committed
212
			delete(db.entries, p)
Will Scott's avatar
Will Scott committed
213 214 215 216
		}
	}
}

Steven Allen's avatar
Steven Allen committed
217
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219 220 221
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
222
// etc. to achieve connection.
223
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
224 225 226 227 228
	if s.gater != nil && !s.gater.InterceptPeerDial(p) {
		log.Debugf("gater disallowed outbound connection to peer %s", p.Pretty())
		return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
	}

Steven Allen's avatar
Steven Allen committed
229 230 231 232 233 234
	// Avoid typed nil issues.
	c, err := s.dialPeer(ctx, p)
	if err != nil {
		return nil, err
	}
	return c, nil
Steven Allen's avatar
Steven Allen committed
235 236 237 238 239 240 241
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
Cory Schwartz's avatar
Cory Schwartz committed
242
	log.Debugw("dialing peer", "from", s.local, "to", p)
243 244 245 246 247
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248 249 250 251
	if p == s.local {
		return nil, ErrDialToSelf
	}

Steven Allen's avatar
Steven Allen committed
252 253 254
	// check if we already have an open (usable) connection first
	conn := s.bestAcceptableConnToPeer(ctx, p)
	if conn != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255 256 257
		return conn, nil
	}

258
	// apply the DialPeer timeout
259
	ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
260 261
	defer cancel()

262
	conn, err = s.dsync.DialLock(ctx, p)
263 264
	if err == nil {
		return conn, nil
Steven Allen's avatar
Steven Allen committed
265
	}
266

Steven Allen's avatar
Steven Allen committed
267
	log.Debugf("network for %s finished dialing %s", s.local, p)
268 269 270 271 272 273 274 275 276 277 278 279

	if ctx.Err() != nil {
		// Context error trumps any dial errors as it was likely the ultimate cause.
		return nil, ctx.Err()
	}

	if s.ctx.Err() != nil {
		// Ok, so the swarm is shutting down.
		return nil, ErrSwarmClosed
	}

	return nil, err
280
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281

282 283 284 285
///////////////////////////////////////////////////////////////////////////////////
// lo and behold, The Dialer
// TODO explain how all this works
//////////////////////////////////////////////////////////////////////////////////
286

287 288 289
type dialRequest struct {
	ctx   context.Context
	resch chan dialResponse
290 291
}

292 293 294
type dialResponse struct {
	conn *Conn
	err  error
295 296
}

vyzo's avatar
vyzo committed
297 298
// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
299
	if p == s.local {
300
		return ErrDialToSelf
Steven Allen's avatar
Steven Allen committed
301 302
	}

303 304
	go s.dialWorkerLoop(ctx, p, reqch)
	return nil
305
}
Steven Allen's avatar
Steven Allen committed
306

307
func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
308
	defer s.limiter.clearAllPeerDials(p)
Steven Allen's avatar
Steven Allen committed
309

310
	type pendRequest struct {
311
		req   dialRequest               // the original request
312 313 314 315 316
		err   *DialError                // dial error accumulator
		addrs map[ma.Multiaddr]struct{} // pending addr dials
	}

	type addrDial struct {
317
		addr     ma.Multiaddr
318 319 320 321
		ctx      context.Context
		conn     *Conn
		err      error
		requests []int
vyzo's avatar
vyzo committed
322
		dialed   bool
323 324 325 326 327 328
	}

	reqno := 0
	requests := make(map[int]*pendRequest)
	pending := make(map[ma.Multiaddr]*addrDial)

329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
	dispatchError := func(ad *addrDial, err error) {
		ad.err = err
		for _, reqno := range ad.requests {
			pr, ok := requests[reqno]
			if !ok {
				// has already been dispatched
				continue
			}

			// accumulate the error
			pr.err.recordErr(ad.addr, err)

			delete(pr.addrs, ad.addr)
			if len(pr.addrs) == 0 {
				// all addrs have erred, dispatch dial error
344 345
				// but first do a last one check in case an acceptable connection has landed from
				// a simultaneous dial that started later and added new acceptable addrs
346
				c := s.bestAcceptableConnToPeer(pr.req.ctx, p)
347
				if c != nil {
348
					pr.req.resch <- dialResponse{conn: c}
349
				} else {
350
					pr.req.resch <- dialResponse{err: pr.err}
351
				}
352 353 354 355 356 357
				delete(requests, reqno)
			}
		}

		ad.requests = nil

358 359 360 361 362 363
		// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
		// this is necessary to support active listen scenarios, where a new dial comes in while
		// another dial is in progress, and needs to do a direct connection without inhibitions from
		// dial backoff.
		// it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff
		// regresses without this.
364 365 366 367 368
		if err == ErrDialBackoff {
			delete(pending, ad.addr)
		}
	}

vyzo's avatar
vyzo committed
369 370
	var triggerDial <-chan struct{}
	triggerNow := make(chan struct{})
371 372
	close(triggerNow)

373 374
	var nextDial []ma.Multiaddr
	active := 0
375 376
	done := false      // true when the request channel has been closed
	connected := false // true when a connection has been successfully established
377

378
	resch := make(chan dialResult)
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396

loop:
	for {
		select {
		case req, ok := <-reqch:
			if !ok {
				// request channel has been closed, wait for pending dials to complete
				if active > 0 {
					done = true
					reqch = nil
					triggerDial = nil
					continue loop
				}

				// no active dials, we are done
				return
			}

397
			c := s.bestAcceptableConnToPeer(req.ctx, p)
398
			if c != nil {
399
				req.resch <- dialResponse{conn: c}
400 401 402
				continue loop
			}

403
			addrs, err := s.addrsForDial(req.ctx, p)
404
			if err != nil {
405
				req.resch <- dialResponse{err: err}
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
				continue loop
			}

			// at this point, len(addrs) > 0 or else it would be error from addrsForDial
			// ranke them to process in order
			addrs = s.rankAddrs(addrs)

			// create the pending request object
			pr := &pendRequest{
				req:   req,
				err:   &DialError{Peer: p},
				addrs: make(map[ma.Multiaddr]struct{}),
			}
			for _, a := range addrs {
				pr.addrs[a] = struct{}{}
			}

			// check if any of the addrs has been successfully dialed and accumulate
			// errors from complete dials while collecting new addrs to dial/join
			var todial []ma.Multiaddr
			var tojoin []*addrDial

			for _, a := range addrs {
				ad, ok := pending[a]
				if !ok {
					todial = append(todial, a)
					continue
				}

				if ad.conn != nil {
					// dial to this addr was successful, complete the request
437
					req.resch <- dialResponse{conn: ad.conn}
438 439 440 441 442 443 444
					continue loop
				}

				if ad.err != nil {
					// dial to this addr errored, accumulate the error
					pr.err.recordErr(a, ad.err)
					delete(pr.addrs, a)
445
					continue
446 447 448 449 450 451 452 453
				}

				// dial is still pending, add to the join list
				tojoin = append(tojoin, ad)
			}

			if len(todial) == 0 && len(tojoin) == 0 {
				// all request applicable addrs have been dialed, we must have errored
454
				req.resch <- dialResponse{err: pr.err}
455 456 457 458 459 460 461 462
				continue loop
			}

			// the request has some pending or new dials, track it and schedule new dials
			reqno++
			requests[reqno] = pr

			for _, ad := range tojoin {
vyzo's avatar
vyzo committed
463
				if !ad.dialed {
464
					ad.ctx = s.mergeDialContexts(ad.ctx, req.ctx)
vyzo's avatar
vyzo committed
465
				}
466 467 468 469 470
				ad.requests = append(ad.requests, reqno)
			}

			if len(todial) > 0 {
				for _, a := range todial {
471
					pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{reqno}}
472 473 474 475 476
				}

				nextDial = append(nextDial, todial...)
				nextDial = s.rankAddrs(nextDial)

477 478
				// trigger a new dial now to account for the new addrs we added
				triggerDial = triggerNow
479 480 481
			}

		case <-triggerDial:
vyzo's avatar
vyzo committed
482
			for _, addr := range nextDial {
483 484 485 486 487 488
				// spawn the dial
				ad := pending[addr]
				err := s.dialNextAddr(ad.ctx, p, addr, resch)
				if err != nil {
					dispatchError(ad, err)
				}
489 490
			}

vyzo's avatar
vyzo committed
491 492
			nextDial = nil
			triggerDial = nil
493 494 495 496

		case res := <-resch:
			active--

497 498 499 500
			if res.Conn != nil {
				connected = true
			}

501
			if done && active == 0 {
502 503 504 505 506 507 508 509 510
				if res.Conn != nil {
					// we got an actual connection, but the dial has been cancelled
					// Should we close it? I think not, we should just add it to the swarm
					_, err := s.addConn(res.Conn, network.DirOutbound)
					if err != nil {
						// well duh, now we have to close it
						res.Conn.Close()
					}
				}
511 512 513
				return
			}

514
			ad := pending[res.Addr]
515

516 517 518 519 520 521 522
			if res.Conn != nil {
				// we got a connection, add it to the swarm
				conn, err := s.addConn(res.Conn, network.DirOutbound)
				if err != nil {
					// oops no, we failed to add it to the swarm
					res.Conn.Close()
					dispatchError(ad, err)
523 524 525
					if active == 0 && len(nextDial) > 0 {
						triggerDial = triggerNow
					}
526 527
					continue loop
				}
528

529 530
				// dispatch to still pending requests
				for _, reqno := range ad.requests {
531 532 533 534 535 536
					pr, ok := requests[reqno]
					if !ok {
						// it has already dispatched a connection
						continue
					}

537
					pr.req.resch <- dialResponse{conn: conn}
538 539 540
					delete(requests, reqno)
				}

541 542 543
				ad.conn = conn
				ad.requests = nil

544 545 546
				continue loop
			}

547
			// it must be an error -- add backoff if applicable and dispatch
548 549 550
			if res.Err != context.Canceled && !connected {
				// we only add backoff if there has not been a successful connection
				// for consistency with the old dialer behavior.
551
				s.backf.AddBackoff(p, res.Addr)
552
			}
553

554
			dispatchError(ad, res.Err)
555 556 557
			if active == 0 && len(nextDial) > 0 {
				triggerDial = triggerNow
			}
Steven Allen's avatar
Steven Allen committed
558
		}
559 560
	}
}
561

562 563 564 565 566 567 568 569 570
func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, error) {
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
		return nil, ErrNoAddresses
	}

	goodAddrs := s.filterKnownUndialables(p, peerAddrs)
	if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect {
		goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
571
	}
572 573 574 575 576 577 578 579

	if len(goodAddrs) == 0 {
		return nil, ErrNoGoodAddresses
	}

	return goodAddrs, nil
}

vyzo's avatar
vyzo committed
580 581 582 583 584 585 586 587 588 589 590 591
// mergeDialContexts returns a, extended with b's simultaneous-connect option
// (and its reason) when b carries that option and a does not. Nothing else
// from b, including its cancellation, is carried over.
func (s *Swarm) mergeDialContexts(a, b context.Context) context.Context {
	bSim, reason := network.GetSimultaneousConnect(b)
	if !bSim {
		return a
	}
	if aSim, _ := network.GetSimultaneousConnect(a); aSim {
		return a
	}
	return network.WithSimultaneousConnect(a, reason)
}

592
func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error {
593 594 595
	// check the dial backoff
	if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect {
		if s.backf.Backoff(p, addr) {
596
			return ErrDialBackoff
597 598 599 600
		}
	}

	// start the dial
601
	s.limitedDial(ctx, p, addr, resch)
602

603
	return nil
604 605
}

Steven Allen's avatar
Steven Allen committed
606 607 608 609 610
// canDial reports whether a transport is registered for dialing addr and that
// transport accepts it.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if t := s.TransportForDialing(addr); t != nil {
		return t.CanDial(addr)
	}
	return false
}

Aarsh Shah's avatar
Aarsh Shah committed
611 612 613 614 615
// nonProxyAddr reports whether addr is dialed through a transport that is not
// a proxy. An address with no dialing transport is reported as false. Such an
// address is not dialable at all, and the original code would have
// dereferenced a nil transport here.
func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
	t := s.TransportForDialing(addr)
	return t != nil && !t.Proxy()
}

616 617 618 619 620
// ranks addresses in descending order of preference for dialing, with the following rules:
// NonRelay > Relay
// NonWS > WS
// Private > Public
// UDP > TCP
621
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
622
	addrTier := func(a ma.Multiaddr) (tier int) {
623
		if isRelayAddr(a) {
624 625
			tier |= 0b1000
		}
626
		if isExpensiveAddr(a) {
627 628 629 630 631
			tier |= 0b0100
		}
		if !manet.IsPrivateAddr(a) {
			tier |= 0b0010
		}
632
		if isFdConsumingAddr(a) {
633 634
			tier |= 0b0001
		}
635

636 637
		return tier
	}
638

639
	tiers := make([][]ma.Multiaddr, 16)
640
	for _, a := range addrs {
641 642
		tier := addrTier(a)
		tiers[tier] = append(tiers[tier], a)
643 644
	}

645 646 647 648
	result := make([]ma.Multiaddr, 0, len(addrs))
	for _, tier := range tiers {
		result = append(result, tier...)
	}
649

650
	return result
651 652
}

653 654 655 656 657
// filterKnownUndialables takes a list of multiaddrs, and removes those
// that we definitely don't want to dial: addresses configured to be blocked,
// IPv6 link-local addresses, addresses without a dial-capable transport,
// and addresses that we know to be our own.
// This is an optimization to avoid wasting time on dials that we know are going to fail.
658
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
659 660 661 662 663
	lisAddrs, _ := s.InterfaceListenAddresses()
	var ourAddrs []ma.Multiaddr
	for _, addr := range lisAddrs {
		protos := addr.Protocols()
		// we're only sure about filtering out /ip4 and /ip6 addresses, so far
664
		if protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6 {
665 666 667 668 669 670 671 672 673
			ourAddrs = append(ourAddrs, addr)
		}
	}

	return addrutil.FilterAddrs(addrs,
		addrutil.SubtractFilter(ourAddrs...),
		s.canDial,
		// TODO: Consider allowing link-local addresses
		addrutil.AddrOverNonLocalIP,
674 675 676
		func(addr ma.Multiaddr) bool {
			return s.gater == nil || s.gater.InterceptAddrDial(p, addr)
		},
677 678 679
	)
}

Jeromy's avatar
Jeromy committed
680 681 682
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
683 684 685 686 687 688 689
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
690 691
}

692
// dialAddr is the actual dial for an addr, indirectly invoked through the limiter
693
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
Steven Allen's avatar
Steven Allen committed
694 695 696 697
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
698 699
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

700 701
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
702
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
703 704
	}

705
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
706
	if err != nil {
707
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
708 709
	}

Steven Allen's avatar
Steven Allen committed
710 711
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
712
		connC.Close()
713
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
714 715
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
716 717 718 719 720
	}

	// success! we got one!
	return connC, nil
}
Aarsh Shah's avatar
Aarsh Shah committed
721

tavit ohanian's avatar
tavit ohanian committed
722
// TODO We should have a `IsFdConsuming() bool` method on the `Transport` interface in go-p2p-core/transport.
Aarsh Shah's avatar
Aarsh Shah committed
723 724 725 726 727
// This function checks if any of the transport protocols in the address requires a file descriptor.
// For now:
// A Non-circuit address which has the TCP/UNIX protocol is deemed FD consuming.
// For a circuit-relay address, we look at the address of the relay server/proxy
// and use the same logic as above to decide.
728
func isFdConsumingAddr(addr ma.Multiaddr) bool {
Aarsh Shah's avatar
Aarsh Shah committed
729 730 731 732 733 734 735 736 737 738 739 740 741
	first, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})

	// for safety
	if first == nil {
		return true
	}

	_, err1 := first.ValueForProtocol(ma.P_TCP)
	_, err2 := first.ValueForProtocol(ma.P_UNIX)
	return err1 == nil || err2 == nil
}
742

743
func isExpensiveAddr(addr ma.Multiaddr) bool {
744 745 746 747 748
	_, err1 := addr.ValueForProtocol(ma.P_WS)
	_, err2 := addr.ValueForProtocol(ma.P_WSS)
	return err1 == nil || err2 == nil
}

749
func isRelayAddr(addr ma.Multiaddr) bool {
750 751 752
	_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
	return err == nil
}