swarm_dial.go 13.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

Steven Allen's avatar
Steven Allen committed
10
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
11 12
	addrutil "github.com/libp2p/go-addr-util"
	lgbl "github.com/libp2p/go-libp2p-loggables"
Steven Allen's avatar
Steven Allen committed
13
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
14
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
15
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
16
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

// Sentinel errors returned by the swarm dialer.
var (
	// ErrDialBackoff is returned by the backoff code when a given peer has
	// been dialed too frequently
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialToSelf is returned if we attempt to dial our own peer
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when we don't know a transport for the
	// given multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is returned when connecting to a peer has ultimately failed
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned when we fail to find any addresses for a
	// peer we're trying to dial.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned when we find addresses for a peer but
	// can't use any of them.
	ErrNoGoodAddresses = errors.New("no good addresses")
)

Steven Allen's avatar
Steven Allen committed
54
// Dial tuning constants.
const (
	// DialAttempts governs how many times a goroutine will try to dial a given peer.
	// Note: this is down to one, as we have _too many dials_ atm. To add back in,
	// add loop back in Dial(.)
	DialAttempts = 1

	// ConcurrentFdDials is the number of concurrent outbound dials over transports
	// that consume file descriptors
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit is the number of concurrent outbound dials to make
	// per peer
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
66

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
92

Steven Allen's avatar
Steven Allen committed
93 94
// DialBackoff is a type for tracking peer dial backoffs.
//
95
// * It's safe to use its zero value.
Steven Allen's avatar
Steven Allen committed
96 97 98
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Jeromy's avatar
Jeromy committed
99
	entries map[peer.ID]*backoffPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100 101 102
	lock    sync.RWMutex
}

Jeromy's avatar
Jeromy committed
103 104 105 106 107
// backoffPeer is the backoff record DialBackoff keeps for a single peer.
type backoffPeer struct {
	// tries counts the backoffs recorded for the peer: AddBackoff sets it to 1
	// on the first backoff and increments it on each later one.
	tries int
	// until is the instant the current backoff expires; Backoff reports true
	// while the current time is before it.
	until time.Time
}

Steven Allen's avatar
Steven Allen committed
108
func (db *DialBackoff) init() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	if db.entries == nil {
Jeromy's avatar
Jeromy committed
110
		db.entries = make(map[peer.ID]*backoffPeer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111 112 113 114
	}
}

// Backoff returns whether the client should backoff from dialing
Jeromy's avatar
Jeromy committed
115
// peer p
Steven Allen's avatar
Steven Allen committed
116
func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
118
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119
	db.init()
Jeromy's avatar
Jeromy committed
120 121 122 123 124 125
	bp, found := db.entries[p]
	if found && time.Now().Before(bp.until) {
		return true
	}

	return false
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127
}

Steven Allen's avatar
Steven Allen committed
128 129 130 131 132 133 134 135
// Tunable parameters of the dial backoff computed by AddBackoff.
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient (default: 1s).
	BackoffCoef = 1 * time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
136

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137 138 139
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
140 141 142 143 144 145 146 147
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
149
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
	db.init()
Jeromy's avatar
Jeromy committed
151 152 153 154
	bp, ok := db.entries[p]
	if !ok {
		db.entries[p] = &backoffPeer{
			tries: 1,
Steven Allen's avatar
Steven Allen committed
155
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
156 157 158 159
		}
		return
	}

Steven Allen's avatar
Steven Allen committed
160 161 162
	backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
163
	}
Steven Allen's avatar
Steven Allen committed
164
	bp.until = time.Now().Add(backoffTime)
Jeromy's avatar
Jeromy committed
165
	bp.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167 168 169
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
170
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
172
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173 174 175 176
	db.init()
	delete(db.entries, p)
}

Steven Allen's avatar
Steven Allen committed
177
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179 180 181
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
182 183 184 185 186 187 188 189 190 191 192
// etc. to achieve connection.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
	return s.dialPeer(ctx, p)
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
194 195 196 197 198
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200 201 202 203
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

204
	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205 206

	// check if we already have an open connection first
