swarm_dial.go 18.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

10 11 12
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"
Aarsh Shah's avatar
Aarsh Shah committed
13 14

	addrutil "github.com/libp2p/go-addr-util"
15 16
	lgbl "github.com/libp2p/go-libp2p-loggables"

Steven Allen's avatar
Steven Allen committed
17
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
18
	ma "github.com/multiformats/go-multiaddr"
19
	manet "github.com/multiformats/go-multiaddr/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23 24 25 26 27 28 29 30 31 32 33
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

// Sentinel errors produced by the swarm dialing code.
var (
	// ErrDialBackoff is returned by the backoff code when a given peer has
	// been dialed too frequently.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialToSelf is returned when a dial targets our own peer ID.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when no known transport can handle the
	// given multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is returned when every attempt to connect to a peer
	// has failed.
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned when the peer has no known addresses.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned when the peer has addresses, but none
	// of them are usable.
	ErrNoGoodAddresses = errors.New("no good addresses")

	// ErrGaterDisallowedConnection is returned when the connection gater
	// refuses to let us connect to a peer.
	ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer")
)

Steven Allen's avatar
Steven Allen committed
61
// Dial tuning constants.
const (
	// DialAttempts governs how many times a goroutine will try to dial a
	// given peer. It is currently one because we were making too many dials;
	// raising it again requires re-adding the retry loop in Dial.
	DialAttempts = 1

	// ConcurrentFdDials caps how many outbound dials may run at once over
	// transports that consume file descriptors.
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit caps how many outbound dials may run at once
	// to a single peer.
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
73

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all dial attempts fail), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (The single goroutine that is actually dialing continues to
// dial.) If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
99

Steven Allen's avatar
Steven Allen committed
100 101
// DialBackoff is a type for tracking peer dial backoffs.
//
102
// * It's safe to use its zero value.
Steven Allen's avatar
Steven Allen committed
103 104 105
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Will Scott's avatar
Will Scott committed
106
	entries map[peer.ID]map[string]*backoffAddr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109
	lock    sync.RWMutex
}

Will Scott's avatar
Will Scott committed
110
// backoffAddr holds the backoff state of a single peer address.
type backoffAddr struct {
	tries int       // how many times AddBackoff has recorded this address
	until time.Time // the address counts as backed off until this instant
}

Will Scott's avatar
Will Scott committed
115
func (db *DialBackoff) init(ctx context.Context) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
	if db.entries == nil {
Will Scott's avatar
Will Scott committed
117
		db.entries = make(map[peer.ID]map[string]*backoffAddr)
Will Scott's avatar
Will Scott committed
118 119 120 121 122 123
	}
	go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
	ticker := time.NewTicker(BackoffMax)
Will Scott's avatar
Will Scott committed
124
	defer ticker.Stop()
Will Scott's avatar
Will Scott committed
125 126 127 128 129 130 131
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			db.cleanup()
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135
	}
}

// Backoff returns whether the client should backoff from dialing
Will Scott's avatar
Will Scott committed
136 137
// peer p at address addr
func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
139 140
	defer db.lock.Unlock()

141 142
	ap, found := db.entries[p][string(addr.Bytes())]
	return found && time.Now().Before(ap.until)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143 144
}

Steven Allen's avatar
Steven Allen committed
145 146 147 148 149 150 151 152
// Tunables for the quadratic dial backoff used by DialBackoff.
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient (default: 1s).
	BackoffCoef = 1 * time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
153

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
157 158 159 160 161 162 163
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
Will Scott's avatar
Will Scott committed
164
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
Will Scott's avatar
Will Scott committed
165
	saddr := string(addr.Bytes())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
167 168 169
	defer db.lock.Unlock()
	bp, ok := db.entries[p]
	if !ok {
170 171
		bp = make(map[string]*backoffAddr, 1)
		db.entries[p] = bp
Will Scott's avatar
Will Scott committed
172
	}
Will Scott's avatar
Will Scott committed
173
	ba, ok := bp[saddr]
