swarm_dial.go 13.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

tg's avatar
tg committed
10 11
	"github.com/hashicorp/go-multierror"

Steven Allen's avatar
Steven Allen committed
12
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
13 14
	addrutil "github.com/libp2p/go-addr-util"
	lgbl "github.com/libp2p/go-libp2p-loggables"
Steven Allen's avatar
Steven Allen committed
15
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
16
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
17
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
18
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

var (
	// ErrDialBackoff is the error handed back by the backoff logic when a
	// peer has been dialed too frequently.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialFailed signals that connecting to a peer ultimately failed.
	ErrDialFailed = errors.New("dial attempt failed")

	// ErrDialToSelf is the error for an attempt to dial our own peer.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is the error for a multiaddr that none of our
	// transports knows how to handle.
	ErrNoTransport = errors.New("no transport for protocol")
)

Steven Allen's avatar
Steven Allen committed
48
const (
	// DialAttempts governs how many times a goroutine will try to dial a
	// given peer.
	// Note: this is down to one, as we have _too many dials_ atm. To add
	// back in, add loop back in Dial(.)
	DialAttempts = 1

	// ConcurrentFdDials is the number of concurrent outbound dials over
	// transports that consume file descriptors.
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit is the number of concurrent outbound dials
	// to make per peer.
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
60

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
86

Steven Allen's avatar
Steven Allen committed
87 88
// DialBackoff is a type for tracking peer dial backoffs.
//
89
// * It's safe to use its zero value.
Steven Allen's avatar
Steven Allen committed
90 91 92
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Jeromy's avatar
Jeromy committed
93
	entries map[peer.ID]*backoffPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95 96
	lock    sync.RWMutex
}

Jeromy's avatar
Jeromy committed
97 98 99 100 101
// backoffPeer is the backoff state kept for a single peer.
type backoffPeer struct {
	// tries counts how many times backoff has been added for the peer.
	tries int
	// until is the instant at which the current backoff period ends.
	until time.Time
}

Steven Allen's avatar
Steven Allen committed
102
func (db *DialBackoff) init() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103
	if db.entries == nil {
Jeromy's avatar
Jeromy committed
104
		db.entries = make(map[peer.ID]*backoffPeer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106 107 108
	}
}

// Backoff returns whether the client should backoff from dialing
Jeromy's avatar
Jeromy committed
109
// peer p
Steven Allen's avatar
Steven Allen committed
110
func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
112
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
	db.init()
Jeromy's avatar
Jeromy committed
114 115 116 117 118 119
	bp, found := db.entries[p]
	if found && time.Now().Before(bp.until) {
		return true
	}

	return false
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
}

Steven Allen's avatar
Steven Allen committed
122 123 124 125 126 127 128 129
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient (default: 1s).
	BackoffCoef = 1 * time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
130

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
134 135 136 137 138 139 140 141
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
143
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
	db.init()
Jeromy's avatar
Jeromy committed
145 146 147 148
	bp, ok := db.entries[p]
	if !ok {
		db.entries[p] = &backoffPeer{
			tries: 1,
Steven Allen's avatar
Steven Allen committed
149
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
150 151 152 153
		}
		return
	}

Steven Allen's avatar
Steven Allen committed
154 155 156
	backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
157
	}
Steven Allen's avatar
Steven Allen committed
158
	bp.until = time.Now().Add(backoffTime)
Jeromy's avatar
Jeromy committed
159
	bp.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161 162 163
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
164
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
166
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167 168 169 170
	db.init()
	delete(db.entries, p)
}

Steven Allen's avatar
Steven Allen committed
171
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172 173 174 175
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
176 177 178 179 180 181 182 183 184 185 186
// etc. to achieve connection.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
	return s.dialPeer(ctx, p)
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
188 189 190 191 192
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195 196 197
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

198
	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200

	// check if we already have an open connection first