Steven Allen's avatar
Steven Allen committed
207
	conn := s.bestConnToPeer(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208 209 210 211
	if conn != nil {
		return conn, nil
	}

212 213
	// if this peer has been backed off, lets get out of here
	if s.backf.Backoff(p) {
214
		log.Event(ctx, "swarmDialBackoff", p)
215 216 217
		return nil, ErrDialBackoff
	}

218 219 220 221
	// apply the DialPeer timeout
	ctx, cancel := context.WithTimeout(ctx, inet.GetDialPeerTimeout(ctx))
	defer cancel()

222
	conn, err = s.dsync.DialLock(ctx, p)
Steven Allen's avatar
Steven Allen committed
223 224 225
	if err != nil {
		return nil, err
	}
226

Steven Allen's avatar
Steven Allen committed
227 228
	log.Debugf("network for %s finished dialing %s", s.local, p)
	return conn, err
229
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230

231 232 233
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
Steven Allen's avatar
Steven Allen committed
234 235 236 237 238 239 240 241
	// Short circuit.
	// By the time we take the dial lock, we may already *have* a connection
	// to the peer.
	c := s.bestConnToPeer(p)
	if c != nil {
		return c, nil
	}

Steven Allen's avatar
Steven Allen committed
242 243
	logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)

244 245 246
	// ok, we have been charged to dial! let's do it.
	// if it succeeds, dial will add the conn to the swarm itself.
	defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
Steven Allen's avatar
Steven Allen committed
247 248

	conn, err := s.dial(ctx, p)
249
	if err != nil {
Steven Allen's avatar
Steven Allen committed
250 251 252 253 254 255 256 257 258
		conn = s.bestConnToPeer(p)
		if conn != nil {
			// Hm? What error?
			// Could have canceled the dial because we received a
			// connection or some other random reason.
			// Just ignore the error and return the connection.
			log.Debugf("ignoring dial error because we have a connection: %s", err)
			return conn, nil
		}
Matt Joiner's avatar
Matt Joiner committed
259
		if err != context.Canceled {
260 261 262
			log.Event(ctx, "swarmDialBackoffAdd", logdial)
			s.backf.AddBackoff(p) // let others know to backoff
		}
263

Steven Allen's avatar
Steven Allen committed
264
		// ok, we failed.
265
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266
	}
267
	return conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
268 269
}

Steven Allen's avatar
Steven Allen committed
270 271 272 273 274
// canDial reports whether the swarm has a transport for dialing addr and
// that transport says it can dial it.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if tpt := s.TransportForDialing(addr); tpt != nil {
		return tpt.CanDial(addr)
	}
	return false
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275 276 277 278 279 280 281 282 283 284 285
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
	}
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.

	sk := s.peers.PrivKey(s.local)
Matt Joiner's avatar
Matt Joiner committed
286
	logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
287 288 289 290 291
	if sk == nil {
		// fine for sk to be nil, just log.
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

Jeromy's avatar
Jeromy committed
292 293
	//////
	/*
294
		This slice-to-chan code is temporary, the peerstore can currently provide
Jeromy's avatar
Jeromy committed
295 296 297 298 299
		a channel as an interface for receiving addresses, but more thought
		needs to be put into the execution. For now, this allows us to use
		the improved rate limiter, while maintaining the outward behaviour
		that we previously had (halting a dial when we run out of addrs)
	*/
Matt Joiner's avatar
Matt Joiner committed
300 301
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
302
		return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
Matt Joiner's avatar
Matt Joiner committed
303 304
	}
	goodAddrs := s.filterKnownUndialables(peerAddrs)
Matt Joiner's avatar
Matt Joiner committed
305
	if len(goodAddrs) == 0 {
306
		return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
Matt Joiner's avatar
Matt Joiner committed
307
	}
308
	goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
309
	for _, a := range goodAddrs {
310
		goodAddrsChan <- a
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
	}
312
	close(goodAddrsChan)
Jeromy's avatar
Jeromy committed
313
	/////////
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314 315

	// try to get a connection to any addr
316 317 318 319 320 321 322 323 324 325 326 327
	connC, dialErr := s.dialAddrs(ctx, p, goodAddrsChan)
	if dialErr != nil {
		logdial["error"] = dialErr.Cause.Error()
		if dialErr.Cause == context.Canceled {
			// always prefer the "context canceled" error.
			// we rely on behing able to check `err == context.Canceled`
			//
			// Removing this will BREAK backoff (causing us to
			// backoff when canceling dials).
			return nil, context.Canceled
		}
		return nil, dialErr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
328
	}
Steven Allen's avatar
Steven Allen committed
329 330 331 332
	logdial["conn"] = logging.Metadata{
		"localAddr":  connC.LocalMultiaddr(),
		"remoteAddr": connC.RemoteMultiaddr(),
	}
333
	swarmC, err := s.addConn(connC, inet.DirOutbound)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