Will Scott's avatar
Will Scott committed
174
	if !ok {
Will Scott's avatar
Will Scott committed
175
		bp[saddr] = &backoffAddr{
Jeromy's avatar
Jeromy committed
176
			tries: 1,
Steven Allen's avatar
Steven Allen committed
177
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
178 179 180 181
		}
		return
	}

Will Scott's avatar
Will Scott committed
182
	backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
Steven Allen's avatar
Steven Allen committed
183 184
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
185
	}
Will Scott's avatar
Will Scott committed
186 187
	ba.until = time.Now().Add(backoffTime)
	ba.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189 190 191
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
192
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
194
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195 196 197
	delete(db.entries, p)
}

Will Scott's avatar
Will Scott committed
198 199 200 201 202 203 204
func (db *DialBackoff) cleanup() {
	db.lock.Lock()
	defer db.lock.Unlock()
	now := time.Now()
	for p, e := range db.entries {
		good := false
		for _, backoff := range e {
205 206 207 208 209
			backoffTime := BackoffBase + BackoffCoef*time.Duration(backoff.tries*backoff.tries)
			if backoffTime > BackoffMax {
				backoffTime = BackoffMax
			}
			if now.Before(backoff.until.Add(backoffTime)) {
Will Scott's avatar
Will Scott committed
210 211 212 213 214
				good = true
				break
			}
		}
		if !good {
Will Scott's avatar
Will Scott committed
215
			delete(db.entries, p)
Will Scott's avatar
Will Scott committed
216 217 218 219
		}
	}
}

Steven Allen's avatar
Steven Allen committed
220
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222 223 224
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
225
// etc. to achieve connection.
226
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
227 228 229 230 231
	if s.gater != nil && !s.gater.InterceptPeerDial(p) {
		log.Debugf("gater disallowed outbound connection to peer %s", p.Pretty())
		return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
	}

Steven Allen's avatar
Steven Allen committed
232 233 234 235 236 237 238 239 240
	return s.dialPeer(ctx, p)
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
242 243 244 245 246
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
247 248 249 250 251
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

252
	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253

Steven Allen's avatar
Steven Allen committed
254
	conn := s.bestConnToPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
255 256 257 258 259 260 261
	forceDirect, _ := network.GetForceDirectDial(ctx)
	if forceDirect {
		if isDirectConn(conn) {
			return conn, nil
		}
	} else if conn != nil {
		// check if we already have an open connection first
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262 263 264
		return conn, nil
	}

265
	// apply the DialPeer timeout
266
	ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
267 268
	defer cancel()

269
	conn, err = s.dsync.DialLock(ctx, p)
270 271
	if err == nil {
		return conn, nil
Steven Allen's avatar
Steven Allen committed
272
	}
273

Steven Allen's avatar
Steven Allen committed
274
	log.Debugf("network for %s finished dialing %s", s.local, p)
275 276 277 278 279 280 281 282 283 284 285 286

	if ctx.Err() != nil {
		// Context error trumps any dial errors as it was likely the ultimate cause.
		return nil, ctx.Err()
	}

	if s.ctx.Err() != nil {
		// Ok, so the swarm is shutting down.
		return nil, ErrSwarmClosed
	}

	return nil, err
287
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288

289 290 291
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
Steven Allen's avatar
Steven Allen committed
292 293 294
	// Short circuit.
	// By the time we take the dial lock, we may already *have* a connection
	// to the peer.
Aarsh Shah's avatar
Aarsh Shah committed
295
	forceDirect, _ := network.GetForceDirectDial(ctx)
Steven Allen's avatar
Steven Allen committed
296
	c := s.bestConnToPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
297 298 299 300 301
	if forceDirect {
		if isDirectConn(c) {
			return c, nil
		}
	} else if c != nil {
Steven Allen's avatar
Steven Allen committed
302 303 304
		return c, nil
	}

Steven Allen's avatar
Steven Allen committed
305 306
	logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)