Steven Allen's avatar
Steven Allen committed
201
	conn := s.bestConnToPeer(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203 204 205
	if conn != nil {
		return conn, nil
	}

206 207
	// if this peer has been backed off, lets get out of here
	if s.backf.Backoff(p) {
208
		log.Event(ctx, "swarmDialBackoff", p)
209 210 211
		return nil, ErrDialBackoff
	}

212 213 214 215
	// apply the DialPeer timeout
	ctx, cancel := context.WithTimeout(ctx, inet.GetDialPeerTimeout(ctx))
	defer cancel()

216
	conn, err = s.dsync.DialLock(ctx, p)
Steven Allen's avatar
Steven Allen committed
217 218 219
	if err != nil {
		return nil, err
	}
220

Steven Allen's avatar
Steven Allen committed
221 222
	log.Debugf("network for %s finished dialing %s", s.local, p)
	return conn, err
223
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224

225 226 227
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
Steven Allen's avatar
Steven Allen committed
228 229 230 231 232 233 234 235
	// Short circuit.
	// By the time we take the dial lock, we may already *have* a connection
	// to the peer.
	c := s.bestConnToPeer(p)
	if c != nil {
		return c, nil
	}

Steven Allen's avatar
Steven Allen committed
236 237
	logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)

238 239 240
	// ok, we have been charged to dial! let's do it.
	// if it succeeds, dial will add the conn to the swarm itself.
	defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
Steven Allen's avatar
Steven Allen committed
241 242

	conn, err := s.dial(ctx, p)
243
	if err != nil {
Steven Allen's avatar
Steven Allen committed
244 245 246 247 248 249 250 251 252
		conn = s.bestConnToPeer(p)
		if conn != nil {
			// Hm? What error?
			// Could have canceled the dial because we received a
			// connection or some other random reason.
			// Just ignore the error and return the connection.
			log.Debugf("ignoring dial error because we have a connection: %s", err)
			return conn, nil
		}
Matt Joiner's avatar
Matt Joiner committed
253
		if err != context.Canceled {
254 255 256
			log.Event(ctx, "swarmDialBackoffAdd", logdial)
			s.backf.AddBackoff(p) // let others know to backoff
		}
257

Steven Allen's avatar
Steven Allen committed
258
		// ok, we failed.
259
		return nil, fmt.Errorf("dial attempt failed: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260
	}
261
	return conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262 263
}

Steven Allen's avatar
Steven Allen committed
264 265 266 267 268
// canDial reports whether the swarm has a transport for dialing addr and
// that transport says it can dial it.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if t := s.TransportForDialing(addr); t != nil {
		return t.CanDial(addr)
	}
	return false
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269 270 271 272 273 274 275 276 277 278 279
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
	}
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.

	sk := s.peers.PrivKey(s.local)
Matt Joiner's avatar
Matt Joiner committed
280
	logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281 282 283 284 285
	if sk == nil {
		// fine for sk to be nil, just log.
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

Jeromy's avatar
Jeromy committed
286 287
	//////
	/*
288
		This slice-to-chan code is temporary, the peerstore can currently provide
Jeromy's avatar
Jeromy committed
289 290 291 292 293
		a channel as an interface for receiving addresses, but more thought
		needs to be put into the execution. For now, this allows us to use
		the improved rate limiter, while maintaining the outward behaviour
		that we previously had (halting a dial when we run out of addrs)
	*/
Matt Joiner's avatar
Matt Joiner committed
294 295 296 297 298
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
		return nil, errors.New("no addresses")
	}
	goodAddrs := s.filterKnownUndialables(peerAddrs)
Matt Joiner's avatar
Matt Joiner committed
299 300 301
	if len(goodAddrs) == 0 {
		return nil, errors.New("no good addresses")
	}
302
	goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
303
	for _, a := range goodAddrs {
304
		goodAddrsChan <- a
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305
	}
306
	close(goodAddrsChan)
Jeromy's avatar
Jeromy committed
307
	/////////
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308 309

	// try to get a connection to any addr