334
	if err != nil {
Jeromy's avatar
Jeromy committed
335
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
336
		connC.Close() // close the connection. didn't work out :(
337
		return nil, &DialError{Peer: p, Cause: err}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338 339 340 341 342 343
	}

	logdial["dial"] = "success"
	return swarmC, nil
}

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
// filterKnownUndialables returns the subset of addrs worth dialing. It drops
// addresses we definitely don't want to dial: our own listen addresses,
// addresses without a dial-capable transport, IPv6 link-local addresses, and
// addresses configured to be blocked.
// It only saves us from attempting dials that are known to fail.
func (s *Swarm) filterKnownUndialables(addrs []ma.Multiaddr) []ma.Multiaddr {
	// The error is discarded; whatever listen addresses came back are used.
	listenAddrs, _ := s.InterfaceListenAddresses()

	var selfAddrs []ma.Multiaddr
	for _, la := range listenAddrs {
		ps := la.Protocols()
		if len(ps) != 2 {
			continue
		}
		// we're only sure about filtering out /ip4 and /ip6 addresses, so far
		switch ps[0].Code {
		case ma.P_IP4, ma.P_IP6:
			selfAddrs = append(selfAddrs, la)
		}
	}

	notOurs := addrutil.SubtractFilter(selfAddrs...)
	notBlocked := addrutil.FilterNeg(s.Filters.AddrBlocked)

	// TODO: Consider allowing link-local addresses
	return addrutil.FilterAddrs(addrs, notOurs, s.canDial, addrutil.AddrOverNonLocalIP, notBlocked)
}

369
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.Conn, *DialError) {
Jeromy's avatar
Jeromy committed
370
	log.Debugf("%s swarm dialing %s", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
371 372 373 374

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

Jeromy's avatar
Jeromy committed
375 376
	// use a single response type instead of errs and conns, reduces complexity *a ton*
	respch := make(chan dialResult)
377
	err := new(DialError)
Jeromy's avatar
Jeromy committed
378

379 380
	defer s.limiter.clearAllPeerDials(p)

Jeromy's avatar
Jeromy committed
381
	var active int
382
dialLoop:
383 384 385 386
	for remoteAddrs != nil || active > 0 {
		// Check for context cancellations and/or responses first.
		select {
		case <-ctx.Done():
387
			break dialLoop
388 389 390 391
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
tg's avatar
tg committed
392
				log.Infof("got error on dial: %s", resp.Err)
393
				err.recordErr(resp.Addr, resp.Err)
394 395 396 397 398 399 400 401 402 403
			} else if resp.Conn != nil {
				return resp.Conn, nil
			}

			// We got a result, try again from the top.
			continue
		default:
		}

		// Now, attempt to dial.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404
		select {
Jeromy's avatar
Jeromy committed
405 406 407 408
		case addr, ok := <-remoteAddrs:
			if !ok {
				remoteAddrs = nil
				continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
409 410
			}

Jeromy's avatar
Jeromy committed
411 412 413
			s.limitedDial(ctx, p, addr, respch)
			active++
		case <-ctx.Done():
414
			break dialLoop
Jeromy's avatar
Jeromy committed
415 416 417 418
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
tg's avatar
tg committed
419
				log.Infof("got error on dial: %s", resp.Err)
420
				err.recordErr(resp.Addr, resp.Err)
Jeromy's avatar
Jeromy committed
421 422
			} else if resp.Conn != nil {
				return resp.Conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
423 424 425
			}
		}
	}
tg's avatar
tg committed
426

427 428 429 430 431 432
	if ctxErr := ctx.Err(); ctxErr != nil {
		err.Cause = ctxErr
	} else if len(err.DialErrors) == 0 {
		err.Cause = inet.ErrNoRemoteAddrs
	} else {
		err.Cause = ErrAllDialsFailed
tg's avatar
tg committed
433
	}
434
	return nil, err
Jeromy's avatar
Jeromy committed
435 436
}

Jeromy's avatar
Jeromy committed
437 438 439
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
440 441 442 443 444 445 446
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447 448
}

Steven Allen's avatar
Steven Allen committed
449 450 451 452 453
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.Conn, error) {
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
454 455
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

456 457
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
458
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
459 460
	}

461
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
462
	if err != nil {
463
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464 465
	}

Steven Allen's avatar
Steven Allen committed
466 467
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
468
		connC.Close()
469
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
470 471
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
472 473 474 475 476
	}

	// success! we got one!
	return connC, nil
}