307 308 309
	// ok, we have been charged to dial! let's do it.
	// if it succeeds, dial will add the conn to the swarm itself.
	defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
Steven Allen's avatar
Steven Allen committed
310 311

	conn, err := s.dial(ctx, p)
312
	if err != nil {
Steven Allen's avatar
Steven Allen committed
313
		conn = s.bestConnToPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
314 315 316 317 318 319
		if forceDirect {
			if isDirectConn(conn) {
				log.Debugf("ignoring dial error because we already have a direct connection: %s", err)
				return conn, nil
			}
		} else if conn != nil {
Steven Allen's avatar
Steven Allen committed
320 321 322 323
			// Hm? What error?
			// Could have canceled the dial because we received a
			// connection or some other random reason.
			// Just ignore the error and return the connection.
Aarsh Shah's avatar
Aarsh Shah committed
324
			log.Debugf("ignoring dial error because we already have a connection: %s", err)
Steven Allen's avatar
Steven Allen committed
325 326
			return conn, nil
		}
327

Steven Allen's avatar
Steven Allen committed
328
		// ok, we failed.
329
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
330
	}
331
	return conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332 333
}

Steven Allen's avatar
Steven Allen committed
334 335 336 337 338
// canDial reports whether some transport is registered for addr and that
// transport says it can dial it.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if tpt := s.TransportForDialing(addr); tpt != nil {
		return tpt.CanDial(addr)
	}
	return false
}

Aarsh Shah's avatar
Aarsh Shah committed
339 340 341 342 343
// nonProxyAddr reports whether addr would be dialed by a transport that
// does not report itself as a proxy. It is used as an address filter when a
// direct dial is forced.
//
// Addresses with no dialing transport yield false, so they are filtered out
// instead of causing a nil-pointer dereference on t.Proxy().
func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
	t := s.TransportForDialing(addr)
	return t != nil && !t.Proxy()
}

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
// rankAddrs orders addrs by dialing preference, most preferred first:
//
//	Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
//
// Here "TCP" means IsFdConsumingAddr reports true and "UDP" means it reports
// false. Addresses containing /p2p-circuit are relays. Within each group the
// input order is preserved.
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	var (
		privateUDP, publicUDP, relayUDP []ma.Multiaddr
		privateFd, publicFd, relayFd    []ma.Multiaddr
	)

	for _, a := range addrs {
		_, circuitErr := a.ValueForProtocol(ma.P_CIRCUIT)
		switch {
		case circuitErr == nil:
			if s.IsFdConsumingAddr(a) {
				relayFd = append(relayFd, a)
			} else {
				relayUDP = append(relayUDP, a)
			}
		case manet.IsPrivateAddr(a):
			if s.IsFdConsumingAddr(a) {
				privateFd = append(privateFd, a)
			} else {
				privateUDP = append(privateUDP, a)
			}
		default:
			if s.IsFdConsumingAddr(a) {
				publicFd = append(publicFd, a)
			} else {
				publicUDP = append(publicUDP, a)
			}
		}
	}

	var ranked []ma.Multiaddr
	for _, group := range [][]ma.Multiaddr{privateUDP, publicUDP, privateFd, publicFd, relayUDP, relayFd} {
		ranked = append(ranked, group...)
	}
	return ranked
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
383 384
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
Aarsh Shah's avatar
Aarsh Shah committed
385
	forceDirect, _ := network.GetForceDirectDial(ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
386 387 388 389 390 391 392 393 394
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
	}
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.

	sk := s.peers.PrivKey(s.local)
Matt Joiner's avatar
Matt Joiner committed
395
	logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
396 397 398 399 400
	if sk == nil {
		// fine for sk to be nil, just log.
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

Jeromy's avatar
Jeromy committed
401
	//////
Matt Joiner's avatar
Matt Joiner committed
402 403
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
404
		return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
Matt Joiner's avatar
Matt Joiner committed
405
	}
406
	goodAddrs := s.filterKnownUndialables(p, peerAddrs)
Aarsh Shah's avatar
Aarsh Shah committed
407 408 409
	if forceDirect {
		goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
	}