310
	connC, err := s.dialAddrs(ctx, p, goodAddrsChan)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
	if err != nil {
Jeromy's avatar
Jeromy committed
312
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
313 314
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
315 316 317 318
	logdial["conn"] = logging.Metadata{
		"localAddr":  connC.LocalMultiaddr(),
		"remoteAddr": connC.RemoteMultiaddr(),
	}
319
	swarmC, err := s.addConn(connC, inet.DirOutbound)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320
	if err != nil {
Jeromy's avatar
Jeromy committed
321
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
322 323 324 325 326 327 328 329
		connC.Close() // close the connection. didn't work out :(
		return nil, err
	}

	logdial["dial"] = "success"
	return swarmC, nil
}

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
// filterKnownUndialables returns addrs minus the ones we definitely do not
// want to dial: addresses configured to be blocked, IPv6 link-local
// addresses, addresses without a dial-capable transport, and addresses that
// we know to be our own.
// It is purely an optimization that avoids spending time on dials we know
// are going to fail.
func (s *Swarm) filterKnownUndialables(addrs []ma.Multiaddr) []ma.Multiaddr {
	// The error is deliberately dropped: whatever listen addresses come
	// back are used as-is.
	listening, _ := s.InterfaceListenAddresses()

	var ours []ma.Multiaddr
	for _, la := range listening {
		protos := la.Protocols()
		if len(protos) != 2 {
			continue
		}
		// So far we are only sure about filtering out /ip4 and /ip6
		// addresses.
		switch protos[0].Code {
		case ma.P_IP4, ma.P_IP6:
			ours = append(ours, la)
		}
	}

	return addrutil.FilterAddrs(addrs,
		addrutil.SubtractFilter(ours...),
		s.canDial,
		// TODO: Consider allowing link-local addresses
		addrutil.AddrOverNonLocalIP,
		addrutil.FilterNeg(s.Filters.AddrBlocked),
	)
}

Steven Allen's avatar
Steven Allen committed
355
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.Conn, error) {
Jeromy's avatar
Jeromy committed
356
	log.Debugf("%s swarm dialing %s", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357 358 359 360

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

Jeromy's avatar
Jeromy committed
361 362
	// use a single response type instead of errs and conns, reduces complexity *a ton*
	respch := make(chan dialResult)
tg's avatar
tg committed
363
	var dialErrors *multierror.Error
Jeromy's avatar
Jeromy committed
364

365 366
	defer s.limiter.clearAllPeerDials(p)

Jeromy's avatar
Jeromy committed
367
	var active int
368 369 370 371
	for remoteAddrs != nil || active > 0 {
		// Check for context cancellations and/or responses first.
		select {
		case <-ctx.Done():
tg's avatar
tg committed
372 373
			if dialError := dialErrors.ErrorOrNil(); dialError != nil {
				return nil, dialError
374
			}
tg's avatar
tg committed
375 376

			return nil, ctx.Err()
377 378 379 380
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
tg's avatar
tg committed
381 382
				log.Infof("got error on dial: %s", resp.Err)
				dialErrors = multierror.Append(dialErrors, resp.Err)
383 384 385 386 387 388 389 390 391 392
			} else if resp.Conn != nil {
				return resp.Conn, nil
			}

			// We got a result, try again from the top.
			continue
		default:
		}

		// Now, attempt to dial.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
393
		select {
Jeromy's avatar
Jeromy committed
394 395 396 397
		case addr, ok := <-remoteAddrs:
			if !ok {
				remoteAddrs = nil
				continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
398 399
			}

Jeromy's avatar
Jeromy committed
400 401 402
			s.limitedDial(ctx, p, addr, respch)
			active++
		case <-ctx.Done():
tg's avatar
tg committed
403 404
			if dialError := dialErrors.ErrorOrNil(); dialError != nil {
				return nil, dialError
Jeromy's avatar
Jeromy committed
405
			}
tg's avatar
tg committed
406 407

			return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
408 409 410 411
		case resp := <-respch:
			active--
			if resp.Err != nil {
				// Errors are normal, lots of dials will fail
tg's avatar
tg committed
412 413
				log.Infof("got error on dial: %s", resp.Err)
				dialErrors = multierror.Append(dialErrors, resp.Err)
Jeromy's avatar
Jeromy committed
414 415
			} else if resp.Conn != nil {
				return resp.Conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
416 417 418
			}
		}
	}
tg's avatar
tg committed
419 420 421 422 423 424

	if dialError := dialErrors.ErrorOrNil(); dialError != nil {
		return nil, dialError
	}

	return nil, inet.ErrNoRemoteAddrs
Jeromy's avatar
Jeromy committed
425 426
}

Jeromy's avatar
Jeromy committed
427 428 429
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
430 431 432 433 434 435 436
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438
}

Steven Allen's avatar
Steven Allen committed
439 440 441 442 443
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.Conn, error) {
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444 445
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

446 447
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
448
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449 450
	}

451
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
452
	if err != nil {
tg's avatar
tg committed
453
		return nil, fmt.Errorf("%s --> %s (%s) dial attempt failed: %s", s.local, p, addr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
454 455
	}

Steven Allen's avatar
Steven Allen committed
456 457
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
458
		connC.Close()
459
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
460 461
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
462 463 464 465 466
	}

	// success! we got one!
	return connC, nil
}