Matt Joiner's avatar
Matt Joiner committed
410
	if len(goodAddrs) == 0 {
411
		return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
Matt Joiner's avatar
Matt Joiner committed
412
	}
Aarsh Shah's avatar
Aarsh Shah committed
413

Aarsh Shah's avatar
Aarsh Shah committed
414 415 416 417 418 419 420 421 422 423 424
	if !forceDirect {
		/////// Check backoff andnRank addresses
		var nonBackoff bool
		for _, a := range goodAddrs {
			// skip addresses in back-off
			if !s.backf.Backoff(p, a) {
				nonBackoff = true
			}
		}
		if !nonBackoff {
			return nil, ErrDialBackoff
Will Scott's avatar
Will Scott committed
425 426
		}
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427

428
	connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs))
429 430
	if dialErr != nil {
		logdial["error"] = dialErr.Cause.Error()
431 432 433 434
		switch dialErr.Cause {
		case context.Canceled, context.DeadlineExceeded:
			// Always prefer the context errors as we rely on being
			// able to check them.
435 436 437
			//
			// Removing this will BREAK backoff (causing us to
			// backoff when canceling dials).
438
			return nil, dialErr.Cause
439 440
		}
		return nil, dialErr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
441
	}
Steven Allen's avatar
Steven Allen committed
442 443 444 445
	logdial["conn"] = logging.Metadata{
		"localAddr":  connC.LocalMultiaddr(),
		"remoteAddr": connC.RemoteMultiaddr(),
	}
446
	swarmC, err := s.addConn(connC, network.DirOutbound)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447
	if err != nil {
Jeromy's avatar
Jeromy committed
448
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449
		connC.Close() // close the connection. didn't work out :(
450
		return nil, &DialError{Peer: p, Cause: err}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
451 452 453 454 455 456
	}

	logdial["dial"] = "success"
	return swarmC, nil
}

457 458 459 460 461
// filterKnownUndialables takes a list of multiaddrs, and removes those
// that we definitely don't want to dial: addresses configured to be blocked,
// IPv6 link-local addresses, addresses without a dial-capable transport,
// and addresses that we know to be our own.
// This is an optimization to avoid wasting time on dials that we know are going to fail.
462
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
463 464 465 466 467
	lisAddrs, _ := s.InterfaceListenAddresses()
	var ourAddrs []ma.Multiaddr
	for _, addr := range lisAddrs {
		protos := addr.Protocols()
		// we're only sure about filtering out /ip4 and /ip6 addresses, so far
468
		if protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6 {
469 470 471 472 473 474 475 476 477
			ourAddrs = append(ourAddrs, addr)
		}
	}

	return addrutil.FilterAddrs(addrs,
		addrutil.SubtractFilter(ourAddrs...),
		s.canDial,
		// TODO: Consider allowing link-local addresses
		addrutil.AddrOverNonLocalIP,
478 479 480
		func(addr ma.Multiaddr) bool {
			return s.gater == nil || s.gater.InterceptAddrDial(p, addr)
		},
481 482 483
	)
}

Aarsh Shah's avatar
Aarsh Shah committed
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
	/*
		This slice-to-chan code is temporary, the peerstore can currently provide
		a channel as an interface for receiving addresses, but more thought
		needs to be put into the execution. For now, this allows us to use
		the improved rate limiter, while maintaining the outward behaviour
		that we previously had (halting a dial when we run out of addrs)
	*/
	var remoteAddrChan chan ma.Multiaddr
	if len(remoteAddrs) > 0 {
		remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
		for i := range remoteAddrs {
			remoteAddrChan <- remoteAddrs[i]
		}
		close(remoteAddrChan)
	}

Jeromy's avatar
Jeromy committed
501
	log.Debugf("%s swarm dialing %s", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502 503 504 505

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

Jeromy's avatar
Jeromy committed
506 507
	// use a single response type instead of errs and conns, reduces complexity *a ton*
	respch := make(chan dialResult)
508
	err := &DialError{Peer: p}
Jeromy's avatar
Jeromy committed
509

510 511
	defer s.limiter.clearAllPeerDials(p)

Jeromy's avatar
Jeromy committed
512
	var active int
513
dialLoop:
Aarsh Shah's avatar
Aarsh Shah committed
514
	for remoteAddrChan != nil || active > 0 {
515 516 517
		// Check for context cancellations and/or responses first.
		select {
		case <-ctx.Done():
518
			break dialLoop
519 520 521 522
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
Will Scott's avatar
Will Scott committed
523 524 525 526
				if resp.Err != context.Canceled {
					s.backf.AddBackoff(p, resp.Addr)
				}

tg's avatar
tg committed
527
				log.Infof("got error on dial: %s", resp.Err)
528
				err.recordErr(resp.Addr, resp.Err)
529 530 531 532 533 534 535 536 537 538
			} else if resp.Conn != nil {
				return resp.Conn, nil
			}

			// We got a result, try again from the top.
			continue
		default:
		}

		// Now, attempt to dial.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
539
		select {
Aarsh Shah's avatar
Aarsh Shah committed
540
		case addr, ok := <-remoteAddrChan:
Jeromy's avatar
Jeromy committed
541
			if !ok {
Aarsh Shah's avatar
Aarsh Shah committed
542
				remoteAddrChan = nil
Jeromy's avatar
Jeromy committed
543
				continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
544 545
			}

Jeromy's avatar
Jeromy committed
546 547 548
			s.limitedDial(ctx, p, addr, respch)
			active++
		case <-ctx.Done():
549
			break dialLoop
Jeromy's avatar
Jeromy committed
550 551 552 553
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
Will Scott's avatar
Will Scott committed
554 555 556 557
				if resp.Err != context.Canceled {
					s.backf.AddBackoff(p, resp.Addr)
				}

tg's avatar
tg committed
558
				log.Infof("got error on dial: %s", resp.Err)
559
				err.recordErr(resp.Addr, resp.Err)
Jeromy's avatar
Jeromy committed
560 561
			} else if resp.Conn != nil {
				return resp.Conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
562 563 564
			}
		}
	}
tg's avatar
tg committed
565

566 567 568
	if ctxErr := ctx.Err(); ctxErr != nil {
		err.Cause = ctxErr
	} else if len(err.DialErrors) == 0 {
569
		err.Cause = network.ErrNoRemoteAddrs
570 571
	} else {
		err.Cause = ErrAllDialsFailed
tg's avatar
tg committed
572
	}
573
	return nil, err
Jeromy's avatar
Jeromy committed
574 575
}

Jeromy's avatar
Jeromy committed
576 577 578
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
579 580 581 582 583 584 585
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
586 587
}

588
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
Steven Allen's avatar
Steven Allen committed
589 590 591 592
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593 594
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

595 596
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
597
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
598 599
	}

600
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
601
	if err != nil {
602
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
603 604
	}

Steven Allen's avatar
Steven Allen committed
605 606
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
607
		connC.Close()
608
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
609 610
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
611 612 613 614 615
	}

	// success! we got one!
	return connC, nil
}
Aarsh Shah's avatar
Aarsh Shah committed
616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636

// IsFdConsumingAddr reports whether dialing addr is expected to consume a
// file descriptor.
//
// TODO We should have a `IsFdConsuming() bool` method on the `Transport`
// interface in go-libp2p-core/transport. Until then the check is:
//   - a non-circuit address is FD consuming if it contains TCP or UNIX;
//   - for a circuit-relay address, the same rule is applied to the part
//     before /p2p-circuit, i.e. the address of the relay server/proxy.
func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
	relayPart, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})

	// Nothing before the circuit component: err on the safe side.
	if relayPart == nil {
		return true
	}

	for _, code := range []int{ma.P_TCP, ma.P_UNIX} {
		if _, err := relayPart.ValueForProtocol(code); err == nil {
			return true
		}
	}
	return false
}